root / test / ganeti.utils_unittest.py @ c50645c0
History | View | Annotate | Download (33.3 kB)
1 |
#!/usr/bin/python
|
---|---|
2 |
#
|
3 |
|
4 |
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
|
5 |
#
|
6 |
# This program is free software; you can redistribute it and/or modify
|
7 |
# it under the terms of the GNU General Public License as published by
|
8 |
# the Free Software Foundation; either version 2 of the License, or
|
9 |
# (at your option) any later version.
|
10 |
#
|
11 |
# This program is distributed in the hope that it will be useful, but
|
12 |
# WITHOUT ANY WARRANTY; without even the implied warranty of
|
13 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
14 |
# General Public License for more details.
|
15 |
#
|
16 |
# You should have received a copy of the GNU General Public License
|
17 |
# along with this program; if not, write to the Free Software
|
18 |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
19 |
# 02110-1301, USA.
|
20 |
|
21 |
|
22 |
"""Script for unittesting the utils module"""
|
23 |
|
24 |
import errno |
25 |
import fcntl |
26 |
import glob |
27 |
import os |
28 |
import os.path |
29 |
import re |
30 |
import shutil |
31 |
import signal |
32 |
import socket |
33 |
import stat |
34 |
import tempfile |
35 |
import time |
36 |
import unittest |
37 |
import warnings |
38 |
import random |
39 |
import operator |
40 |
|
41 |
import testutils |
42 |
from ganeti import constants |
43 |
from ganeti import compat |
44 |
from ganeti import utils |
45 |
from ganeti import errors |
46 |
from ganeti.utils import RunCmd, \ |
47 |
FirstFree, \
|
48 |
RunParts, \
|
49 |
SetEtcHostsEntry, RemoveEtcHostsEntry |
50 |
|
51 |
|
52 |
class TestIsProcessAlive(unittest.TestCase): |
53 |
"""Testing case for IsProcessAlive"""
|
54 |
|
55 |
def testExists(self): |
56 |
mypid = os.getpid() |
57 |
self.assert_(utils.IsProcessAlive(mypid), "can't find myself running") |
58 |
|
59 |
def testNotExisting(self): |
60 |
pid_non_existing = os.fork() |
61 |
if pid_non_existing == 0: |
62 |
os._exit(0)
|
63 |
elif pid_non_existing < 0: |
64 |
raise SystemError("can't fork") |
65 |
os.waitpid(pid_non_existing, 0)
|
66 |
self.assertFalse(utils.IsProcessAlive(pid_non_existing),
|
67 |
"nonexisting process detected")
|
68 |
|
69 |
|
70 |
class TestGetProcStatusPath(unittest.TestCase): |
71 |
def test(self): |
72 |
self.assert_("/1234/" in utils._GetProcStatusPath(1234)) |
73 |
self.assertNotEqual(utils._GetProcStatusPath(1), |
74 |
utils._GetProcStatusPath(2))
|
75 |
|
76 |
|
77 |
class TestIsProcessHandlingSignal(unittest.TestCase): |
78 |
def setUp(self): |
79 |
self.tmpdir = tempfile.mkdtemp()
|
80 |
|
81 |
def tearDown(self): |
82 |
shutil.rmtree(self.tmpdir)
|
83 |
|
84 |
def testParseSigsetT(self): |
85 |
self.assertEqual(len(utils._ParseSigsetT("0")), 0) |
86 |
self.assertEqual(utils._ParseSigsetT("1"), set([1])) |
87 |
self.assertEqual(utils._ParseSigsetT("1000a"), set([2, 4, 17])) |
88 |
self.assertEqual(utils._ParseSigsetT("810002"), set([2, 17, 24, ])) |
89 |
self.assertEqual(utils._ParseSigsetT("0000000180000202"), |
90 |
set([2, 10, 32, 33])) |
91 |
self.assertEqual(utils._ParseSigsetT("0000000180000002"), |
92 |
set([2, 32, 33])) |
93 |
self.assertEqual(utils._ParseSigsetT("0000000188000002"), |
94 |
set([2, 28, 32, 33])) |
95 |
self.assertEqual(utils._ParseSigsetT("000000004b813efb"), |
96 |
set([1, 2, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 17, |
97 |
24, 25, 26, 28, 31])) |
98 |
self.assertEqual(utils._ParseSigsetT("ffffff"), set(range(1, 25))) |
99 |
|
100 |
def testGetProcStatusField(self): |
101 |
for field in ["SigCgt", "Name", "FDSize"]: |
102 |
for value in ["", "0", "cat", " 1234 KB"]: |
103 |
pstatus = "\n".join([
|
104 |
"VmPeak: 999 kB",
|
105 |
"%s: %s" % (field, value),
|
106 |
"TracerPid: 0",
|
107 |
]) |
108 |
result = utils._GetProcStatusField(pstatus, field) |
109 |
self.assertEqual(result, value.strip())
|
110 |
|
111 |
def test(self): |
112 |
sp = utils.PathJoin(self.tmpdir, "status") |
113 |
|
114 |
utils.WriteFile(sp, data="\n".join([
|
115 |
"Name: bash",
|
116 |
"State: S (sleeping)",
|
117 |
"SleepAVG: 98%",
|
118 |
"Pid: 22250",
|
119 |
"PPid: 10858",
|
120 |
"TracerPid: 0",
|
121 |
"SigBlk: 0000000000010000",
|
122 |
"SigIgn: 0000000000384004",
|
123 |
"SigCgt: 000000004b813efb",
|
124 |
"CapEff: 0000000000000000",
|
125 |
])) |
126 |
|
127 |
self.assert_(utils.IsProcessHandlingSignal(1234, 10, status_path=sp)) |
128 |
|
129 |
def testNoSigCgt(self): |
130 |
sp = utils.PathJoin(self.tmpdir, "status") |
131 |
|
132 |
utils.WriteFile(sp, data="\n".join([
|
133 |
"Name: bash",
|
134 |
])) |
135 |
|
136 |
self.assertRaises(RuntimeError, utils.IsProcessHandlingSignal, |
137 |
1234, 10, status_path=sp) |
138 |
|
139 |
def testNoSuchFile(self): |
140 |
sp = utils.PathJoin(self.tmpdir, "notexist") |
141 |
|
142 |
self.assertFalse(utils.IsProcessHandlingSignal(1234, 10, status_path=sp)) |
143 |
|
144 |
@staticmethod
|
145 |
def _TestRealProcess(): |
146 |
signal.signal(signal.SIGUSR1, signal.SIG_DFL) |
147 |
if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
|
148 |
raise Exception("SIGUSR1 is handled when it should not be") |
149 |
|
150 |
signal.signal(signal.SIGUSR1, lambda signum, frame: None) |
151 |
if not utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1): |
152 |
raise Exception("SIGUSR1 is not handled when it should be") |
153 |
|
154 |
signal.signal(signal.SIGUSR1, signal.SIG_IGN) |
155 |
if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
|
156 |
raise Exception("SIGUSR1 is not handled when it should be") |
157 |
|
158 |
signal.signal(signal.SIGUSR1, signal.SIG_DFL) |
159 |
if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
|
160 |
raise Exception("SIGUSR1 is handled when it should not be") |
161 |
|
162 |
return True |
163 |
|
164 |
def testRealProcess(self): |
165 |
self.assert_(utils.RunInSeparateProcess(self._TestRealProcess)) |
166 |
|
167 |
|
168 |
class TestRunCmd(testutils.GanetiTestCase): |
169 |
"""Testing case for the RunCmd function"""
|
170 |
|
171 |
def setUp(self): |
172 |
testutils.GanetiTestCase.setUp(self)
|
173 |
self.magic = time.ctime() + " ganeti test" |
174 |
self.fname = self._CreateTempFile() |
175 |
self.fifo_tmpdir = tempfile.mkdtemp()
|
176 |
self.fifo_file = os.path.join(self.fifo_tmpdir, "ganeti_test_fifo") |
177 |
os.mkfifo(self.fifo_file)
|
178 |
|
179 |
def tearDown(self): |
180 |
shutil.rmtree(self.fifo_tmpdir)
|
181 |
testutils.GanetiTestCase.tearDown(self)
|
182 |
|
183 |
def testOk(self): |
184 |
"""Test successful exit code"""
|
185 |
result = RunCmd("/bin/sh -c 'exit 0'")
|
186 |
self.assertEqual(result.exit_code, 0) |
187 |
self.assertEqual(result.output, "") |
188 |
|
189 |
def testFail(self): |
190 |
"""Test fail exit code"""
|
191 |
result = RunCmd("/bin/sh -c 'exit 1'")
|
192 |
self.assertEqual(result.exit_code, 1) |
193 |
self.assertEqual(result.output, "") |
194 |
|
195 |
def testStdout(self): |
196 |
"""Test standard output"""
|
197 |
cmd = 'echo -n "%s"' % self.magic |
198 |
result = RunCmd("/bin/sh -c '%s'" % cmd)
|
199 |
self.assertEqual(result.stdout, self.magic) |
200 |
result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname) |
201 |
self.assertEqual(result.output, "") |
202 |
self.assertFileContent(self.fname, self.magic) |
203 |
|
204 |
def testStderr(self): |
205 |
"""Test standard error"""
|
206 |
cmd = 'echo -n "%s"' % self.magic |
207 |
result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
|
208 |
self.assertEqual(result.stderr, self.magic) |
209 |
result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname) |
210 |
self.assertEqual(result.output, "") |
211 |
self.assertFileContent(self.fname, self.magic) |
212 |
|
213 |
def testCombined(self): |
214 |
"""Test combined output"""
|
215 |
cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic) |
216 |
expected = "A" + self.magic + "B" + self.magic |
217 |
result = RunCmd("/bin/sh -c '%s'" % cmd)
|
218 |
self.assertEqual(result.output, expected)
|
219 |
result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname) |
220 |
self.assertEqual(result.output, "") |
221 |
self.assertFileContent(self.fname, expected) |
222 |
|
223 |
def testSignal(self): |
224 |
"""Test signal"""
|
225 |
result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"]) |
226 |
self.assertEqual(result.signal, 15) |
227 |
self.assertEqual(result.output, "") |
228 |
|
229 |
def testTimeoutClean(self): |
230 |
cmd = "trap 'exit 0' TERM; read < %s" % self.fifo_file |
231 |
result = RunCmd(["/bin/sh", "-c", cmd], timeout=0.2) |
232 |
self.assertEqual(result.exit_code, 0) |
233 |
|
234 |
def testTimeoutKill(self): |
235 |
cmd = ["/bin/sh", "-c", "trap '' TERM; read < %s" % self.fifo_file] |
236 |
timeout = 0.2
|
237 |
out, err, status, ta = utils._RunCmdPipe(cmd, {}, False, "/", False, |
238 |
timeout, _linger_timeout=0.2)
|
239 |
self.assert_(status < 0) |
240 |
self.assertEqual(-status, signal.SIGKILL)
|
241 |
|
242 |
def testTimeoutOutputAfterTerm(self): |
243 |
cmd = "trap 'echo sigtermed; exit 1' TERM; read < %s" % self.fifo_file |
244 |
result = RunCmd(["/bin/sh", "-c", cmd], timeout=0.2) |
245 |
self.assert_(result.failed)
|
246 |
self.assertEqual(result.stdout, "sigtermed\n") |
247 |
|
248 |
def testListRun(self): |
249 |
"""Test list runs"""
|
250 |
result = RunCmd(["true"])
|
251 |
self.assertEqual(result.signal, None) |
252 |
self.assertEqual(result.exit_code, 0) |
253 |
result = RunCmd(["/bin/sh", "-c", "exit 1"]) |
254 |
self.assertEqual(result.signal, None) |
255 |
self.assertEqual(result.exit_code, 1) |
256 |
result = RunCmd(["echo", "-n", self.magic]) |
257 |
self.assertEqual(result.signal, None) |
258 |
self.assertEqual(result.exit_code, 0) |
259 |
self.assertEqual(result.stdout, self.magic) |
260 |
|
261 |
def testFileEmptyOutput(self): |
262 |
"""Test file output"""
|
263 |
result = RunCmd(["true"], output=self.fname) |
264 |
self.assertEqual(result.signal, None) |
265 |
self.assertEqual(result.exit_code, 0) |
266 |
self.assertFileContent(self.fname, "") |
267 |
|
268 |
def testLang(self): |
269 |
"""Test locale environment"""
|
270 |
old_env = os.environ.copy() |
271 |
try:
|
272 |
os.environ["LANG"] = "en_US.UTF-8" |
273 |
os.environ["LC_ALL"] = "en_US.UTF-8" |
274 |
result = RunCmd(["locale"])
|
275 |
for line in result.output.splitlines(): |
276 |
key, value = line.split("=", 1) |
277 |
# Ignore these variables, they're overridden by LC_ALL
|
278 |
if key == "LANG" or key == "LANGUAGE": |
279 |
continue
|
280 |
self.failIf(value and value != "C" and value != '"C"', |
281 |
"Variable %s is set to the invalid value '%s'" % (key, value))
|
282 |
finally:
|
283 |
os.environ = old_env |
284 |
|
285 |
def testDefaultCwd(self): |
286 |
"""Test default working directory"""
|
287 |
self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/") |
288 |
|
289 |
def testCwd(self): |
290 |
"""Test default working directory"""
|
291 |
self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/") |
292 |
self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp") |
293 |
cwd = os.getcwd() |
294 |
self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd) |
295 |
|
296 |
def testResetEnv(self): |
297 |
"""Test environment reset functionality"""
|
298 |
self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "") |
299 |
self.failUnlessEqual(RunCmd(["env"], reset_env=True, |
300 |
env={"FOO": "bar",}).stdout.strip(), "FOO=bar") |
301 |
|
302 |
def testNoFork(self): |
303 |
"""Test that nofork raise an error"""
|
304 |
self.assertFalse(utils._no_fork)
|
305 |
utils.DisableFork() |
306 |
try:
|
307 |
self.assertTrue(utils._no_fork)
|
308 |
self.assertRaises(errors.ProgrammerError, RunCmd, ["true"]) |
309 |
finally:
|
310 |
utils._no_fork = False
|
311 |
|
312 |
def testWrongParams(self): |
313 |
"""Test wrong parameters"""
|
314 |
self.assertRaises(errors.ProgrammerError, RunCmd, ["true"], |
315 |
output="/dev/null", interactive=True) |
316 |
|
317 |
|
318 |
class TestRunParts(testutils.GanetiTestCase): |
319 |
"""Testing case for the RunParts function"""
|
320 |
|
321 |
def setUp(self): |
322 |
self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp") |
323 |
|
324 |
def tearDown(self): |
325 |
shutil.rmtree(self.rundir)
|
326 |
|
327 |
def testEmpty(self): |
328 |
"""Test on an empty dir"""
|
329 |
self.failUnlessEqual(RunParts(self.rundir, reset_env=True), []) |
330 |
|
331 |
def testSkipWrongName(self): |
332 |
"""Test that wrong files are skipped"""
|
333 |
fname = os.path.join(self.rundir, "00test.dot") |
334 |
utils.WriteFile(fname, data="")
|
335 |
os.chmod(fname, stat.S_IREAD | stat.S_IEXEC) |
336 |
relname = os.path.basename(fname) |
337 |
self.failUnlessEqual(RunParts(self.rundir, reset_env=True), |
338 |
[(relname, constants.RUNPARTS_SKIP, None)])
|
339 |
|
340 |
def testSkipNonExec(self): |
341 |
"""Test that non executable files are skipped"""
|
342 |
fname = os.path.join(self.rundir, "00test") |
343 |
utils.WriteFile(fname, data="")
|
344 |
relname = os.path.basename(fname) |
345 |
self.failUnlessEqual(RunParts(self.rundir, reset_env=True), |
346 |
[(relname, constants.RUNPARTS_SKIP, None)])
|
347 |
|
348 |
def testError(self): |
349 |
"""Test error on a broken executable"""
|
350 |
fname = os.path.join(self.rundir, "00test") |
351 |
utils.WriteFile(fname, data="")
|
352 |
os.chmod(fname, stat.S_IREAD | stat.S_IEXEC) |
353 |
(relname, status, error) = RunParts(self.rundir, reset_env=True)[0] |
354 |
self.failUnlessEqual(relname, os.path.basename(fname))
|
355 |
self.failUnlessEqual(status, constants.RUNPARTS_ERR)
|
356 |
self.failUnless(error)
|
357 |
|
358 |
def testSorted(self): |
359 |
"""Test executions are sorted"""
|
360 |
files = [] |
361 |
files.append(os.path.join(self.rundir, "64test")) |
362 |
files.append(os.path.join(self.rundir, "00test")) |
363 |
files.append(os.path.join(self.rundir, "42test")) |
364 |
|
365 |
for fname in files: |
366 |
utils.WriteFile(fname, data="")
|
367 |
|
368 |
results = RunParts(self.rundir, reset_env=True) |
369 |
|
370 |
for fname in sorted(files): |
371 |
self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0]) |
372 |
|
373 |
def testOk(self): |
374 |
"""Test correct execution"""
|
375 |
fname = os.path.join(self.rundir, "00test") |
376 |
utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
|
377 |
os.chmod(fname, stat.S_IREAD | stat.S_IEXEC) |
378 |
(relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0] |
379 |
self.failUnlessEqual(relname, os.path.basename(fname))
|
380 |
self.failUnlessEqual(status, constants.RUNPARTS_RUN)
|
381 |
self.failUnlessEqual(runresult.stdout, "ciao") |
382 |
|
383 |
def testRunFail(self): |
384 |
"""Test correct execution, with run failure"""
|
385 |
fname = os.path.join(self.rundir, "00test") |
386 |
utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
|
387 |
os.chmod(fname, stat.S_IREAD | stat.S_IEXEC) |
388 |
(relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0] |
389 |
self.failUnlessEqual(relname, os.path.basename(fname))
|
390 |
self.failUnlessEqual(status, constants.RUNPARTS_RUN)
|
391 |
self.failUnlessEqual(runresult.exit_code, 1) |
392 |
self.failUnless(runresult.failed)
|
393 |
|
394 |
def testRunMix(self): |
395 |
files = [] |
396 |
files.append(os.path.join(self.rundir, "00test")) |
397 |
files.append(os.path.join(self.rundir, "42test")) |
398 |
files.append(os.path.join(self.rundir, "64test")) |
399 |
files.append(os.path.join(self.rundir, "99test")) |
400 |
|
401 |
files.sort() |
402 |
|
403 |
# 1st has errors in execution
|
404 |
utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1") |
405 |
os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
|
406 |
|
407 |
# 2nd is skipped
|
408 |
utils.WriteFile(files[1], data="") |
409 |
|
410 |
# 3rd cannot execute properly
|
411 |
utils.WriteFile(files[2], data="") |
412 |
os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
|
413 |
|
414 |
# 4th execs
|
415 |
utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao") |
416 |
os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
|
417 |
|
418 |
results = RunParts(self.rundir, reset_env=True) |
419 |
|
420 |
(relname, status, runresult) = results[0]
|
421 |
self.failUnlessEqual(relname, os.path.basename(files[0])) |
422 |
self.failUnlessEqual(status, constants.RUNPARTS_RUN)
|
423 |
self.failUnlessEqual(runresult.exit_code, 1) |
424 |
self.failUnless(runresult.failed)
|
425 |
|
426 |
(relname, status, runresult) = results[1]
|
427 |
self.failUnlessEqual(relname, os.path.basename(files[1])) |
428 |
self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
|
429 |
self.failUnlessEqual(runresult, None) |
430 |
|
431 |
(relname, status, runresult) = results[2]
|
432 |
self.failUnlessEqual(relname, os.path.basename(files[2])) |
433 |
self.failUnlessEqual(status, constants.RUNPARTS_ERR)
|
434 |
self.failUnless(runresult)
|
435 |
|
436 |
(relname, status, runresult) = results[3]
|
437 |
self.failUnlessEqual(relname, os.path.basename(files[3])) |
438 |
self.failUnlessEqual(status, constants.RUNPARTS_RUN)
|
439 |
self.failUnlessEqual(runresult.output, "ciao") |
440 |
self.failUnlessEqual(runresult.exit_code, 0) |
441 |
self.failUnless(not runresult.failed) |
442 |
|
443 |
def testMissingDirectory(self): |
444 |
nosuchdir = utils.PathJoin(self.rundir, "no/such/directory") |
445 |
self.assertEqual(RunParts(nosuchdir), [])
|
446 |
|
447 |
|
448 |
class TestStartDaemon(testutils.GanetiTestCase): |
449 |
def setUp(self): |
450 |
self.tmpdir = tempfile.mkdtemp(prefix="ganeti-test") |
451 |
self.tmpfile = os.path.join(self.tmpdir, "test") |
452 |
|
453 |
def tearDown(self): |
454 |
shutil.rmtree(self.tmpdir)
|
455 |
|
456 |
def testShell(self): |
457 |
utils.StartDaemon("echo Hello World > %s" % self.tmpfile) |
458 |
self._wait(self.tmpfile, 60.0, "Hello World") |
459 |
|
460 |
def testShellOutput(self): |
461 |
utils.StartDaemon("echo Hello World", output=self.tmpfile) |
462 |
self._wait(self.tmpfile, 60.0, "Hello World") |
463 |
|
464 |
def testNoShellNoOutput(self): |
465 |
utils.StartDaemon(["pwd"])
|
466 |
|
467 |
def testNoShellNoOutputTouch(self): |
468 |
testfile = os.path.join(self.tmpdir, "check") |
469 |
self.failIf(os.path.exists(testfile))
|
470 |
utils.StartDaemon(["touch", testfile])
|
471 |
self._wait(testfile, 60.0, "") |
472 |
|
473 |
def testNoShellOutput(self): |
474 |
utils.StartDaemon(["pwd"], output=self.tmpfile) |
475 |
self._wait(self.tmpfile, 60.0, "/") |
476 |
|
477 |
def testNoShellOutputCwd(self): |
478 |
utils.StartDaemon(["pwd"], output=self.tmpfile, cwd=os.getcwd()) |
479 |
self._wait(self.tmpfile, 60.0, os.getcwd()) |
480 |
|
481 |
def testShellEnv(self): |
482 |
utils.StartDaemon("echo \"$GNT_TEST_VAR\"", output=self.tmpfile, |
483 |
env={ "GNT_TEST_VAR": "Hello World", }) |
484 |
self._wait(self.tmpfile, 60.0, "Hello World") |
485 |
|
486 |
def testNoShellEnv(self): |
487 |
utils.StartDaemon(["printenv", "GNT_TEST_VAR"], output=self.tmpfile, |
488 |
env={ "GNT_TEST_VAR": "Hello World", }) |
489 |
self._wait(self.tmpfile, 60.0, "Hello World") |
490 |
|
491 |
def testOutputFd(self): |
492 |
fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
|
493 |
try:
|
494 |
utils.StartDaemon(["pwd"], output_fd=fd, cwd=os.getcwd())
|
495 |
finally:
|
496 |
os.close(fd) |
497 |
self._wait(self.tmpfile, 60.0, os.getcwd()) |
498 |
|
499 |
def testPid(self): |
500 |
pid = utils.StartDaemon("echo $$ > %s" % self.tmpfile) |
501 |
self._wait(self.tmpfile, 60.0, str(pid)) |
502 |
|
503 |
def testPidFile(self): |
504 |
pidfile = os.path.join(self.tmpdir, "pid") |
505 |
checkfile = os.path.join(self.tmpdir, "abort") |
506 |
|
507 |
pid = utils.StartDaemon("while sleep 5; do :; done", pidfile=pidfile,
|
508 |
output=self.tmpfile)
|
509 |
try:
|
510 |
fd = os.open(pidfile, os.O_RDONLY) |
511 |
try:
|
512 |
# Check file is locked
|
513 |
self.assertRaises(errors.LockError, utils.LockFile, fd)
|
514 |
|
515 |
pidtext = os.read(fd, 100)
|
516 |
finally:
|
517 |
os.close(fd) |
518 |
|
519 |
self.assertEqual(int(pidtext.strip()), pid) |
520 |
|
521 |
self.assert_(utils.IsProcessAlive(pid))
|
522 |
finally:
|
523 |
# No matter what happens, kill daemon
|
524 |
utils.KillProcess(pid, timeout=5.0, waitpid=False) |
525 |
self.failIf(utils.IsProcessAlive(pid))
|
526 |
|
527 |
self.assertEqual(utils.ReadFile(self.tmpfile), "") |
528 |
|
529 |
def _wait(self, path, timeout, expected): |
530 |
# Due to the asynchronous nature of daemon processes, polling is necessary.
|
531 |
# A timeout makes sure the test doesn't hang forever.
|
532 |
def _CheckFile(): |
533 |
if not (os.path.isfile(path) and |
534 |
utils.ReadFile(path).strip() == expected): |
535 |
raise utils.RetryAgain()
|
536 |
|
537 |
try:
|
538 |
utils.Retry(_CheckFile, (0.01, 1.5, 1.0), timeout) |
539 |
except utils.RetryTimeout:
|
540 |
self.fail("Apparently the daemon didn't run in %s seconds and/or" |
541 |
" didn't write the correct output" % timeout)
|
542 |
|
543 |
def testError(self): |
544 |
self.assertRaises(errors.OpExecError, utils.StartDaemon,
|
545 |
["./does-NOT-EXIST/here/0123456789"])
|
546 |
self.assertRaises(errors.OpExecError, utils.StartDaemon,
|
547 |
["./does-NOT-EXIST/here/0123456789"],
|
548 |
output=os.path.join(self.tmpdir, "DIR/NOT/EXIST")) |
549 |
self.assertRaises(errors.OpExecError, utils.StartDaemon,
|
550 |
["./does-NOT-EXIST/here/0123456789"],
|
551 |
cwd=os.path.join(self.tmpdir, "DIR/NOT/EXIST")) |
552 |
self.assertRaises(errors.OpExecError, utils.StartDaemon,
|
553 |
["./does-NOT-EXIST/here/0123456789"],
|
554 |
output=os.path.join(self.tmpdir, "DIR/NOT/EXIST")) |
555 |
|
556 |
fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
|
557 |
try:
|
558 |
self.assertRaises(errors.ProgrammerError, utils.StartDaemon,
|
559 |
["./does-NOT-EXIST/here/0123456789"],
|
560 |
output=self.tmpfile, output_fd=fd)
|
561 |
finally:
|
562 |
os.close(fd) |
563 |
|
564 |
|
565 |
class TestParseCpuMask(unittest.TestCase): |
566 |
"""Test case for the ParseCpuMask function."""
|
567 |
|
568 |
def testWellFormed(self): |
569 |
self.assertEqual(utils.ParseCpuMask(""), []) |
570 |
self.assertEqual(utils.ParseCpuMask("1"), [1]) |
571 |
self.assertEqual(utils.ParseCpuMask("0-2,4,5-5"), [0,1,2,4,5]) |
572 |
|
573 |
def testInvalidInput(self): |
574 |
for data in ["garbage", "0,", "0-1-2", "2-1", "1-a"]: |
575 |
self.assertRaises(errors.ParseError, utils.ParseCpuMask, data)
|
576 |
|
577 |
|
578 |
class TestEtcHosts(testutils.GanetiTestCase): |
579 |
"""Test functions modifying /etc/hosts"""
|
580 |
|
581 |
def setUp(self): |
582 |
testutils.GanetiTestCase.setUp(self)
|
583 |
self.tmpname = self._CreateTempFile() |
584 |
handle = open(self.tmpname, 'w') |
585 |
try:
|
586 |
handle.write('# This is a test file for /etc/hosts\n')
|
587 |
handle.write('127.0.0.1\tlocalhost\n')
|
588 |
handle.write('192.0.2.1 router gw\n')
|
589 |
finally:
|
590 |
handle.close() |
591 |
|
592 |
def testSettingNewIp(self): |
593 |
SetEtcHostsEntry(self.tmpname, '198.51.100.4', 'myhost.example.com', |
594 |
['myhost'])
|
595 |
|
596 |
self.assertFileContent(self.tmpname, |
597 |
"# This is a test file for /etc/hosts\n"
|
598 |
"127.0.0.1\tlocalhost\n"
|
599 |
"192.0.2.1 router gw\n"
|
600 |
"198.51.100.4\tmyhost.example.com myhost\n")
|
601 |
self.assertFileMode(self.tmpname, 0644) |
602 |
|
603 |
def testSettingExistingIp(self): |
604 |
SetEtcHostsEntry(self.tmpname, '192.0.2.1', 'myhost.example.com', |
605 |
['myhost'])
|
606 |
|
607 |
self.assertFileContent(self.tmpname, |
608 |
"# This is a test file for /etc/hosts\n"
|
609 |
"127.0.0.1\tlocalhost\n"
|
610 |
"192.0.2.1\tmyhost.example.com myhost\n")
|
611 |
self.assertFileMode(self.tmpname, 0644) |
612 |
|
613 |
def testSettingDuplicateName(self): |
614 |
SetEtcHostsEntry(self.tmpname, '198.51.100.4', 'myhost', ['myhost']) |
615 |
|
616 |
self.assertFileContent(self.tmpname, |
617 |
"# This is a test file for /etc/hosts\n"
|
618 |
"127.0.0.1\tlocalhost\n"
|
619 |
"192.0.2.1 router gw\n"
|
620 |
"198.51.100.4\tmyhost\n")
|
621 |
self.assertFileMode(self.tmpname, 0644) |
622 |
|
623 |
def testRemovingExistingHost(self): |
624 |
RemoveEtcHostsEntry(self.tmpname, 'router') |
625 |
|
626 |
self.assertFileContent(self.tmpname, |
627 |
"# This is a test file for /etc/hosts\n"
|
628 |
"127.0.0.1\tlocalhost\n"
|
629 |
"192.0.2.1 gw\n")
|
630 |
self.assertFileMode(self.tmpname, 0644) |
631 |
|
632 |
def testRemovingSingleExistingHost(self): |
633 |
RemoveEtcHostsEntry(self.tmpname, 'localhost') |
634 |
|
635 |
self.assertFileContent(self.tmpname, |
636 |
"# This is a test file for /etc/hosts\n"
|
637 |
"192.0.2.1 router gw\n")
|
638 |
self.assertFileMode(self.tmpname, 0644) |
639 |
|
640 |
def testRemovingNonExistingHost(self): |
641 |
RemoveEtcHostsEntry(self.tmpname, 'myhost') |
642 |
|
643 |
self.assertFileContent(self.tmpname, |
644 |
"# This is a test file for /etc/hosts\n"
|
645 |
"127.0.0.1\tlocalhost\n"
|
646 |
"192.0.2.1 router gw\n")
|
647 |
self.assertFileMode(self.tmpname, 0644) |
648 |
|
649 |
def testRemovingAlias(self): |
650 |
RemoveEtcHostsEntry(self.tmpname, 'gw') |
651 |
|
652 |
self.assertFileContent(self.tmpname, |
653 |
"# This is a test file for /etc/hosts\n"
|
654 |
"127.0.0.1\tlocalhost\n"
|
655 |
"192.0.2.1 router\n")
|
656 |
self.assertFileMode(self.tmpname, 0644) |
657 |
|
658 |
|
659 |
class TestGetMounts(unittest.TestCase): |
660 |
"""Test case for GetMounts()."""
|
661 |
|
662 |
TESTDATA = ( |
663 |
"rootfs / rootfs rw 0 0\n"
|
664 |
"none /sys sysfs rw,nosuid,nodev,noexec,relatime 0 0\n"
|
665 |
"none /proc proc rw,nosuid,nodev,noexec,relatime 0 0\n")
|
666 |
|
667 |
def setUp(self): |
668 |
self.tmpfile = tempfile.NamedTemporaryFile()
|
669 |
utils.WriteFile(self.tmpfile.name, data=self.TESTDATA) |
670 |
|
671 |
def testGetMounts(self): |
672 |
self.assertEqual(utils.GetMounts(filename=self.tmpfile.name), |
673 |
[ |
674 |
("rootfs", "/", "rootfs", "rw"), |
675 |
("none", "/sys", "sysfs", "rw,nosuid,nodev,noexec,relatime"), |
676 |
("none", "/proc", "proc", "rw,nosuid,nodev,noexec,relatime"), |
677 |
]) |
678 |
|
679 |
class TestNewUUID(unittest.TestCase): |
680 |
"""Test case for NewUUID"""
|
681 |
|
682 |
def runTest(self): |
683 |
self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
|
684 |
|
685 |
|
686 |
class TestFirstFree(unittest.TestCase): |
687 |
"""Test case for the FirstFree function"""
|
688 |
|
689 |
def test(self): |
690 |
"""Test FirstFree"""
|
691 |
self.failUnlessEqual(FirstFree([0, 1, 3]), 2) |
692 |
self.failUnlessEqual(FirstFree([]), None) |
693 |
self.failUnlessEqual(FirstFree([3, 4, 6]), 0) |
694 |
self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5) |
695 |
self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3) |
696 |
|
697 |
|
698 |
class TestTimeFunctions(unittest.TestCase): |
699 |
"""Test case for time functions"""
|
700 |
|
701 |
def runTest(self): |
702 |
self.assertEqual(utils.SplitTime(1), (1, 0)) |
703 |
self.assertEqual(utils.SplitTime(1.5), (1, 500000)) |
704 |
self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915)) |
705 |
self.assertEqual(utils.SplitTime(123.48012), (123, 480120)) |
706 |
self.assertEqual(utils.SplitTime(123.9996), (123, 999600)) |
707 |
self.assertEqual(utils.SplitTime(123.9995), (123, 999500)) |
708 |
self.assertEqual(utils.SplitTime(123.9994), (123, 999400)) |
709 |
self.assertEqual(utils.SplitTime(123.999999999), (123, 999999)) |
710 |
|
711 |
self.assertRaises(AssertionError, utils.SplitTime, -1) |
712 |
|
713 |
self.assertEqual(utils.MergeTime((1, 0)), 1.0) |
714 |
self.assertEqual(utils.MergeTime((1, 500000)), 1.5) |
715 |
self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5) |
716 |
|
717 |
self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3), |
718 |
1218448917.481)
|
719 |
self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801) |
720 |
|
721 |
self.assertRaises(AssertionError, utils.MergeTime, (0, -1)) |
722 |
self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000)) |
723 |
self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999)) |
724 |
self.assertRaises(AssertionError, utils.MergeTime, (-1, 0)) |
725 |
self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0)) |
726 |
|
727 |
|
728 |
class FieldSetTestCase(unittest.TestCase): |
729 |
"""Test case for FieldSets"""
|
730 |
|
731 |
def testSimpleMatch(self): |
732 |
f = utils.FieldSet("a", "b", "c", "def") |
733 |
self.failUnless(f.Matches("a")) |
734 |
self.failIf(f.Matches("d"), "Substring matched") |
735 |
self.failIf(f.Matches("defghi"), "Prefix string matched") |
736 |
self.failIf(f.NonMatching(["b", "c"])) |
737 |
self.failIf(f.NonMatching(["a", "b", "c", "def"])) |
738 |
self.failUnless(f.NonMatching(["a", "d"])) |
739 |
|
740 |
def testRegexMatch(self): |
741 |
f = utils.FieldSet("a", "b([0-9]+)", "c") |
742 |
self.failUnless(f.Matches("b1")) |
743 |
self.failUnless(f.Matches("b99")) |
744 |
self.failIf(f.Matches("b/1")) |
745 |
self.failIf(f.NonMatching(["b12", "c"])) |
746 |
self.failUnless(f.NonMatching(["a", "1"])) |
747 |
|
748 |
class TestForceDictType(unittest.TestCase): |
749 |
"""Test case for ForceDictType"""
|
750 |
KEY_TYPES = { |
751 |
"a": constants.VTYPE_INT,
|
752 |
"b": constants.VTYPE_BOOL,
|
753 |
"c": constants.VTYPE_STRING,
|
754 |
"d": constants.VTYPE_SIZE,
|
755 |
"e": constants.VTYPE_MAYBE_STRING,
|
756 |
} |
757 |
|
758 |
def _fdt(self, dict, allowed_values=None): |
759 |
if allowed_values is None: |
760 |
utils.ForceDictType(dict, self.KEY_TYPES) |
761 |
else:
|
762 |
utils.ForceDictType(dict, self.KEY_TYPES, allowed_values=allowed_values) |
763 |
|
764 |
return dict |
765 |
|
766 |
def testSimpleDict(self): |
767 |
self.assertEqual(self._fdt({}), {}) |
768 |
self.assertEqual(self._fdt({'a': 1}), {'a': 1}) |
769 |
self.assertEqual(self._fdt({'a': '1'}), {'a': 1}) |
770 |
self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True}) |
771 |
self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'}) |
772 |
self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''}) |
773 |
self.assertEqual(self._fdt({'b': 'false'}), {'b': False}) |
774 |
self.assertEqual(self._fdt({'b': 'False'}), {'b': False}) |
775 |
self.assertEqual(self._fdt({'b': False}), {'b': False}) |
776 |
self.assertEqual(self._fdt({'b': 'true'}), {'b': True}) |
777 |
self.assertEqual(self._fdt({'b': 'True'}), {'b': True}) |
778 |
self.assertEqual(self._fdt({'d': '4'}), {'d': 4}) |
779 |
self.assertEqual(self._fdt({'d': '4M'}), {'d': 4}) |
780 |
self.assertEqual(self._fdt({"e": None, }), {"e": None, }) |
781 |
self.assertEqual(self._fdt({"e": "Hello World", }), {"e": "Hello World", }) |
782 |
self.assertEqual(self._fdt({"e": False, }), {"e": '', }) |
783 |
self.assertEqual(self._fdt({"b": "hello", }, ["hello"]), {"b": "hello"}) |
784 |
|
785 |
def testErrors(self): |
786 |
self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'}) |
787 |
self.assertRaises(errors.TypeEnforcementError, self._fdt, {"b": "hello"}) |
788 |
self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True}) |
789 |
self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'}) |
790 |
self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'}) |
791 |
self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": object(), }) |
792 |
self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": [], }) |
793 |
self.assertRaises(errors.TypeEnforcementError, self._fdt, {"x": None, }) |
794 |
self.assertRaises(errors.TypeEnforcementError, self._fdt, []) |
795 |
self.assertRaises(errors.ProgrammerError, utils.ForceDictType,
|
796 |
{"b": "hello"}, {"b": "no-such-type"}) |
797 |
|
798 |
|
799 |
class RunInSeparateProcess(unittest.TestCase): |
800 |
def test(self): |
801 |
for exp in [True, False]: |
802 |
def _child(): |
803 |
return exp
|
804 |
|
805 |
self.assertEqual(exp, utils.RunInSeparateProcess(_child))
|
806 |
|
807 |
def testArgs(self): |
808 |
for arg in [0, 1, 999, "Hello World", (1, 2, 3)]: |
809 |
def _child(carg1, carg2): |
810 |
return carg1 == "Foo" and carg2 == arg |
811 |
|
812 |
self.assert_(utils.RunInSeparateProcess(_child, "Foo", arg)) |
813 |
|
814 |
def testPid(self): |
815 |
parent_pid = os.getpid() |
816 |
|
817 |
def _check(): |
818 |
return os.getpid() == parent_pid
|
819 |
|
820 |
self.failIf(utils.RunInSeparateProcess(_check))
|
821 |
|
822 |
def testSignal(self): |
823 |
def _kill(): |
824 |
os.kill(os.getpid(), signal.SIGTERM) |
825 |
|
826 |
self.assertRaises(errors.GenericError,
|
827 |
utils.RunInSeparateProcess, _kill) |
828 |
|
829 |
def testException(self): |
830 |
def _exc(): |
831 |
raise errors.GenericError("This is a test") |
832 |
|
833 |
self.assertRaises(errors.GenericError,
|
834 |
utils.RunInSeparateProcess, _exc) |
835 |
|
836 |
|
837 |
class TestValidateServiceName(unittest.TestCase): |
838 |
def testValid(self): |
839 |
testnames = [ |
840 |
0, 1, 2, 3, 1024, 65000, 65534, 65535, |
841 |
"ganeti",
|
842 |
"gnt-masterd",
|
843 |
"HELLO_WORLD_SVC",
|
844 |
"hello.world.1",
|
845 |
"0", "80", "1111", "65535", |
846 |
] |
847 |
|
848 |
for name in testnames: |
849 |
self.assertEqual(utils.ValidateServiceName(name), name)
|
850 |
|
851 |
def testInvalid(self): |
852 |
testnames = [ |
853 |
-15756, -1, 65536, 133428083, |
854 |
"", "Hello World!", "!", "'", "\"", "\t", "\n", "`", |
855 |
"-8546", "-1", "65536", |
856 |
(129 * "A"), |
857 |
] |
858 |
|
859 |
for name in testnames: |
860 |
self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name)
|
861 |
|
862 |
|
863 |
class TestReadLockedPidFile(unittest.TestCase): |
864 |
def setUp(self): |
865 |
self.tmpdir = tempfile.mkdtemp()
|
866 |
|
867 |
def tearDown(self): |
868 |
shutil.rmtree(self.tmpdir)
|
869 |
|
870 |
def testNonExistent(self): |
871 |
path = utils.PathJoin(self.tmpdir, "nonexist") |
872 |
self.assert_(utils.ReadLockedPidFile(path) is None) |
873 |
|
874 |
def testUnlocked(self): |
875 |
path = utils.PathJoin(self.tmpdir, "pid") |
876 |
utils.WriteFile(path, data="123")
|
877 |
self.assert_(utils.ReadLockedPidFile(path) is None) |
878 |
|
879 |
def testLocked(self): |
880 |
path = utils.PathJoin(self.tmpdir, "pid") |
881 |
utils.WriteFile(path, data="123")
|
882 |
|
883 |
fl = utils.FileLock.Open(path) |
884 |
try:
|
885 |
fl.Exclusive(blocking=True)
|
886 |
|
887 |
self.assertEqual(utils.ReadLockedPidFile(path), 123) |
888 |
finally:
|
889 |
fl.Close() |
890 |
|
891 |
self.assert_(utils.ReadLockedPidFile(path) is None) |
892 |
|
893 |
def testError(self): |
894 |
path = utils.PathJoin(self.tmpdir, "foobar", "pid") |
895 |
utils.WriteFile(utils.PathJoin(self.tmpdir, "foobar"), data="") |
896 |
# open(2) should return ENOTDIR
|
897 |
self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path) |
898 |
|
899 |
|
900 |
class TestFindMatch(unittest.TestCase): |
901 |
def test(self): |
902 |
data = { |
903 |
"aaaa": "Four A", |
904 |
"bb": {"Two B": True}, |
905 |
re.compile(r"^x(foo|bar|bazX)([0-9]+)$"): (1, 2, 3), |
906 |
} |
907 |
|
908 |
self.assertEqual(utils.FindMatch(data, "aaaa"), ("Four A", [])) |
909 |
self.assertEqual(utils.FindMatch(data, "bb"), ({"Two B": True}, [])) |
910 |
|
911 |
for i in ["foo", "bar", "bazX"]: |
912 |
for j in range(1, 100, 7): |
913 |
self.assertEqual(utils.FindMatch(data, "x%s%s" % (i, j)), |
914 |
((1, 2, 3), [i, str(j)])) |
915 |
|
916 |
def testNoMatch(self): |
917 |
self.assert_(utils.FindMatch({}, "") is None) |
918 |
self.assert_(utils.FindMatch({}, "foo") is None) |
919 |
self.assert_(utils.FindMatch({}, 1234) is None) |
920 |
|
921 |
data = { |
922 |
"X": "Hello World", |
923 |
re.compile("^(something)$"): "Hello World", |
924 |
} |
925 |
|
926 |
self.assert_(utils.FindMatch(data, "") is None) |
927 |
self.assert_(utils.FindMatch(data, "Hello World") is None) |
928 |
|
929 |
|
930 |
class TimeMock: |
931 |
def __init__(self, values): |
932 |
self.values = values
|
933 |
|
934 |
def __call__(self): |
935 |
return self.values.pop(0) |
936 |
|
937 |
|
938 |
class TestRunningTimeout(unittest.TestCase): |
939 |
def setUp(self): |
940 |
self.time_fn = TimeMock([0.0, 0.3, 4.6, 6.5]) |
941 |
|
942 |
def testRemainingFloat(self): |
943 |
timeout = utils.RunningTimeout(5.0, True, _time_fn=self.time_fn) |
944 |
self.assertAlmostEqual(timeout.Remaining(), 4.7) |
945 |
self.assertAlmostEqual(timeout.Remaining(), 0.4) |
946 |
self.assertAlmostEqual(timeout.Remaining(), -1.5) |
947 |
|
948 |
def testRemaining(self): |
949 |
self.time_fn = TimeMock([0, 2, 4, 5, 6]) |
950 |
timeout = utils.RunningTimeout(5, True, _time_fn=self.time_fn) |
951 |
self.assertEqual(timeout.Remaining(), 3) |
952 |
self.assertEqual(timeout.Remaining(), 1) |
953 |
self.assertEqual(timeout.Remaining(), 0) |
954 |
self.assertEqual(timeout.Remaining(), -1) |
955 |
|
956 |
def testRemainingNonNegative(self): |
957 |
timeout = utils.RunningTimeout(5.0, False, _time_fn=self.time_fn) |
958 |
self.assertAlmostEqual(timeout.Remaining(), 4.7) |
959 |
self.assertAlmostEqual(timeout.Remaining(), 0.4) |
960 |
self.assertEqual(timeout.Remaining(), 0.0) |
961 |
|
962 |
def testNegativeTimeout(self): |
963 |
self.assertRaises(ValueError, utils.RunningTimeout, -1.0, True) |
964 |
|
965 |
|
966 |
class TestTryConvert(unittest.TestCase): |
967 |
def test(self): |
968 |
for src, fn, result in [ |
969 |
("1", int, 1), |
970 |
("a", int, "a"), |
971 |
("", bool, False), |
972 |
("a", bool, True), |
973 |
]: |
974 |
self.assertEqual(utils.TryConvert(fn, src), result)
|
975 |
|
976 |
|
977 |
class TestIsValidShellParam(unittest.TestCase): |
978 |
def test(self): |
979 |
for val, result in [ |
980 |
("abc", True), |
981 |
("ab;cd", False), |
982 |
]: |
983 |
self.assertEqual(utils.IsValidShellParam(val), result)
|
984 |
|
985 |
|
986 |
class TestBuildShellCmd(unittest.TestCase): |
987 |
def test(self): |
988 |
self.assertRaises(errors.ProgrammerError, utils.BuildShellCmd,
|
989 |
"ls %s", "ab;cd") |
990 |
self.assertEqual(utils.BuildShellCmd("ls %s", "ab"), "ls ab") |
991 |
|
992 |
|
993 |
if __name__ == '__main__': |
994 |
testutils.GanetiTestProgram() |