Statistics
| Branch: | Tag: | Revision:

root / testing / ganeti.utils_unittest.py @ 31ee599c

History | View | Annotate | Download (12.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import md5
30

    
31
import ganeti
32
from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
33
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
34
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
35
     ShellQuote, ShellQuoteArgs
36
from ganeti.errors import LockError, UnitParseError
37

    
38
class TestIsProcessAlive(unittest.TestCase):
39
  """Testing case for IsProcessAlive"""
40
  def setUp(self):
41
    # create a zombie and a (hopefully) non-existing process id
42
    self.pid_zombie = os.fork()
43
    if self.pid_zombie == 0:
44
      os._exit(0)
45
    elif self.pid_zombie < 0:
46
      raise SystemError("can't fork")
47
    self.pid_non_existing = os.fork()
48
    if self.pid_non_existing == 0:
49
      os._exit(0)
50
    elif self.pid_non_existing > 0:
51
      os.waitpid(self.pid_non_existing, 0)
52
    else:
53
      raise SystemError("can't fork")
54

    
55

    
56
  def testExists(self):
57
    mypid = os.getpid()
58
    self.assert_(IsProcessAlive(mypid),
59
                 "can't find myself running")
60

    
61
  def testZombie(self):
62
    self.assert_(not IsProcessAlive(self.pid_zombie),
63
                 "zombie not detected as zombie")
64

    
65

    
66
  def testNotExisting(self):
67
    self.assert_(not IsProcessAlive(self.pid_non_existing),
68
                 "noexisting process detected")
69

    
70

    
71
class TestLocking(unittest.TestCase):
72
  """Testing case for the Lock/Unlock functions"""
73
  def clean_lock(self, name):
74
    try:
75
      ganeti.utils.Unlock("unittest")
76
    except LockError:
77
      pass
78

    
79

    
80
  def testLock(self):
81
    self.clean_lock("unittest")
82
    self.assertEqual(None, Lock("unittest"))
83

    
84

    
85
  def testUnlock(self):
86
    self.clean_lock("unittest")
87
    ganeti.utils.Lock("unittest")
88
    self.assertEqual(None, Unlock("unittest"))
89

    
90

    
91
  def testDoubleLock(self):
92
    self.clean_lock("unittest")
93
    ganeti.utils.Lock("unittest")
94
    self.assertRaises(LockError, Lock, "unittest")
95

    
96

    
97
class TestRunCmd(unittest.TestCase):
98
  """Testing case for the RunCmd function"""
99

    
100
  def setUp(self):
101
    self.magic = time.ctime() + " ganeti test"
102

    
103
  def testOk(self):
104
    """Test successful exit code"""
105
    result = RunCmd("/bin/sh -c 'exit 0'")
106
    self.assertEqual(result.exit_code, 0)
107

    
108
  def testFail(self):
109
    """Test fail exit code"""
110
    result = RunCmd("/bin/sh -c 'exit 1'")
111
    self.assertEqual(result.exit_code, 1)
112

    
113

    
114
  def testStdout(self):
115
    """Test standard output"""
116
    cmd = 'echo -n "%s"' % self.magic
117
    result = RunCmd("/bin/sh -c '%s'" % cmd)
118
    self.assertEqual(result.stdout, self.magic)
119

    
120

    
121
  def testStderr(self):
122
    """Test standard error"""
123
    cmd = 'echo -n "%s"' % self.magic
124
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
125
    self.assertEqual(result.stderr, self.magic)
126

    
127

    
128
  def testCombined(self):
129
    """Test combined output"""
130
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
131
    result = RunCmd("/bin/sh -c '%s'" % cmd)
132
    self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
133

    
134
  def testSignal(self):
135
    """Test standard error"""
136
    result = RunCmd("/bin/sh -c 'kill -15 $$'")
137
    self.assertEqual(result.signal, 15)
138

    
139

    
140
class TestRemoveFile(unittest.TestCase):
141
  """Test case for the RemoveFile function"""
142

    
143
  def setUp(self):
144
    """Create a temp dir and file for each case"""
145
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
146
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
147
    os.close(fd)
148

    
149
  def tearDown(self):
150
    if os.path.exists(self.tmpfile):
151
      os.unlink(self.tmpfile)
152
    os.rmdir(self.tmpdir)
153

    
154

    
155
  def testIgnoreDirs(self):
156
    """Test that RemoveFile() ignores directories"""
157
    self.assertEqual(None, RemoveFile(self.tmpdir))
158

    
159

    
160
  def testIgnoreNotExisting(self):
161
    """Test that RemoveFile() ignores non-existing files"""
162
    RemoveFile(self.tmpfile)
163
    RemoveFile(self.tmpfile)
164

    
165

    
166
  def testRemoveFile(self):
167
    """Test that RemoveFile does remove a file"""
168
    RemoveFile(self.tmpfile)
169
    if os.path.exists(self.tmpfile):
170
      self.fail("File '%s' not removed" % self.tmpfile)
171

    
172

    
173
  def testRemoveSymlink(self):
174
    """Test that RemoveFile does remove symlinks"""
175
    symlink = self.tmpdir + "/symlink"
176
    os.symlink("no-such-file", symlink)
177
    RemoveFile(symlink)
178
    if os.path.exists(symlink):
179
      self.fail("File '%s' not removed" % symlink)
180
    os.symlink(self.tmpfile, symlink)
181
    RemoveFile(symlink)
182
    if os.path.exists(symlink):
183
      self.fail("File '%s' not removed" % symlink)
184

    
185

    
186
class TestCheckdict(unittest.TestCase):
187
  """Test case for the CheckDict function"""
188

    
189
  def testAdd(self):
190
    """Test that CheckDict adds a missing key with the correct value"""
191

    
192
    tgt = {'a':1}
193
    tmpl = {'b': 2}
194
    CheckDict(tgt, tmpl)
195
    if 'b' not in tgt or tgt['b'] != 2:
196
      self.fail("Failed to update dict")
197

    
198

    
199
  def testNoUpdate(self):
200
    """Test that CheckDict does not overwrite an existing key"""
201
    tgt = {'a':1, 'b': 3}
202
    tmpl = {'b': 2}
203
    CheckDict(tgt, tmpl)
204
    self.failUnlessEqual(tgt['b'], 3)
205

    
206

    
207
class TestMatchNameComponent(unittest.TestCase):
208
  """Test case for the MatchNameComponent function"""
209

    
210
  def testEmptyList(self):
211
    """Test that there is no match against an empty list"""
212

    
213
    self.failUnlessEqual(MatchNameComponent("", []), None)
214
    self.failUnlessEqual(MatchNameComponent("test", []), None)
215

    
216
  def testSingleMatch(self):
217
    """Test that a single match is performed correctly"""
218
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
219
    for key in "test2", "test2.example", "test2.example.com":
220
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
221

    
222
  def testMultipleMatches(self):
223
    """Test that a multiple match is returned as None"""
224
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
225
    for key in "test1", "test1.example":
226
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
227

    
228

    
229
class TestFormatUnit(unittest.TestCase):
230
  """Test case for the FormatUnit function"""
231

    
232
  def testMiB(self):
233
    self.assertEqual(FormatUnit(1), '1M')
234
    self.assertEqual(FormatUnit(100), '100M')
235
    self.assertEqual(FormatUnit(1023), '1023M')
236

    
237
  def testGiB(self):
238
    self.assertEqual(FormatUnit(1024), '1.0G')
239
    self.assertEqual(FormatUnit(1536), '1.5G')
240
    self.assertEqual(FormatUnit(17133), '16.7G')
241
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
242

    
243
  def testTiB(self):
244
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
245
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
246
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
247

    
248

    
249
class TestParseUnit(unittest.TestCase):
250
  """Test case for the ParseUnit function"""
251

    
252
  SCALES = (('', 1),
253
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
254
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
255
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
256

    
257
  def testRounding(self):
258
    self.assertEqual(ParseUnit('0'), 0)
259
    self.assertEqual(ParseUnit('1'), 4)
260
    self.assertEqual(ParseUnit('2'), 4)
261
    self.assertEqual(ParseUnit('3'), 4)
262

    
263
    self.assertEqual(ParseUnit('124'), 124)
264
    self.assertEqual(ParseUnit('125'), 128)
265
    self.assertEqual(ParseUnit('126'), 128)
266
    self.assertEqual(ParseUnit('127'), 128)
267
    self.assertEqual(ParseUnit('128'), 128)
268
    self.assertEqual(ParseUnit('129'), 132)
269
    self.assertEqual(ParseUnit('130'), 132)
270

    
271
  def testFloating(self):
272
    self.assertEqual(ParseUnit('0'), 0)
273
    self.assertEqual(ParseUnit('0.5'), 4)
274
    self.assertEqual(ParseUnit('1.75'), 4)
275
    self.assertEqual(ParseUnit('1.99'), 4)
276
    self.assertEqual(ParseUnit('2.00'), 4)
277
    self.assertEqual(ParseUnit('2.01'), 4)
278
    self.assertEqual(ParseUnit('3.99'), 4)
279
    self.assertEqual(ParseUnit('4.00'), 4)
280
    self.assertEqual(ParseUnit('4.01'), 8)
281
    self.assertEqual(ParseUnit('1.5G'), 1536)
282
    self.assertEqual(ParseUnit('1.8G'), 1844)
283
    self.assertEqual(ParseUnit('8.28T'), 8682212)
284

    
285
  def testSuffixes(self):
286
    for sep in ('', ' ', '   ', "\t", "\t "):
287
      for suffix, scale in TestParseUnit.SCALES:
288
        for func in (lambda x: x, str.lower, str.upper):
289
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)), 1024 * scale)
290

    
291
  def testInvalidInput(self):
