diff --git a/lib/utils.py b/lib/utils.py index f29b903d69d45fbd81c82b4f9e940291e3ba631c..b32f041b2c41be084488ca088b0391f835d86a96 100644 --- a/lib/utils.py +++ b/lib/utils.py @@ -1762,8 +1762,12 @@ def TcpPing(target, port, timeout=10, live_port_needed=False, source=None): than C{EADDRNOTAVAIL} will be ignored """ - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + family = GetAddressFamily(target) + except errors.GenericError: + return False + sock = socket.socket(family, socket.SOCK_STREAM) success = False if source is not None: diff --git a/test/ganeti.utils_unittest.py b/test/ganeti.utils_unittest.py index 6d489b2672f6e688c21fa293e0ad0a8d38a2534a..bdd96d863d8dbdbb5ef5c50237000f0e4291db64 100755 --- a/test/ganeti.utils_unittest.py +++ b/test/ganeti.utils_unittest.py @@ -1164,12 +1164,14 @@ class TestShellQuoting(unittest.TestCase): self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c") -class TestTcpPing(unittest.TestCase): - """Testcase for TCP version of ping - against listen(2)ing port""" +class _BaseTcpPingTest: + """Base class for TcpPing tests against listen(2)ing port""" + family = None + address = None def setUp(self): - self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.listener.bind((constants.IP4_ADDRESS_LOCALHOST, 0)) + self.listener = socket.socket(self.family, socket.SOCK_STREAM) + self.listener.bind((self.address, 0)) self.listenerport = self.listener.getsockname()[1] self.listener.listen(1) @@ -1179,28 +1181,56 @@ class TestTcpPing(unittest.TestCase): del self.listenerport def testTcpPingToLocalHostAccept(self): - self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST, + self.assert_(TcpPing(self.address, self.listenerport, - timeout=10, + timeout=constants.TCP_PING_TIMEOUT, live_port_needed=True, - source=constants.IP4_ADDRESS_LOCALHOST, + source=self.address, ), "failed to connect to test listener") - self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST, - self.listenerport, - timeout=10, - live_port_needed=True, - ), + self.assert_(TcpPing(self.address, self.listenerport, + timeout=constants.TCP_PING_TIMEOUT, + live_port_needed=True), "failed to connect to test listener (no source)") -class TestTcpPingDeaf(unittest.TestCase): - """Testcase for TCP version of ping - against non listen(2)ing port""" +class TestIP4TcpPing(unittest.TestCase, _BaseTcpPingTest): + """Testcase for IPv4 TCP version of ping - against listen(2)ing port""" + family = socket.AF_INET + address = constants.IP4_ADDRESS_LOCALHOST + + def setUp(self): + unittest.TestCase.setUp(self) + _BaseTcpPingTest.setUp(self) + + def tearDown(self): + unittest.TestCase.tearDown(self) + _BaseTcpPingTest.tearDown(self) + + +class TestIP6TcpPing(unittest.TestCase, _BaseTcpPingTest): + """Testcase for IPv6 TCP version of ping - against listen(2)ing port""" + family = socket.AF_INET6 + address = constants.IP6_ADDRESS_LOCALHOST + + def setUp(self): + unittest.TestCase.setUp(self) + _BaseTcpPingTest.setUp(self) + + def tearDown(self): + unittest.TestCase.tearDown(self) + _BaseTcpPingTest.tearDown(self) + + +class _BaseTcpPingDeafTest: + """Base class for TcpPing tests against non listen(2)ing port""" + family = None + address = None def setUp(self): - self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.deaflistener.bind((constants.IP4_ADDRESS_LOCALHOST, 0)) + self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM) + self.deaflistener.bind((self.address, 0)) self.deaflistenerport = self.deaflistener.getsockname()[1] def tearDown(self): @@ -1208,36 +1238,65 @@ class TestTcpPingDeaf(unittest.TestCase): del self.deaflistenerport def testTcpPingToLocalHostAcceptDeaf(self): - self.failIf(TcpPing(constants.IP4_ADDRESS_LOCALHOST, - self.deaflistenerport, - timeout=constants.TCP_PING_TIMEOUT, - live_port_needed=True, - source=constants.IP4_ADDRESS_LOCALHOST, - ), # need successful connect(2) - "successfully connected to deaf listener") - - self.failIf(TcpPing(constants.IP4_ADDRESS_LOCALHOST, - self.deaflistenerport, - timeout=constants.TCP_PING_TIMEOUT, - live_port_needed=True, - ), # need successful connect(2) - "successfully connected to deaf listener (no source addr)") + self.assertFalse(TcpPing(self.address, + self.deaflistenerport, + timeout=constants.TCP_PING_TIMEOUT, + live_port_needed=True, + source=self.address, + ), # need successful connect(2) + "successfully connected to deaf listener") + + self.assertFalse(TcpPing(self.address, + self.deaflistenerport, + timeout=constants.TCP_PING_TIMEOUT, + live_port_needed=True, + ), # need successful connect(2) + "successfully connected to deaf listener (no source)") def testTcpPingToLocalHostNoAccept(self): - self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST, + self.assert_(TcpPing(self.address, self.deaflistenerport, timeout=constants.TCP_PING_TIMEOUT, live_port_needed=False, - source=constants.IP4_ADDRESS_LOCALHOST, + source=self.address, ), # ECONNREFUSED is OK "failed to ping alive host on deaf port") - self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST, + self.assert_(TcpPing(self.address, self.deaflistenerport, timeout=constants.TCP_PING_TIMEOUT, live_port_needed=False, ), # ECONNREFUSED is OK - "failed to ping alive host on deaf port (no source addr)") + "failed to ping alive host on deaf port (no source)") + + +class TestIP4TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest): + """Testcase for IPv4 TCP version of ping - against non listen(2)ing port""" + family = socket.AF_INET + address = constants.IP4_ADDRESS_LOCALHOST + + def setUp(self): + self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM) + self.deaflistener.bind((self.address, 0)) + self.deaflistenerport = self.deaflistener.getsockname()[1] + + def tearDown(self): + del self.deaflistener + del self.deaflistenerport + + +class TestIP6TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest): + """Testcase for IPv6 TCP version of ping - against non listen(2)ing port""" + family = socket.AF_INET6 + address = constants.IP6_ADDRESS_LOCALHOST + + def setUp(self): + unittest.TestCase.setUp(self) + _BaseTcpPingDeafTest.setUp(self) + + def tearDown(self): + unittest.TestCase.tearDown(self) + _BaseTcpPingDeafTest.tearDown(self) class TestOwnIpAddress(unittest.TestCase):