Add RPC call for storage operations
[ganeti-local] / test / ganeti.utils_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the utils module"""
23
24 import unittest
25 import os
26 import time
27 import tempfile
28 import os.path
29 import os
30 import md5
31 import signal
32 import socket
33 import shutil
34 import re
35 import select
36 import string
37
38 import ganeti
39 import testutils
40 from ganeti import constants
41 from ganeti import utils
42 from ganeti import errors
43 from ganeti.utils import IsProcessAlive, RunCmd, \
44      RemoveFile, MatchNameComponent, FormatUnit, \
45      ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
46      ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
47      SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
48      TailFile, ForceDictType, SafeEncode, IsNormAbsPath
49
50 from ganeti.errors import LockError, UnitParseError, GenericError, \
51      ProgrammerError
52
53
54 class TestIsProcessAlive(unittest.TestCase):
55   """Testing case for IsProcessAlive"""
56
57   def testExists(self):
58     mypid = os.getpid()
59     self.assert_(IsProcessAlive(mypid),
60                  "can't find myself running")
61
62   def testNotExisting(self):
63     pid_non_existing = os.fork()
64     if pid_non_existing == 0:
65       os._exit(0)
66     elif pid_non_existing < 0:
67       raise SystemError("can't fork")
68     os.waitpid(pid_non_existing, 0)
69     self.assert_(not IsProcessAlive(pid_non_existing),
70                  "nonexisting process detected")
71
72
73 class TestPidFileFunctions(unittest.TestCase):
74   """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
75
76   def setUp(self):
77     self.dir = tempfile.mkdtemp()
78     self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
79     utils.DaemonPidFileName = self.f_dpn
80
81   def testPidFileFunctions(self):
82     pid_file = self.f_dpn('test')
83     utils.WritePidFile('test')
84     self.failUnless(os.path.exists(pid_file),
85                     "PID file should have been created")
86     read_pid = utils.ReadPidFile(pid_file)
87     self.failUnlessEqual(read_pid, os.getpid())
88     self.failUnless(utils.IsProcessAlive(read_pid))
89     self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
90     utils.RemovePidFile('test')
91     self.failIf(os.path.exists(pid_file),
92                 "PID file should not exist anymore")
93     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
94                          "ReadPidFile should return 0 for missing pid file")
95     fh = open(pid_file, "w")
96     fh.write("blah\n")
97     fh.close()
98     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
99                          "ReadPidFile should return 0 for invalid pid file")
100     utils.RemovePidFile('test')
101     self.failIf(os.path.exists(pid_file),
102                 "PID file should not exist anymore")
103
104   def testKill(self):
105     pid_file = self.f_dpn('child')
106     r_fd, w_fd = os.pipe()
107     new_pid = os.fork()
108     if new_pid == 0: #child
109       utils.WritePidFile('child')
110       os.write(w_fd, 'a')
111       signal.pause()
112       os._exit(0)
113       return
114     # else we are in the parent
115     # wait until the child has written the pid file
116     os.read(r_fd, 1)
117     read_pid = utils.ReadPidFile(pid_file)
118     self.failUnlessEqual(read_pid, new_pid)
119     self.failUnless(utils.IsProcessAlive(new_pid))
120     utils.KillProcess(new_pid, waitpid=True)
121     self.failIf(utils.IsProcessAlive(new_pid))
122     utils.RemovePidFile('child')
123     self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
124
125   def tearDown(self):
126     for name in os.listdir(self.dir):
127       os.unlink(os.path.join(self.dir, name))
128     os.rmdir(self.dir)
129
130
131 class TestRunCmd(testutils.GanetiTestCase):
132   """Testing case for the RunCmd function"""
133
134   def setUp(self):
135     testutils.GanetiTestCase.setUp(self)
136     self.magic = time.ctime() + " ganeti test"
137     self.fname = self._CreateTempFile()
138
139   def testOk(self):
140     """Test successful exit code"""
141     result = RunCmd("/bin/sh -c 'exit 0'")
142     self.assertEqual(result.exit_code, 0)
143     self.assertEqual(result.output, "")
144
145   def testFail(self):
146     """Test fail exit code"""
147     result = RunCmd("/bin/sh -c 'exit 1'")
148     self.assertEqual(result.exit_code, 1)
149     self.assertEqual(result.output, "")
150
151   def testStdout(self):
152     """Test standard output"""
153     cmd = 'echo -n "%s"' % self.magic
154     result = RunCmd("/bin/sh -c '%s'" % cmd)
155     self.assertEqual(result.stdout, self.magic)
156     result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
157     self.assertEqual(result.output, "")
158     self.assertFileContent(self.fname, self.magic)
159
160   def testStderr(self):
161     """Test standard error"""
162     cmd = 'echo -n "%s"' % self.magic
163     result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
164     self.assertEqual(result.stderr, self.magic)
165     result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
166     self.assertEqual(result.output, "")
167     self.assertFileContent(self.fname, self.magic)
168
169   def testCombined(self):
170     """Test combined output"""
171     cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
172     expected = "A" + self.magic + "B" + self.magic
173     result = RunCmd("/bin/sh -c '%s'" % cmd)
174     self.assertEqual(result.output, expected)
175     result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
176     self.assertEqual(result.output, "")
177     self.assertFileContent(self.fname, expected)
178
179   def testSignal(self):
180     """Test signal"""
181     result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
182     self.assertEqual(result.signal, 15)
183     self.assertEqual(result.output, "")
184
185   def testListRun(self):
186     """Test list runs"""
187     result = RunCmd(["true"])
188     self.assertEqual(result.signal, None)
189     self.assertEqual(result.exit_code, 0)
190     result = RunCmd(["/bin/sh", "-c", "exit 1"])
191     self.assertEqual(result.signal, None)
192     self.assertEqual(result.exit_code, 1)
193     result = RunCmd(["echo", "-n", self.magic])
194     self.assertEqual(result.signal, None)
195     self.assertEqual(result.exit_code, 0)
196     self.assertEqual(result.stdout, self.magic)
197
198   def testFileEmptyOutput(self):
199     """Test file output"""
200     result = RunCmd(["true"], output=self.fname)
201     self.assertEqual(result.signal, None)
202     self.assertEqual(result.exit_code, 0)
203     self.assertFileContent(self.fname, "")
204
205   def testLang(self):
206     """Test locale environment"""
207     old_env = os.environ.copy()
208     try:
209       os.environ["LANG"] = "en_US.UTF-8"
210       os.environ["LC_ALL"] = "en_US.UTF-8"
211       result = RunCmd(["locale"])
212       for line in result.output.splitlines():
213         key, value = line.split("=", 1)
214         # Ignore these variables, they're overridden by LC_ALL
215         if key == "LANG" or key == "LANGUAGE":
216           continue
217         self.failIf(value and value != "C" and value != '"C"',
218             "Variable %s is set to the invalid value '%s'" % (key, value))
219     finally:
220       os.environ = old_env
221
222   def testDefaultCwd(self):
223     """Test default working directory"""
224     self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
225
226   def testCwd(self):
227     """Test default working directory"""
228     self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
229     self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
230     cwd = os.getcwd()
231     self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
232
233
234 class TestRemoveFile(unittest.TestCase):
235   """Test case for the RemoveFile function"""
236
237   def setUp(self):
238     """Create a temp dir and file for each case"""
239     self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
240     fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
241     os.close(fd)
242
243   def tearDown(self):
244     if os.path.exists(self.tmpfile):
245       os.unlink(self.tmpfile)
246     os.rmdir(self.tmpdir)
247
248
249   def testIgnoreDirs(self):
250     """Test that RemoveFile() ignores directories"""
251     self.assertEqual(None, RemoveFile(self.tmpdir))
252
253
254   def testIgnoreNotExisting(self):
255     """Test that RemoveFile() ignores non-existing files"""
256     RemoveFile(self.tmpfile)
257     RemoveFile(self.tmpfile)
258
259
260   def testRemoveFile(self):
261     """Test that RemoveFile does remove a file"""
262     RemoveFile(self.tmpfile)
263     if os.path.exists(self.tmpfile):
264       self.fail("File '%s' not removed" % self.tmpfile)
265
266
267   def testRemoveSymlink(self):
268     """Test that RemoveFile does remove symlinks"""
269     symlink = self.tmpdir + "/symlink"
270     os.symlink("no-such-file", symlink)
271     RemoveFile(symlink)
272     if os.path.exists(symlink):
273       self.fail("File '%s' not removed" % symlink)
274     os.symlink(self.tmpfile, symlink)
275     RemoveFile(symlink)
276     if os.path.exists(symlink):
277       self.fail("File '%s' not removed" % symlink)
278
279
280 class TestRename(unittest.TestCase):
281   """Test case for RenameFile"""
282
283   def setUp(self):
284     """Create a temporary directory"""
285     self.tmpdir = tempfile.mkdtemp()
286     self.tmpfile = os.path.join(self.tmpdir, "test1")
287
288     # Touch the file
289     open(self.tmpfile, "w").close()
290
291   def tearDown(self):
292     """Remove temporary directory"""
293     shutil.rmtree(self.tmpdir)
294
295   def testSimpleRename1(self):
296     """Simple rename 1"""
297     utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
298
299   def testSimpleRename2(self):
300     """Simple rename 2"""
301     utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
302                      mkdir=True)
303
304   def testRenameMkdir(self):
305     """Rename with mkdir"""
306     utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
307                      mkdir=True)
308
309
310 class TestMatchNameComponent(unittest.TestCase):
311   """Test case for the MatchNameComponent function"""
312
313   def testEmptyList(self):
314     """Test that there is no match against an empty list"""
315
316     self.failUnlessEqual(MatchNameComponent("", []), None)
317     self.failUnlessEqual(MatchNameComponent("test", []), None)
318
319   def testSingleMatch(self):
320     """Test that a single match is performed correctly"""
321     mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
322     for key in "test2", "test2.example", "test2.example.com":
323       self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
324
325   def testMultipleMatches(self):
326     """Test that a multiple match is returned as None"""
327     mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
328     for key in "test1", "test1.example":
329       self.failUnlessEqual(MatchNameComponent(key, mlist), None)
330
331
332 class TestFormatUnit(unittest.TestCase):
333   """Test case for the FormatUnit function"""
334
335   def testMiB(self):
336     self.assertEqual(FormatUnit(1, 'h'), '1M')
337     self.assertEqual(FormatUnit(100, 'h'), '100M')
338     self.assertEqual(FormatUnit(1023, 'h'), '1023M')
339
340     self.assertEqual(FormatUnit(1, 'm'), '1')
341     self.assertEqual(FormatUnit(100, 'm'), '100')
342     self.assertEqual(FormatUnit(1023, 'm'), '1023')
343
344     self.assertEqual(FormatUnit(1024, 'm'), '1024')
345     self.assertEqual(FormatUnit(1536, 'm'), '1536')
346     self.assertEqual(FormatUnit(17133, 'm'), '17133')
347     self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
348
349   def testGiB(self):
350     self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
351     self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
352     self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
353     self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
354
355     self.assertEqual(FormatUnit(1024, 'g'), '1.0')
356     self.assertEqual(FormatUnit(1536, 'g'), '1.5')
357     self.assertEqual(FormatUnit(17133, 'g'), '16.7')
358     self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
359
360     self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
361     self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
362     self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
363
364   def testTiB(self):
365     self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
366     self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
367     self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
368
369     self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
370     self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
371     self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
372
373 class TestParseUnit(unittest.TestCase):
374   """Test case for the ParseUnit function"""
375
376   SCALES = (('', 1),
377             ('M', 1), ('G', 1024), ('T', 1024 * 1024),
378             ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
379             ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
380
381   def testRounding(self):
382     self.assertEqual(ParseUnit('0'), 0)
383     self.assertEqual(ParseUnit('1'), 4)
384     self.assertEqual(ParseUnit('2'), 4)
385     self.assertEqual(ParseUnit('3'), 4)
386
387     self.assertEqual(ParseUnit('124'), 124)
388     self.assertEqual(ParseUnit('125'), 128)
389     self.assertEqual(ParseUnit('126'), 128)
390     self.assertEqual(ParseUnit('127'), 128)
391     self.assertEqual(ParseUnit('128'), 128)
392     self.assertEqual(ParseUnit('129'), 132)
393     self.assertEqual(ParseUnit('130'), 132)
394
395   def testFloating(self):
396     self.assertEqual(ParseUnit('0'), 0)
397     self.assertEqual(ParseUnit('0.5'), 4)
398     self.assertEqual(ParseUnit('1.75'), 4)
399     self.assertEqual(ParseUnit('1.99'), 4)
400     self.assertEqual(ParseUnit('2.00'), 4)
401     self.assertEqual(ParseUnit('2.01'), 4)
402     self.assertEqual(ParseUnit('3.99'), 4)
403     self.assertEqual(ParseUnit('4.00'), 4)
404     self.assertEqual(ParseUnit('4.01'), 8)
405     self.assertEqual(ParseUnit('1.5G'), 1536)
406     self.assertEqual(ParseUnit('1.8G'), 1844)
407     self.assertEqual(ParseUnit('8.28T'), 8682212)
408
409   def testSuffixes(self):
410     for sep in ('', ' ', '   ', "\t", "\t "):
411       for suffix, scale in TestParseUnit.SCALES:
412         for func in (lambda x: x, str.lower, str.upper):
413           self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
414                            1024 * scale)
415
416   def testInvalidInput(self):
417     for sep in ('-', '_', ',', 'a'):
418       for suffix, _ in TestParseUnit.SCALES:
419         self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
420
421     for suffix, _ in TestParseUnit.SCALES:
422       self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
423
424
425 class TestSshKeys(testutils.GanetiTestCase):
426   """Test case for the AddAuthorizedKey function"""
427
428   KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
429   KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
430            'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
431
432   def setUp(self):
433     testutils.GanetiTestCase.setUp(self)
434     self.tmpname = self._CreateTempFile()
435     handle = open(self.tmpname, 'w')
436     try:
437       handle.write("%s\n" % TestSshKeys.KEY_A)
438       handle.write("%s\n" % TestSshKeys.KEY_B)
439     finally:
440       handle.close()
441
442   def testAddingNewKey(self):
443     AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
444
445     self.assertFileContent(self.tmpname,
446       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
447       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
448       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
449       "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
450
451   def testAddingAlmostButNotCompletelyTheSameKey(self):
452     AddAuthorizedKey(self.tmpname,
453         'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
454
455     self.assertFileContent(self.tmpname,
456       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
457       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
458       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
459       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
460
461   def testAddingExistingKeyWithSomeMoreSpaces(self):
462     AddAuthorizedKey(self.tmpname,
463         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
464
465     self.assertFileContent(self.tmpname,
466       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
467       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
468       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
469
470   def testRemovingExistingKeyWithSomeMoreSpaces(self):
471     RemoveAuthorizedKey(self.tmpname,
472         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
473
474     self.assertFileContent(self.tmpname,
475       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
476       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
477
478   def testRemovingNonExistingKey(self):
479     RemoveAuthorizedKey(self.tmpname,
480         'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
481
482     self.assertFileContent(self.tmpname,
483       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
484       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
485       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
486
487
488 class TestEtcHosts(testutils.GanetiTestCase):
489   """Test functions modifying /etc/hosts"""
490
491   def setUp(self):
492     testutils.GanetiTestCase.setUp(self)
493     self.tmpname = self._CreateTempFile()
494     handle = open(self.tmpname, 'w')
495     try:
496       handle.write('# This is a test file for /etc/hosts\n')
497       handle.write('127.0.0.1\tlocalhost\n')
498       handle.write('192.168.1.1 router gw\n')
499     finally:
500       handle.close()
501
502   def testSettingNewIp(self):
503     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
504
505     self.assertFileContent(self.tmpname,
506       "# This is a test file for /etc/hosts\n"
507       "127.0.0.1\tlocalhost\n"
508       "192.168.1.1 router gw\n"
509       "1.2.3.4\tmyhost.domain.tld myhost\n")
510     self.assertFileMode(self.tmpname, 0644)
511
512   def testSettingExistingIp(self):
513     SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
514                      ['myhost'])
515
516     self.assertFileContent(self.tmpname,
517       "# This is a test file for /etc/hosts\n"
518       "127.0.0.1\tlocalhost\n"
519       "192.168.1.1\tmyhost.domain.tld myhost\n")
520     self.assertFileMode(self.tmpname, 0644)
521
522   def testSettingDuplicateName(self):
523     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
524
525     self.assertFileContent(self.tmpname,
526       "# This is a test file for /etc/hosts\n"
527       "127.0.0.1\tlocalhost\n"
528       "192.168.1.1 router gw\n"
529       "1.2.3.4\tmyhost\n")
530     self.assertFileMode(self.tmpname, 0644)
531
532   def testRemovingExistingHost(self):
533     RemoveEtcHostsEntry(self.tmpname, 'router')
534
535     self.assertFileContent(self.tmpname,
536       "# This is a test file for /etc/hosts\n"
537       "127.0.0.1\tlocalhost\n"
538       "192.168.1.1 gw\n")
539     self.assertFileMode(self.tmpname, 0644)
540
541   def testRemovingSingleExistingHost(self):
542     RemoveEtcHostsEntry(self.tmpname, 'localhost')
543
544     self.assertFileContent(self.tmpname,
545       "# This is a test file for /etc/hosts\n"
546       "192.168.1.1 router gw\n")
547     self.assertFileMode(self.tmpname, 0644)
548
549   def testRemovingNonExistingHost(self):
550     RemoveEtcHostsEntry(self.tmpname, 'myhost')
551
552     self.assertFileContent(self.tmpname,
553       "# This is a test file for /etc/hosts\n"
554       "127.0.0.1\tlocalhost\n"
555       "192.168.1.1 router gw\n")
556     self.assertFileMode(self.tmpname, 0644)
557
558   def testRemovingAlias(self):
559     RemoveEtcHostsEntry(self.tmpname, 'gw')
560
561     self.assertFileContent(self.tmpname,
562       "# This is a test file for /etc/hosts\n"
563       "127.0.0.1\tlocalhost\n"
564       "192.168.1.1 router\n")
565     self.assertFileMode(self.tmpname, 0644)
566
567
568 class TestShellQuoting(unittest.TestCase):
569   """Test case for shell quoting functions"""
570
571   def testShellQuote(self):
572     self.assertEqual(ShellQuote('abc'), "abc")
573     self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
574     self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
575     self.assertEqual(ShellQuote("a b c"), "'a b c'")
576     self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
577
578   def testShellQuoteArgs(self):
579     self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
580     self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
581     self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
582
583
584 class TestTcpPing(unittest.TestCase):
585   """Testcase for TCP version of ping - against listen(2)ing port"""
586
587   def setUp(self):
588     self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
589     self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
590     self.listenerport = self.listener.getsockname()[1]
591     self.listener.listen(1)
592
593   def tearDown(self):
594     self.listener.shutdown(socket.SHUT_RDWR)
595     del self.listener
596     del self.listenerport
597
598   def testTcpPingToLocalHostAccept(self):
599     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
600                          self.listenerport,
601                          timeout=10,
602                          live_port_needed=True,
603                          source=constants.LOCALHOST_IP_ADDRESS,
604                          ),
605                  "failed to connect to test listener")
606
607     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
608                          self.listenerport,
609                          timeout=10,
610                          live_port_needed=True,
611                          ),
612                  "failed to connect to test listener (no source)")
613
614
615 class TestTcpPingDeaf(unittest.TestCase):
616   """Testcase for TCP version of ping - against non listen(2)ing port"""
617
618   def setUp(self):
619     self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
620     self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
621     self.deaflistenerport = self.deaflistener.getsockname()[1]
622
623   def tearDown(self):
624     del self.deaflistener
625     del self.deaflistenerport
626
627   def testTcpPingToLocalHostAcceptDeaf(self):
628     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
629                         self.deaflistenerport,
630                         timeout=constants.TCP_PING_TIMEOUT,
631                         live_port_needed=True,
632                         source=constants.LOCALHOST_IP_ADDRESS,
633                         ), # need successful connect(2)
634                 "successfully connected to deaf listener")
635
636     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
637                         self.deaflistenerport,
638                         timeout=constants.TCP_PING_TIMEOUT,
639                         live_port_needed=True,
640                         ), # need successful connect(2)
641                 "successfully connected to deaf listener (no source addr)")
642
643   def testTcpPingToLocalHostNoAccept(self):
644     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
645                          self.deaflistenerport,
646                          timeout=constants.TCP_PING_TIMEOUT,
647                          live_port_needed=False,
648                          source=constants.LOCALHOST_IP_ADDRESS,
649                          ), # ECONNREFUSED is OK
650                  "failed to ping alive host on deaf port")
651
652     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
653                          self.deaflistenerport,
654                          timeout=constants.TCP_PING_TIMEOUT,
655                          live_port_needed=False,
656                          ), # ECONNREFUSED is OK
657                  "failed to ping alive host on deaf port (no source addr)")
658
659
660 class TestOwnIpAddress(unittest.TestCase):
661   """Testcase for OwnIpAddress"""
662
663   def testOwnLoopback(self):
664     """check having the loopback ip"""
665     self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
666                     "Should own the loopback address")
667
668   def testNowOwnAddress(self):
669     """check that I don't own an address"""
670
671     # network 192.0.2.0/24 is reserved for test/documentation as per
672     # rfc 3330, so we *should* not have an address of this range... if
673     # this fails, we should extend the test to multiple addresses
674     DST_IP = "192.0.2.1"
675     self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
676
677
678 class TestListVisibleFiles(unittest.TestCase):
679   """Test case for ListVisibleFiles"""
680
681   def setUp(self):
682     self.path = tempfile.mkdtemp()
683
684   def tearDown(self):
685     shutil.rmtree(self.path)
686
687   def _test(self, files, expected):
688     # Sort a copy
689     expected = expected[:]
690     expected.sort()
691
692     for name in files:
693       f = open(os.path.join(self.path, name), 'w')
694       try:
695         f.write("Test\n")
696       finally:
697         f.close()
698
699     found = ListVisibleFiles(self.path)
700     found.sort()
701
702     self.assertEqual(found, expected)
703
704   def testAllVisible(self):
705     files = ["a", "b", "c"]
706     expected = files
707     self._test(files, expected)
708
709   def testNoneVisible(self):
710     files = [".a", ".b", ".c"]
711     expected = []
712     self._test(files, expected)
713
714   def testSomeVisible(self):
715     files = ["a", "b", ".c"]
716     expected = ["a", "b"]
717     self._test(files, expected)
718
719
720 class TestNewUUID(unittest.TestCase):
721   """Test case for NewUUID"""
722
723   _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
724                         '[a-f0-9]{4}-[a-f0-9]{12}$')
725
726   def runTest(self):
727     self.failUnless(self._re_uuid.match(utils.NewUUID()))
728
729
730 class TestUniqueSequence(unittest.TestCase):
731   """Test case for UniqueSequence"""
732
733   def _test(self, input, expected):
734     self.assertEqual(utils.UniqueSequence(input), expected)
735
736   def runTest(self):
737     # Ordered input
738     self._test([1, 2, 3], [1, 2, 3])
739     self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
740     self._test([1, 2, 2, 3], [1, 2, 3])
741     self._test([1, 2, 3, 3], [1, 2, 3])
742
743     # Unordered input
744     self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
745     self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
746
747     # Strings
748     self._test(["a", "a"], ["a"])
749     self._test(["a", "b"], ["a", "b"])
750     self._test(["a", "b", "a"], ["a", "b"])
751
752
753 class TestFirstFree(unittest.TestCase):
754   """Test case for the FirstFree function"""
755
756   def test(self):
757     """Test FirstFree"""
758     self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
759     self.failUnlessEqual(FirstFree([]), None)
760     self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
761     self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
762     self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
763
764
765 class TestTailFile(testutils.GanetiTestCase):
766   """Test case for the TailFile function"""
767
768   def testEmpty(self):
769     fname = self._CreateTempFile()
770     self.failUnlessEqual(TailFile(fname), [])
771     self.failUnlessEqual(TailFile(fname, lines=25), [])
772
773   def testAllLines(self):
774     data = ["test %d" % i for i in range(30)]
775     for i in range(30):
776       fname = self._CreateTempFile()
777       fd = open(fname, "w")
778       fd.write("\n".join(data[:i]))
779       if i > 0:
780         fd.write("\n")
781       fd.close()
782       self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
783
784   def testPartialLines(self):
785     data = ["test %d" % i for i in range(30)]
786     fname = self._CreateTempFile()
787     fd = open(fname, "w")
788     fd.write("\n".join(data))
789     fd.write("\n")
790     fd.close()
791     for i in range(1, 30):
792       self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
793
794   def testBigFile(self):
795     data = ["test %d" % i for i in range(30)]
796     fname = self._CreateTempFile()
797     fd = open(fname, "w")
798     fd.write("X" * 1048576)
799     fd.write("\n")
800     fd.write("\n".join(data))
801     fd.write("\n")
802     fd.close()
803     for i in range(1, 30):
804       self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
805
806
807 class TestFileLock(unittest.TestCase):
808   """Test case for the FileLock class"""
809
810   def setUp(self):
811     self.tmpfile = tempfile.NamedTemporaryFile()
812     self.lock = utils.FileLock(self.tmpfile.name)
813
814   def testSharedNonblocking(self):
815     self.lock.Shared(blocking=False)
816     self.lock.Close()
817
818   def testExclusiveNonblocking(self):
819     self.lock.Exclusive(blocking=False)
820     self.lock.Close()
821
822   def testUnlockNonblocking(self):
823     self.lock.Unlock(blocking=False)
824     self.lock.Close()
825
826   def testSharedBlocking(self):
827     self.lock.Shared(blocking=True)
828     self.lock.Close()
829
830   def testExclusiveBlocking(self):
831     self.lock.Exclusive(blocking=True)
832     self.lock.Close()
833
834   def testUnlockBlocking(self):
835     self.lock.Unlock(blocking=True)
836     self.lock.Close()
837
838   def testSharedExclusiveUnlock(self):
839     self.lock.Shared(blocking=False)
840     self.lock.Exclusive(blocking=False)
841     self.lock.Unlock(blocking=False)
842     self.lock.Close()
843
844   def testExclusiveSharedUnlock(self):
845     self.lock.Exclusive(blocking=False)
846     self.lock.Shared(blocking=False)
847     self.lock.Unlock(blocking=False)
848     self.lock.Close()
849
850   def testCloseShared(self):
851     self.lock.Close()
852     self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
853
854   def testCloseExclusive(self):
855     self.lock.Close()
856     self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
857
858   def testCloseUnlock(self):
859     self.lock.Close()
860     self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
861
862
863 class TestTimeFunctions(unittest.TestCase):
864   """Test case for time functions"""
865
866   def runTest(self):
867     self.assertEqual(utils.SplitTime(1), (1, 0))
868     self.assertEqual(utils.SplitTime(1.5), (1, 500000))
869     self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
870     self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
871     self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
872     self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
873     self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
874     self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
875
876     self.assertRaises(AssertionError, utils.SplitTime, -1)
877
878     self.assertEqual(utils.MergeTime((1, 0)), 1.0)
879     self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
880     self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
881
882     self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3), 1218448917.481)
883     self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
884
885     self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
886     self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
887     self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
888     self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
889     self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
890
891
892 class FieldSetTestCase(unittest.TestCase):
893   """Test case for FieldSets"""
894
895   def testSimpleMatch(self):
896     f = utils.FieldSet("a", "b", "c", "def")
897     self.failUnless(f.Matches("a"))
898     self.failIf(f.Matches("d"), "Substring matched")
899     self.failIf(f.Matches("defghi"), "Prefix string matched")
900     self.failIf(f.NonMatching(["b", "c"]))
901     self.failIf(f.NonMatching(["a", "b", "c", "def"]))
902     self.failUnless(f.NonMatching(["a", "d"]))
903
904   def testRegexMatch(self):
905     f = utils.FieldSet("a", "b([0-9]+)", "c")
906     self.failUnless(f.Matches("b1"))
907     self.failUnless(f.Matches("b99"))
908     self.failIf(f.Matches("b/1"))
909     self.failIf(f.NonMatching(["b12", "c"]))
910     self.failUnless(f.NonMatching(["a", "1"]))
911
912 class TestForceDictType(unittest.TestCase):
913   """Test case for ForceDictType"""
914
915   def setUp(self):
916     self.key_types = {
917       'a': constants.VTYPE_INT,
918       'b': constants.VTYPE_BOOL,
919       'c': constants.VTYPE_STRING,
920       'd': constants.VTYPE_SIZE,
921       }
922
923   def _fdt(self, dict, allowed_values=None):
924     if allowed_values is None:
925       ForceDictType(dict, self.key_types)
926     else:
927       ForceDictType(dict, self.key_types, allowed_values=allowed_values)
928
929     return dict
930
931   def testSimpleDict(self):
932     self.assertEqual(self._fdt({}), {})
933     self.assertEqual(self._fdt({'a': 1}), {'a': 1})
934     self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
935     self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
936     self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
937     self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
938     self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
939     self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
940     self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
941     self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
942     self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
943     self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
944
945   def testErrors(self):
946     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
947     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
948     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
949     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
950
951
952 class TestIsAbsNormPath(unittest.TestCase):
953   """Testing case for IsProcessAlive"""
954
955   def _pathTestHelper(self, path, result):
956     if result:
957       self.assert_(IsNormAbsPath(path),
958           "Path %s should result absolute and normalized" % path)
959     else:
960       self.assert_(not IsNormAbsPath(path),
961           "Path %s should not result absolute and normalized" % path)
962
963   def testBase(self):
964     self._pathTestHelper('/etc', True)
965     self._pathTestHelper('/srv', True)
966     self._pathTestHelper('etc', False)
967     self._pathTestHelper('/etc/../root', False)
968     self._pathTestHelper('/etc/', False)
969
970
971 class TestSafeEncode(unittest.TestCase):
972   """Test case for SafeEncode"""
973
974   def testAscii(self):
975     for txt in [string.digits, string.letters, string.punctuation]:
976       self.failUnlessEqual(txt, SafeEncode(txt))
977
978   def testDoubleEncode(self):
979     for i in range(255):
980       txt = SafeEncode(chr(i))
981       self.failUnlessEqual(txt, SafeEncode(txt))
982
983   def testUnicode(self):
984     # 1024 is high enough to catch non-direct ASCII mappings
985     for i in range(1024):
986       txt = SafeEncode(unichr(i))
987       self.failUnlessEqual(txt, SafeEncode(txt))
988
989
990 if __name__ == '__main__':
991   unittest.main()