Revision cf0e6df7 test/ganeti.utils_unittest.py
b/test/ganeti.utils_unittest.py | ||
---|---|---|
1164 | 1164 |
self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c") |
1165 | 1165 |
|
1166 | 1166 |
|
1167 |
class TestTcpPing(unittest.TestCase): |
|
1168 |
"""Testcase for TCP version of ping - against listen(2)ing port""" |
|
1167 |
class _BaseTcpPingTest: |
|
1168 |
"""Base class for TcpPing tests against listen(2)ing port""" |
|
1169 |
family = None |
|
1170 |
address = None |
|
1169 | 1171 |
|
1170 | 1172 |
def setUp(self): |
1171 |
self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
1172 |
self.listener.bind((constants.IP4_ADDRESS_LOCALHOST, 0))
|
|
1173 |
self.listener = socket.socket(self.family, socket.SOCK_STREAM)
|
|
1174 |
self.listener.bind((self.address, 0))
|
|
1173 | 1175 |
self.listenerport = self.listener.getsockname()[1] |
1174 | 1176 |
self.listener.listen(1) |
1175 | 1177 |
|
... | ... | |
1179 | 1181 |
del self.listenerport |
1180 | 1182 |
|
1181 | 1183 |
def testTcpPingToLocalHostAccept(self): |
1182 |
self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
|
|
1184 |
self.assert_(TcpPing(self.address,
|
|
1183 | 1185 |
self.listenerport, |
1184 |
timeout=10,
|
|
1186 |
timeout=constants.TCP_PING_TIMEOUT,
|
|
1185 | 1187 |
live_port_needed=True, |
1186 |
source=constants.IP4_ADDRESS_LOCALHOST,
|
|
1188 |
source=self.address,
|
|
1187 | 1189 |
), |
1188 | 1190 |
"failed to connect to test listener") |
1189 | 1191 |
|
1190 |
self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST, |
|
1191 |
self.listenerport, |
|
1192 |
timeout=10, |
|
1193 |
live_port_needed=True, |
|
1194 |
), |
|
1192 |
self.assert_(TcpPing(self.address, self.listenerport, |
|
1193 |
timeout=constants.TCP_PING_TIMEOUT, |
|
1194 |
live_port_needed=True), |
|
1195 | 1195 |
"failed to connect to test listener (no source)") |
1196 | 1196 |
|
1197 | 1197 |
|
1198 |
class TestTcpPingDeaf(unittest.TestCase): |
|
1199 |
"""Testcase for TCP version of ping - against non listen(2)ing port""" |
|
1198 |
class TestIP4TcpPing(unittest.TestCase, _BaseTcpPingTest): |
|
1199 |
"""Testcase for IPv4 TCP version of ping - against listen(2)ing port""" |
|
1200 |
family = socket.AF_INET |
|
1201 |
address = constants.IP4_ADDRESS_LOCALHOST |
|
1202 |
|
|
1203 |
def setUp(self): |
|
1204 |
unittest.TestCase.setUp(self) |
|
1205 |
_BaseTcpPingTest.setUp(self) |
|
1206 |
|
|
1207 |
def tearDown(self): |
|
1208 |
unittest.TestCase.tearDown(self) |
|
1209 |
_BaseTcpPingTest.tearDown(self) |
|
1210 |
|
|
1211 |
|
|
1212 |
class TestIP6TcpPing(unittest.TestCase, _BaseTcpPingTest): |
|
1213 |
"""Testcase for IPv6 TCP version of ping - against listen(2)ing port""" |
|
1214 |
family = socket.AF_INET6 |
|
1215 |
address = constants.IP6_ADDRESS_LOCALHOST |
|
1216 |
|
|
1217 |
def setUp(self): |
|
1218 |
unittest.TestCase.setUp(self) |
|
1219 |
_BaseTcpPingTest.setUp(self) |
|
1220 |
|
|
1221 |
def tearDown(self): |
|
1222 |
unittest.TestCase.tearDown(self) |
|
1223 |
_BaseTcpPingTest.tearDown(self) |
|
1224 |
|
|
1225 |
|
|
1226 |
class _BaseTcpPingDeafTest: |
|
1227 |
"""Base class for TcpPing tests against non listen(2)ing port""" |
|
1228 |
family = None |
|
1229 |
address = None |
|
1200 | 1230 |
|
1201 | 1231 |
def setUp(self): |
1202 |
self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
1203 |
self.deaflistener.bind((constants.IP4_ADDRESS_LOCALHOST, 0))
|
|
1232 |
self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM)
|
|
1233 |
self.deaflistener.bind((self.address, 0))
|
|
1204 | 1234 |
self.deaflistenerport = self.deaflistener.getsockname()[1] |
1205 | 1235 |
|
1206 | 1236 |
def tearDown(self): |
... | ... | |
1208 | 1238 |
del self.deaflistenerport |
1209 | 1239 |
|
1210 | 1240 |
def testTcpPingToLocalHostAcceptDeaf(self): |
1211 |
self.failIf(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
|
|
1212 |
self.deaflistenerport, |
|
1213 |
timeout=constants.TCP_PING_TIMEOUT, |
|
1214 |
live_port_needed=True, |
|
1215 |
source=constants.IP4_ADDRESS_LOCALHOST,
|
|
1216 |
), # need successful connect(2) |
|
1217 |
"successfully connected to deaf listener") |
|
1218 |
|
|
1219 |
self.failIf(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
|
|
1220 |
self.deaflistenerport, |
|
1221 |
timeout=constants.TCP_PING_TIMEOUT, |
|
1222 |
live_port_needed=True, |
|
1223 |
), # need successful connect(2) |
|
1224 |
"successfully connected to deaf listener (no source addr)")
|
|
1241 |
self.assertFalse(TcpPing(self.address,
|
|
1242 |
self.deaflistenerport,
|
|
1243 |
timeout=constants.TCP_PING_TIMEOUT,
|
|
1244 |
live_port_needed=True,
|
|
1245 |
source=self.address,
|
|
1246 |
), # need successful connect(2)
|
|
1247 |
"successfully connected to deaf listener")
|
|
1248 |
|
|
1249 |
self.assertFalse(TcpPing(self.address,
|
|
1250 |
self.deaflistenerport,
|
|
1251 |
timeout=constants.TCP_PING_TIMEOUT,
|
|
1252 |
live_port_needed=True,
|
|
1253 |
), # need successful connect(2)
|
|
1254 |
"successfully connected to deaf listener (no source)")
|
|
1225 | 1255 |
|
1226 | 1256 |
def testTcpPingToLocalHostNoAccept(self): |
1227 |
self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
|
|
1257 |
self.assert_(TcpPing(self.address,
|
|
1228 | 1258 |
self.deaflistenerport, |
1229 | 1259 |
timeout=constants.TCP_PING_TIMEOUT, |
1230 | 1260 |
live_port_needed=False, |
1231 |
source=constants.IP4_ADDRESS_LOCALHOST,
|
|
1261 |
source=self.address,
|
|
1232 | 1262 |
), # ECONNREFUSED is OK |
1233 | 1263 |
"failed to ping alive host on deaf port") |
1234 | 1264 |
|
1235 |
self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
|
|
1265 |
self.assert_(TcpPing(self.address,
|
|
1236 | 1266 |
self.deaflistenerport, |
1237 | 1267 |
timeout=constants.TCP_PING_TIMEOUT, |
1238 | 1268 |
live_port_needed=False, |
1239 | 1269 |
), # ECONNREFUSED is OK |
1240 |
"failed to ping alive host on deaf port (no source addr)") |
|
1270 |
"failed to ping alive host on deaf port (no source)") |
|
1271 |
|
|
1272 |
|
|
1273 |
class TestIP4TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest): |
|
1274 |
"""Testcase for IPv4 TCP version of ping - against non listen(2)ing port""" |
|
1275 |
family = socket.AF_INET |
|
1276 |
address = constants.IP4_ADDRESS_LOCALHOST |
|
1277 |
|
|
1278 |
def setUp(self): |
|
1279 |
self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM) |
|
1280 |
self.deaflistener.bind((self.address, 0)) |
|
1281 |
self.deaflistenerport = self.deaflistener.getsockname()[1] |
|
1282 |
|
|
1283 |
def tearDown(self): |
|
1284 |
del self.deaflistener |
|
1285 |
del self.deaflistenerport |
|
1286 |
|
|
1287 |
|
|
1288 |
class TestIP6TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest): |
|
1289 |
"""Testcase for IPv6 TCP version of ping - against non listen(2)ing port""" |
|
1290 |
family = socket.AF_INET6 |
|
1291 |
address = constants.IP6_ADDRESS_LOCALHOST |
|
1292 |
|
|
1293 |
def setUp(self): |
|
1294 |
unittest.TestCase.setUp(self) |
|
1295 |
_BaseTcpPingDeafTest.setUp(self) |
|
1296 |
|
|
1297 |
def tearDown(self): |
|
1298 |
unittest.TestCase.tearDown(self) |
|
1299 |
_BaseTcpPingDeafTest.tearDown(self) |
|
1241 | 1300 |
|
1242 | 1301 |
|
1243 | 1302 |
class TestOwnIpAddress(unittest.TestCase): |
Also available in: Unified diff