Revision 2c30e9d7 test/ganeti.utils_unittest.py

b/test/ganeti.utils_unittest.py
27 27
import tempfile
28 28
import os.path
29 29
import md5
30
import socket
31
from errno import EADDRNOTAVAIL
30 32

  
31 33
import ganeti
32 34
from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
33 35
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
34 36
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
35
     ShellQuote, ShellQuoteArgs, _ParseIpOutput
37
     ShellQuote, ShellQuoteArgs, _ParseIpOutput, TcpPing
36 38
from ganeti.errors import LockError, UnitParseError
37 39

  
40

  
38 41
class TestIsProcessAlive(unittest.TestCase):
39 42
  """Testing case for IsProcessAlive"""
40 43
  def setUp(self):
......
467 470
    self._test(output, ['127.0.0.1', '10.0.0.1', '1.2.3.4'])
468 471

  
469 472

  
473
class TestTcpPing(unittest.TestCase):
474
  """Testcase for TCP version of ping - against listen(2)ing port"""
475

  
476
  def setUp(self):
477
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
478
    self.listener.bind(("127.0.0.1", 0))
479
    self.listenerport = self.listener.getsockname()[1]
480
    self.listener.listen(1)
481

  
482
  def tearDown(self):
483
    self.listener.shutdown(socket.SHUT_RDWR)
484
    del self.listener
485
    del self.listenerport
486

  
487
  def testTcpPingToLocalHostAccept(self):
488
    self.assert_(TcpPing("127.0.0.1",
489
                         "127.0.0.1",
490
                         self.listenerport,
491
                         timeout=10,
492
                         live_port_needed=True),
493
                 "failed to connect to test listener")
494

  
495

  
496
class TestTcpPingDeaf(unittest.TestCase):
497
  """Testcase for TCP version of ping - against non listen(2)ing port"""
498

  
499
  def setUp(self):
500
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
501
    self.deaflistener.bind(("127.0.0.1", 0))
502
    self.deaflistenerport = self.deaflistener.getsockname()[1]
503

  
504
  def tearDown(self):
505
    del self.deaflistener
506
    del self.deaflistenerport
507

  
508
  def testTcpPingToLocalHostAcceptDeaf(self):
509
    self.failIf(TcpPing("127.0.0.1",
510
                        "127.0.0.1",
511
                        self.deaflistenerport,
512
                        timeout=10,  # timeout for blocking operations
513
                        live_port_needed=True), # need successful connect(2)
514
                "successfully connected to deaf listener")
515

  
516
  def testTcpPingToLocalHostNoAccept(self):
517
    self.assert_(TcpPing("127.0.0.1",
518
                         "127.0.0.1",
519
                         self.deaflistenerport,
520
                         timeout=10, # timeout for blocking operations
521
                         live_port_needed=False), # ECONNREFUSED is OK
522
                 "failed to ping alive host on deaf port")
523

  
524

  
470 525
if __name__ == '__main__':
471 526
  unittest.main()

Also available in: Unified diff