class TestTcpPing(unittest.TestCase):
    """Testcase for TCP version of ping - against listen(2)ing port"""

    def setUp(self):
        """Open a listening TCP socket on the IPv4 localhost address."""
        self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Bind to port 0 and read the actual port back from the socket.
        self.listener.bind((constants.IP4_ADDRESS_LOCALHOST, 0))
        self.listenerport = self.listener.getsockname()[1]
        self.listener.listen(1)

    def tearDown(self):
        """Shut down and close the listener, then drop the fixture attributes."""
        try:
            self.listener.shutdown(socket.SHUT_RDWR)
        finally:
            # Close explicitly rather than relying on garbage collection to
            # release the descriptor.
            self.listener.close()
        del self.listener
        del self.listenerport

    def testTcpPingToLocalHostAccept(self):
        """TcpPing must report success against the listening port."""
        # With an explicit source address
        self.assertTrue(
            TcpPing(
                constants.IP4_ADDRESS_LOCALHOST,
                self.listenerport,
                timeout=10,
                live_port_needed=True,
                source=constants.IP4_ADDRESS_LOCALHOST,
            ),
            "failed to connect to test listener",
        )

        # Without a source address
        self.assertTrue(
            TcpPing(
                constants.IP4_ADDRESS_LOCALHOST,
                self.listenerport,
                timeout=10,
                live_port_needed=True,
            ),
            "failed to connect to test listener (no source)",
        )


class TestTcpPingDeaf(unittest.TestCase):
    """Testcase for TCP version of ping - against non listen(2)ing port"""

    def setUp(self):
        """Bind a TCP socket on localhost without ever calling listen()."""
        self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Bind to port 0 and read the actual port back from the socket.
        self.deaflistener.bind((constants.IP4_ADDRESS_LOCALHOST, 0))
        self.deaflistenerport = self.deaflistener.getsockname()[1]

    def tearDown(self):
        """Close the bound socket and drop the fixture attributes."""
        # Close explicitly rather than relying on garbage collection to
        # release the descriptor.
        self.deaflistener.close()
        del self.deaflistener
        del self.deaflistenerport

    def testTcpPingToLocalHostAcceptDeaf(self):
        """With live_port_needed=True the ping to the deaf port must fail."""
        self.assertFalse(
            TcpPing(
                constants.IP4_ADDRESS_LOCALHOST,
                self.deaflistenerport,
                timeout=constants.TCP_PING_TIMEOUT,
                live_port_needed=True,
                source=constants.IP4_ADDRESS_LOCALHOST,
            ),  # need successful connect(2)
            "successfully connected to deaf listener",
        )

        self.assertFalse(
            TcpPing(
                constants.IP4_ADDRESS_LOCALHOST,
                self.deaflistenerport,
                timeout=constants.TCP_PING_TIMEOUT,
                live_port_needed=True,
            ),  # need successful connect(2)
            "successfully connected to deaf listener (no source addr)",
        )

    def testTcpPingToLocalHostNoAccept(self):
        """With live_port_needed=False the ping to the deaf port must succeed."""
        self.assertTrue(
            TcpPing(
                constants.IP4_ADDRESS_LOCALHOST,
                self.deaflistenerport,
                timeout=constants.TCP_PING_TIMEOUT,
                live_port_needed=False,
                source=constants.IP4_ADDRESS_LOCALHOST,
            ),  # ECONNREFUSED is OK
            "failed to ping alive host on deaf port",
        )

        self.assertTrue(
            TcpPing(
                constants.IP4_ADDRESS_LOCALHOST,
                self.deaflistenerport,
                timeout=constants.TCP_PING_TIMEOUT,
                live_port_needed=False,
            ),  # ECONNREFUSED is OK
            "failed to ping alive host on deaf port (no source addr)",
        )


class TestOwnIpAddress(unittest.TestCase):
    """Testcase for OwnIpAddress"""

    def testOwnLoopback(self):
        """check having the loopback ip"""
        self.assertTrue(
            OwnIpAddress(constants.IP4_ADDRESS_LOCALHOST),
            "Should own the loopback address",
        )

    def testNowOwnAddress(self):
        """check that I don't own an address"""

        # Network 192.0.2.0/24 is reserved for test/documentation as per
        # RFC 5735, so we *should* not have an address of this range... if
        # this fails, we should extend the test to multiple addresses
        dst_ip = "192.0.2.1"
        self.assertFalse(
            OwnIpAddress(dst_ip),
            "Should not own IP address %s" % dst_ip,
        )


def _GetSocketCredentials(path):
    """Return the peer credentials seen when connecting to a Unix socket.

    A stream socket is connected to C{path} (with a 10 second timeout) and
    handed to L{utils.GetSocketCredentials}; the socket is always closed
    before returning, whether or not the lookup succeeded.

    @param path: filesystem path of the Unix socket to connect to
    @return: whatever L{utils.GetSocketCredentials} returns for the socket

    """
    client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        client.settimeout(10)
        client.connect(path)
        credentials = utils.GetSocketCredentials(client)
    finally:
        client.close()
    return credentials


class TestGetSocketCredentials(unittest.TestCase):
    """Testcase for retrieving peer credentials over a Unix socket.

    A forked child connects to a listener owned by the test process and
    reports the credentials it sees; these must match the test process.

    """

    def setUp(self):
        """Create a temporary directory holding a listening Unix socket."""
        self.tmpdir = tempfile.mkdtemp()
        self.sockpath = utils.PathJoin(self.tmpdir, "sock")

        self.listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.listener.settimeout(10)
        self.listener.bind(self.sockpath)
        self.listener.listen(1)

    def tearDown(self):
        """Release the listener and remove the temporary directory."""
        # Nested so that a failing step cannot skip the later cleanup.
        try:
            try:
                self.listener.shutdown(socket.SHUT_RDWR)
            finally:
                self.listener.close()
        finally:
            shutil.rmtree(self.tmpdir)

    def test(self):
        """Check that the child sees this process' pid, uid and gid."""
        # Pipe used by the child to send its result to the parent
        (c2pr, c2pw) = os.pipe()

        # Start child process
        child = os.fork()
        if child == 0:
            try:
                data = serializer.DumpJson(_GetSocketCredentials(self.sockpath))

                os.write(c2pw, data)
                os.close(c2pw)

                os._exit(0)
            finally:
                # Reached only if something above raised; the child must never
                # return into the test runner.
                os._exit(1)

        # Parent only from here on; it does not write to the pipe
        os.close(c2pw)

        try:
            # Wait for one connection
            (conn, _) = self.listener.accept()
            try:
                conn.recv(1)
            finally:
                conn.close()

            # Wait for result
            result = os.read(c2pr, 4096)
        finally:
            # Always release the pipe and reap the child, even when the
            # exchange above failed, so no descriptor or zombie is left behind.
            os.close(c2pr)
            (_, status) = os.waitpid(child, 0)

        # Check child's exit code
        self.assertFalse(os.WIFSIGNALED(status))
        self.assertEqual(os.WEXITSTATUS(status), 0)

        # Check result
        (pid, uid, gid) = serializer.LoadJson(result)
        self.assertEqual(pid, os.getpid())
        self.assertEqual(uid, os.getuid())
        self.assertEqual(gid, os.getgid())

