Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 740c5aab

History | View | Annotate | Download (21.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import md5
31
import signal
32
import socket
33
import shutil
34
import re
35

    
36
import ganeti
37
import testutils
38
from ganeti import constants
39
from ganeti import utils
40
from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
41
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
42
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
43
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
44
     SetEtcHostsEntry, RemoveEtcHostsEntry
45
from ganeti.errors import LockError, UnitParseError
46

    
47
def _ChildHandler(signal, stack):
48
  global _ChildFlag
49
  _ChildFlag = True
50

    
51
class TestIsProcessAlive(unittest.TestCase):
52
  """Testing case for IsProcessAlive"""
53

    
54
  def setUp(self):
55
    global _ChildFlag
56
    # create a (most probably) non-existing process-id
57
    self.pid_non_existing = os.fork()
58
    if self.pid_non_existing == 0:
59
      os._exit(0)
60
    elif self.pid_non_existing > 0:
61
      os.waitpid(self.pid_non_existing, 0)
62
    else:
63
      raise SystemError("can't fork")
64
    _ChildFlag = False
65
    # Use _ChildHandler for SIGCHLD
66
    self.chldOrig = signal.signal(signal.SIGCHLD, _ChildHandler)
67
    # create a zombie
68
    self.pid_zombie = os.fork()
69
    if self.pid_zombie == 0:
70
      os._exit(0)
71
    elif self.pid_zombie < 0:
72
      raise SystemError("can't fork")
73

    
74
  def tearDown(self):
75
    signal.signal(signal.SIGCHLD, self.chldOrig)
76

    
77
  def testExists(self):
78
    mypid = os.getpid()
79
    self.assert_(IsProcessAlive(mypid),
80
                 "can't find myself running")
81

    
82
  def testZombie(self):
83
    global _ChildFlag
84
    timeout = 10
85

    
86
    while not _ChildFlag:
87
      if timeout >= 0:
88
        time.sleep(0.2)
89
        timeout -= 0.2
90
      else:
91
        self.fail("timed out waiting for child's signal")
92
        break # not executed...
93

    
94
    self.assert_(not IsProcessAlive(self.pid_zombie),
95
                 "zombie not detected as zombie")
96

    
97
  def testNotExisting(self):
98
    self.assert_(not IsProcessAlive(self.pid_non_existing),
99
                 "noexisting process detected")
100

    
101

    
102
class TestLocking(unittest.TestCase):
103
  """Testing case for the Lock/Unlock functions"""
104

    
105
  def setUp(self):
106
    lock_dir = tempfile.mkdtemp(prefix="ganeti.unittest.",
107
                                suffix=".locking")
108
    self.old_lock_dir = constants.LOCK_DIR
109
    constants.LOCK_DIR = lock_dir
110

    
111
  def tearDown(self):
112
    try:
113
      ganeti.utils.Unlock("unittest")
114
    except LockError:
115
      pass
116
    shutil.rmtree(constants.LOCK_DIR, ignore_errors=True)
117
    constants.LOCK_DIR = self.old_lock_dir
118

    
119
  def clean_lock(self, name):
120
    try:
121
      ganeti.utils.Unlock("unittest")
122
    except LockError:
123
      pass
124

    
125

    
126
  def testLock(self):
127
    self.clean_lock("unittest")
128
    self.assertEqual(None, Lock("unittest"))
129

    
130

    
131
  def testUnlock(self):
132
    self.clean_lock("unittest")
133
    ganeti.utils.Lock("unittest")
134
    self.assertEqual(None, Unlock("unittest"))
135

    
136
  def testDoubleLock(self):
137
    self.clean_lock("unittest")
138
    ganeti.utils.Lock("unittest")
139
    self.assertRaises(LockError, Lock, "unittest")
140

    
141

    
142
class TestRunCmd(unittest.TestCase):
143
  """Testing case for the RunCmd function"""
144

    
145
  def setUp(self):
146
    self.magic = time.ctime() + " ganeti test"
147

    
148
  def testOk(self):
149
    """Test successful exit code"""
150
    result = RunCmd("/bin/sh -c 'exit 0'")
151
    self.assertEqual(result.exit_code, 0)
152

    
153
  def testFail(self):
154
    """Test fail exit code"""
155
    result = RunCmd("/bin/sh -c 'exit 1'")
156
    self.assertEqual(result.exit_code, 1)
157

    
158

    
159
  def testStdout(self):
160
    """Test standard output"""
161
    cmd = 'echo -n "%s"' % self.magic
162
    result = RunCmd("/bin/sh -c '%s'" % cmd)
163
    self.assertEqual(result.stdout, self.magic)
164

    
165

    
166
  def testStderr(self):
167
    """Test standard error"""
168
    cmd = 'echo -n "%s"' % self.magic
169
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
170
    self.assertEqual(result.stderr, self.magic)
171

    
172

    
173
  def testCombined(self):
174
    """Test combined output"""
175
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
176
    result = RunCmd("/bin/sh -c '%s'" % cmd)
177
    self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
178

    
179
  def testSignal(self):
180
    """Test signal"""
181
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
182
    self.assertEqual(result.signal, 15)
183

    
184
  def testListRun(self):
185
    """Test list runs"""
186
    result = RunCmd(["true"])
187
    self.assertEqual(result.signal, None)
188
    self.assertEqual(result.exit_code, 0)
189
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
190
    self.assertEqual(result.signal, None)
191
    self.assertEqual(result.exit_code, 1)
192
    result = RunCmd(["echo", "-n", self.magic])
193
    self.assertEqual(result.signal, None)
194
    self.assertEqual(result.exit_code, 0)
195
    self.assertEqual(result.stdout, self.magic)
196

    
197
  def testLang(self):
198
    """Test locale environment"""
199
    old_env = os.environ.copy()
200
    try:
201
      os.environ["LANG"] = "en_US.UTF-8"
202
      os.environ["LC_ALL"] = "en_US.UTF-8"
203
      result = RunCmd(["locale"])
204
      for line in result.output.splitlines():
205
        key, value = line.split("=", 1)
206
        # Ignore these variables, they're overridden by LC_ALL
207
        if key == "LANG" or key == "LANGUAGE":
208
          continue
209
        self.failIf(value and value != "C" and value != '"C"',
210
            "Variable %s is set to the invalid value '%s'" % (key, value))
211
    finally:
212
      os.environ = old_env
213

    
214

    
215
class TestRemoveFile(unittest.TestCase):
216
  """Test case for the RemoveFile function"""
217

    
218
  def setUp(self):
219
    """Create a temp dir and file for each case"""
220
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
221
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
222
    os.close(fd)
223

    
224
  def tearDown(self):
225
    if os.path.exists(self.tmpfile):
226
      os.unlink(self.tmpfile)
227
    os.rmdir(self.tmpdir)
228

    
229

    
230
  def testIgnoreDirs(self):
231
    """Test that RemoveFile() ignores directories"""
232
    self.assertEqual(None, RemoveFile(self.tmpdir))
233

    
234

    
235
  def testIgnoreNotExisting(self):
236
    """Test that RemoveFile() ignores non-existing files"""
237
    RemoveFile(self.tmpfile)
238
    RemoveFile(self.tmpfile)
239

    
240

    
241
  def testRemoveFile(self):
242
    """Test that RemoveFile does remove a file"""
243
    RemoveFile(self.tmpfile)
244
    if os.path.exists(self.tmpfile):
245
      self.fail("File '%s' not removed" % self.tmpfile)
246

    
247

    
248
  def testRemoveSymlink(self):
249
    """Test that RemoveFile does remove symlinks"""
250
    symlink = self.tmpdir + "/symlink"
251
    os.symlink("no-such-file", symlink)
252
    RemoveFile(symlink)
253
    if os.path.exists(symlink):
254
      self.fail("File '%s' not removed" % symlink)
255
    os.symlink(self.tmpfile, symlink)
256
    RemoveFile(symlink)
257
    if os.path.exists(symlink):
258
      self.fail("File '%s' not removed" % symlink)
259

    
260

    
261
class TestCheckdict(unittest.TestCase):
262
  """Test case for the CheckDict function"""
263

    
264
  def testAdd(self):
265
    """Test that CheckDict adds a missing key with the correct value"""
266

    
267
    tgt = {'a':1}
268
    tmpl = {'b': 2}
269
    CheckDict(tgt, tmpl)
270
    if 'b' not in tgt or tgt['b'] != 2:
271
      self.fail("Failed to update dict")
272

    
273

    
274
  def testNoUpdate(self):
275
    """Test that CheckDict does not overwrite an existing key"""
276
    tgt = {'a':1, 'b': 3}
277
    tmpl = {'b': 2}
278
    CheckDict(tgt, tmpl)
279
    self.failUnlessEqual(tgt['b'], 3)
280

    
281

    
282
class TestMatchNameComponent(unittest.TestCase):
283
  """Test case for the MatchNameComponent function"""
284

    
285
  def testEmptyList(self):
286
    """Test that there is no match against an empty list"""
287

    
288
    self.failUnlessEqual(MatchNameComponent("", []), None)
289
    self.failUnlessEqual(MatchNameComponent("test", []), None)
290

    
291
  def testSingleMatch(self):
292
    """Test that a single match is performed correctly"""
293
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
294
    for key in "test2", "test2.example", "test2.example.com":
295
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
296

    
297
  def testMultipleMatches(self):
298
    """Test that a multiple match is returned as None"""
299
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
300
    for key in "test1", "test1.example":
301
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
302

    
303

    
304
class TestFormatUnit(unittest.TestCase):
305
  """Test case for the FormatUnit function"""
306

    
307
  def testMiB(self):
308
    self.assertEqual(FormatUnit(1), '1M')
309
    self.assertEqual(FormatUnit(100), '100M')
310
    self.assertEqual(FormatUnit(1023), '1023M')
311

    
312
  def testGiB(self):
313
    self.assertEqual(FormatUnit(1024), '1.0G')
314
    self.assertEqual(FormatUnit(1536), '1.5G')
315
    self.assertEqual(FormatUnit(17133), '16.7G')
316
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
317

    
318
  def testTiB(self):
319
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
320
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
321
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
322

    
323

    
324
class TestParseUnit(unittest.TestCase):
325
  """Test case for the ParseUnit function"""
326

    
327
  SCALES = (('', 1),
328
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
329
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
330
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
331

    
332
  def testRounding(self):
333
    self.assertEqual(ParseUnit('0'), 0)
334
    self.assertEqual(ParseUnit('1'), 4)
335
    self.assertEqual(ParseUnit('2'), 4)
336
    self.assertEqual(ParseUnit('3'), 4)
337

    
338
    self.assertEqual(ParseUnit('124'), 124)
339
    self.assertEqual(ParseUnit('125'), 128)
340
    self.assertEqual(ParseUnit('126'), 128)
341
    self.assertEqual(ParseUnit('127'), 128)
342
    self.assertEqual(ParseUnit('128'), 128)
343
    self.assertEqual(ParseUnit('129'), 132)
344
    self.assertEqual(ParseUnit('130'), 132)
345

    
346
  def testFloating(self):
347
    self.assertEqual(ParseUnit('0'), 0)
348
    self.assertEqual(ParseUnit('0.5'), 4)
349
    self.assertEqual(ParseUnit('1.75'), 4)
350
    self.assertEqual(ParseUnit('1.99'), 4)
351
    self.assertEqual(ParseUnit('2.00'), 4)
352
    self.assertEqual(ParseUnit('2.01'), 4)
353
    self.assertEqual(ParseUnit('3.99'), 4)
354
    self.assertEqual(ParseUnit('4.00'), 4)
355
    self.assertEqual(ParseUnit('4.01'), 8)
356
    self.assertEqual(ParseUnit('1.5G'), 1536)
357
    self.assertEqual(ParseUnit('1.8G'), 1844)
358
    self.assertEqual(ParseUnit('8.28T'), 8682212)
359

    
360
  def testSuffixes(self):
361
    for sep in ('', ' ', '   ', "\t", "\t "):
362
      for suffix, scale in TestParseUnit.SCALES:
363
        for func in (lambda x: x, str.lower, str.upper):
364
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
365
                           1024 * scale)
366

    
367
  def testInvalidInput(self):
368
    for sep in ('-', '_', ',', 'a'):
369
      for suffix, _ in TestParseUnit.SCALES:
370
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
371

    
372
    for suffix, _ in TestParseUnit.SCALES:
373
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
374

    
375

    
376
class TestSshKeys(testutils.GanetiTestCase):
377
  """Test case for the AddAuthorizedKey function"""
378

    
379
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
380
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
381
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
382

    
383
  def setUp(self):
384
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
385
    try:
386
      handle = os.fdopen(fd, 'w')
387
      try:
388
        handle.write("%s\n" % TestSshKeys.KEY_A)
389
        handle.write("%s\n" % TestSshKeys.KEY_B)
390
      finally:
391
        handle.close()
392
    except:
393
      utils.RemoveFile(self.tmpname)
394
      raise
395

    
396
  def tearDown(self):
397
    utils.RemoveFile(self.tmpname)
398
    del self.tmpname
399

    
400
  def testAddingNewKey(self):
401
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
402

    
403
    self.assertFileContent(self.tmpname,
404
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
405
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
406
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
407
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
408

    
409
  def testAddingAlmostButNotCompletelyTheSameKey(self):
410
    AddAuthorizedKey(self.tmpname,
411
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
412

    
413
    self.assertFileContent(self.tmpname,
414
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
415
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
416
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
417
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
418

    
419
  def testAddingExistingKeyWithSomeMoreSpaces(self):
420
    AddAuthorizedKey(self.tmpname,
421
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
422

    
423
    self.assertFileContent(self.tmpname,
424
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
425
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
426
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
427

    
428
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
429
    RemoveAuthorizedKey(self.tmpname,
430
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
431

    
432
    self.assertFileContent(self.tmpname,
433
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
434
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
435

    
436
  def testRemovingNonExistingKey(self):
437
    RemoveAuthorizedKey(self.tmpname,
438
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
439

    
440
    self.assertFileContent(self.tmpname,
441
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
442
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
443
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
444

    
445

    
446
class TestEtcHosts(testutils.GanetiTestCase):
447
  """Test functions modifying /etc/hosts"""
448

    
449
  def setUp(self):
450
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
451
    try:
452
      handle = os.fdopen(fd, 'w')
453
      try:
454
        handle.write('# This is a test file for /etc/hosts\n')
455
        handle.write('127.0.0.1\tlocalhost\n')
456
        handle.write('192.168.1.1 router gw\n')
457
      finally:
458
        handle.close()
459
    except:
460
      utils.RemoveFile(self.tmpname)
461
      raise
462

    
463
  def tearDown(self):
464
    utils.RemoveFile(self.tmpname)
465
    del self.tmpname
466

    
467
  def testSettingNewIp(self):
468
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
469

    
470
    self.assertFileContent(self.tmpname,
471
      "# This is a test file for /etc/hosts\n"
472
      "127.0.0.1\tlocalhost\n"
473
      "192.168.1.1 router gw\n"
474
      "1.2.3.4\tmyhost.domain.tld myhost\n")
475

    
476
  def testSettingExistingIp(self):
477
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
478
                     ['myhost'])
479

    
480
    self.assertFileContent(self.tmpname,
481
      "# This is a test file for /etc/hosts\n"
482
      "127.0.0.1\tlocalhost\n"
483
      "192.168.1.1\tmyhost.domain.tld myhost\n")
484

    
485
  def testSettingDuplicateName(self):
486
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
487

    
488
    self.assertFileContent(self.tmpname,
489
      "# This is a test file for /etc/hosts\n"
490
      "127.0.0.1\tlocalhost\n"
491
      "192.168.1.1 router gw\n"
492
      "1.2.3.4\tmyhost\n")
493

    
494
  def testRemovingExistingHost(self):
495
    RemoveEtcHostsEntry(self.tmpname, 'router')
496

    
497
    self.assertFileContent(self.tmpname,
498
      "# This is a test file for /etc/hosts\n"
499
      "127.0.0.1\tlocalhost\n"
500
      "192.168.1.1 gw\n")
501

    
502
  def testRemovingSingleExistingHost(self):
503
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
504

    
505
    self.assertFileContent(self.tmpname,
506
      "# This is a test file for /etc/hosts\n"
507
      "192.168.1.1 router gw\n")
508

    
509
  def testRemovingNonExistingHost(self):
510
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
511

    
512
    self.assertFileContent(self.tmpname,
513
      "# This is a test file for /etc/hosts\n"
514
      "127.0.0.1\tlocalhost\n"
515
      "192.168.1.1 router gw\n")
516

    
517
  def testRemovingAlias(self):
518
    RemoveEtcHostsEntry(self.tmpname, 'gw')
519

    
520
    self.assertFileContent(self.tmpname,
521
      "# This is a test file for /etc/hosts\n"
522
      "127.0.0.1\tlocalhost\n"
523
      "192.168.1.1 router\n")
524

    
525

    
526
class TestShellQuoting(unittest.TestCase):
527
  """Test case for shell quoting functions"""
528

    
529
  def testShellQuote(self):
530
    self.assertEqual(ShellQuote('abc'), "abc")
531
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
532
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
533
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
534
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
535

    
536
  def testShellQuoteArgs(self):
537
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
538
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
539
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
540

    
541

    
542
class TestTcpPing(unittest.TestCase):
543
  """Testcase for TCP version of ping - against listen(2)ing port"""
544

    
545
  def setUp(self):
546
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
547
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
548
    self.listenerport = self.listener.getsockname()[1]
549
    self.listener.listen(1)
550

    
551
  def tearDown(self):
552
    self.listener.shutdown(socket.SHUT_RDWR)
553
    del self.listener
554
    del self.listenerport
555

    
556
  def testTcpPingToLocalHostAccept(self):
557
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
558
                         self.listenerport,
559
                         timeout=10,
560
                         live_port_needed=True,
561
                         source=constants.LOCALHOST_IP_ADDRESS,
562
                         ),
563
                 "failed to connect to test listener")
564

    
565
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
566
                         self.listenerport,
567
                         timeout=10,
568
                         live_port_needed=True,
569
                         ),
570
                 "failed to connect to test listener (no source)")
571

    
572

    
573
class TestTcpPingDeaf(unittest.TestCase):
574
  """Testcase for TCP version of ping - against non listen(2)ing port"""
575

    
576
  def setUp(self):
577
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
578
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
579
    self.deaflistenerport = self.deaflistener.getsockname()[1]
580

    
581
  def tearDown(self):
582
    del self.deaflistener
583
    del self.deaflistenerport
584

    
585
  def testTcpPingToLocalHostAcceptDeaf(self):
586
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
587
                        self.deaflistenerport,
588
                        timeout=constants.TCP_PING_TIMEOUT,
589
                        live_port_needed=True,
590
                        source=constants.LOCALHOST_IP_ADDRESS,
591
                        ), # need successful connect(2)
592
                "successfully connected to deaf listener")
593

    
594
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
595
                        self.deaflistenerport,
596
                        timeout=constants.TCP_PING_TIMEOUT,
597
                        live_port_needed=True,
598
                        ), # need successful connect(2)
599
                "successfully connected to deaf listener (no source addr)")
600

    
601
  def testTcpPingToLocalHostNoAccept(self):
602
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
603
                         self.deaflistenerport,
604
                         timeout=constants.TCP_PING_TIMEOUT,
605
                         live_port_needed=False,
606
                         source=constants.LOCALHOST_IP_ADDRESS,
607
                         ), # ECONNREFUSED is OK