292
    for sep in ('-', '_', ',', 'a'):
293
      for suffix, _ in TestParseUnit.SCALES:
294
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
295

    
296
    for suffix, _ in TestParseUnit.SCALES:
297
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
298

    
299

    
300
class TestSshKeys(unittest.TestCase):
301
  """Test case for the AddAuthorizedKey function"""
302

    
303
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
304
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
305
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
306

    
307
  # NOTE: The MD5 sums below were calculated after manually
308
  #       checking the output files.
309

    
310
  def writeTestFile(self):
311
    (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
312
    f = os.fdopen(fd, 'w')
313
    try:
314
      f.write(TestSshKeys.KEY_A)
315
      f.write("\n")
316
      f.write(TestSshKeys.KEY_B)
317
      f.write("\n")
318
    finally:
319
      f.close()
320

    
321
    return tmpname
322

    
323
  def testAddingNewKey(self):
324
    tmpname = self.writeTestFile()
325
    try:
326
      AddAuthorizedKey(tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
327

    
328
      f = open(tmpname, 'r')
329
      try:
330
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
331
                         'ccc71523108ca6e9d0343797dc3e9f16')
332
      finally:
333
        f.close()
334
    finally:
335
      os.unlink(tmpname)
336

    
337
  def testAddingAlmostButNotCompletlyTheSameKey(self):
338
    tmpname = self.writeTestFile()
339
    try:
340
      AddAuthorizedKey(tmpname,
341
          'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
342

    
343
      f = open(tmpname, 'r')
344
      try:
345
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
346
                         'f2c939d57addb5b3a6846884be896b46')
347
      finally:
348
        f.close()
349
    finally:
350
      os.unlink(tmpname)
351

    
352
  def testAddingExistingKeyWithSomeMoreSpaces(self):
353
    tmpname = self.writeTestFile()
354
    try:
355
      AddAuthorizedKey(tmpname,
356
          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
357

    
358
      f = open(tmpname, 'r')
359
      try:
360
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
361
                         '4e612764808bd46337eb0f575415fc30')
362
      finally:
363
        f.close()
364
    finally:
365
      os.unlink(tmpname)
366

    
367
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
368
    tmpname = self.writeTestFile()
369
    try:
370
      RemoveAuthorizedKey(tmpname,
371
          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
372

    
373
      f = open(tmpname, 'r')
374
      try:
375
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
376
                         '77516d987fca07f70e30b830b3e4f2ed')
377
      finally:
378
        f.close()
379
    finally:
380
      os.unlink(tmpname)
381

    
382
  def testRemovingNonExistingKey(self):
383
    tmpname = self.writeTestFile()
384
    try:
385
      RemoveAuthorizedKey(tmpname,
386
          'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
387

    
388
      f = open(tmpname, 'r')
389
      try:
390
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
391
                         '4e612764808bd46337eb0f575415fc30')
392
      finally:
393
        f.close()
394
    finally:
395
      os.unlink(tmpname)
396

    
397

    
398
class TestShellQuoting(unittest.TestCase):
399
  """Test case for shell quoting functions"""
400

    
401
  def testShellQuote(self):
402
    self.assertEqual(ShellQuote('abc'), "abc")
403
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
404
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
405
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
406
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
407

    
408
  def testShellQuoteArgs(self):
409
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
410
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
411
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
412

    
413

    
414
if __name__ == '__main__':
415
  unittest.main()