608
                 "failed to ping alive host on deaf port")
609

    
610
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
611
                         self.deaflistenerport,
612
                         timeout=constants.TCP_PING_TIMEOUT,
613
                         live_port_needed=False,
614
                         ), # ECONNREFUSED is OK
615
                 "failed to ping alive host on deaf port (no source addr)")
616

    
617

    
618
class TestListVisibleFiles(unittest.TestCase):
619
  """Test case for ListVisibleFiles"""
620

    
621
  def setUp(self):
622
    self.path = tempfile.mkdtemp()
623

    
624
  def tearDown(self):
625
    shutil.rmtree(self.path)
626

    
627
  def _test(self, files, expected):
628
    # Sort a copy
629
    expected = expected[:]
630
    expected.sort()
631

    
632
    for name in files:
633
      f = open(os.path.join(self.path, name), 'w')
634
      try:
635
        f.write("Test\n")
636
      finally:
637
        f.close()
638

    
639
    found = ListVisibleFiles(self.path)
640
    found.sort()
641

    
642
    self.assertEqual(found, expected)
643

    
644
  def testAllVisible(self):
645
    files = ["a", "b", "c"]
646
    expected = files
647
    self._test(files, expected)
648

    
649
  def testNoneVisible(self):
650
    files = [".a", ".b", ".c"]
651
    expected = []
652
    self._test(files, expected)
653

    
654
  def testSomeVisible(self):
655
    files = ["a", "b", ".c"]
656
    expected = ["a", "b"]
657
    self._test(files, expected)
658

    
659

    
660
class TestNewUUID(unittest.TestCase):
661
  """Test case for NewUUID"""
662

    
663
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
664
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
665

    
666
  def runTest(self):
667
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
668

    
669

    
670
class TestUniqueSequence(unittest.TestCase):
671
  """Test case for UniqueSequence"""
672

    
673
  def _test(self, input, expected):
674
    self.assertEqual(utils.UniqueSequence(input), expected)
675

    
676
  def runTest(self):
677
    # Ordered input
678
    self._test([1, 2, 3], [1, 2, 3])
679
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
680
    self._test([1, 2, 2, 3], [1, 2, 3])
681
    self._test([1, 2, 3, 3], [1, 2, 3])
682

    
683
    # Unordered input
684
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
685
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
686

    
687
    # Strings
688
    self._test(["a", "a"], ["a"])
689
    self._test(["a", "b"], ["a", "b"])
690
    self._test(["a", "b", "a"], ["a", "b"])
691

    
692

    
693
if __name__ == '__main__':
694
  unittest.main()