Commit 37531236 authored by Andrea Spadaccini's avatar Andrea Spadaccini Committed by Michael Hanselmann
Browse files

Added helper functions in netutils and related constants



Added the following functions to netutils:
- IsValidInterface
- GetInterfaceIpAddresses
- _GetIpAddressesFromIpOutput

Added the following static methods to netutils.IPAddress:
- GetAddressFamilyFromVersion
- GetVersionFromAddressFamily

Added unit tests for the new methods in netutils.IPAddress, for the IP
address search regex and for GetInterfaceIpAddresses
Signed-off-by: default avatarAndrea Spadaccini <spadaccio@google.com>
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarMichael Hanselmann <hansmi@google.com>
parent 140ff718
......@@ -28,13 +28,16 @@ the command line scripts.
import errno
import os
import re
import socket
import struct
import IN
import logging
from ganeti import constants
from ganeti import errors
from ganeti import utils
# Structure definition for getsockopt(SOL_SOCKET, SO_PEERCRED, ...):
# struct ucred { pid_t pid; uid_t uid; gid_t gid; };
......@@ -48,6 +51,38 @@ from ganeti import errors
_STRUCT_UCRED = "iII"
_STRUCT_UCRED_SIZE = struct.calcsize(_STRUCT_UCRED)
# Regexes used to find IP addresses in the output of ip.
_IP_RE_TEXT = r"[.:a-z0-9]+" # separate for testing purposes
_IP_FAMILY_RE = re.compile(r"(?P<family>inet6?)\s+(?P<ip>%s)/" % _IP_RE_TEXT,
re.IGNORECASE)
# Dict used to convert from a string representing an IP family to an IP
# version
_NAME_TO_IP_VER = {
"inet": constants.IP4_VERSION,
"inet6": constants.IP6_VERSION,
}
def _GetIpAddressesFromIpOutput(ip_output):
"""Parses the output of the ip command and retrieves the IP addresses and
version.
@param ip_output: string containing the output of the ip command;
@rtype: dict; (int, list)
@return: a dict having as keys the IP versions and as values the
corresponding list of addresses found in the IP output.
"""
addr = dict((i, []) for i in _NAME_TO_IP_VER.values())
for row in ip_output.splitlines():
match = _IP_FAMILY_RE.search(row)
if match and IPAddress.IsValid(match.group("ip")):
addr[_NAME_TO_IP_VER[match.group("family")]].append(match.group("ip"))
return addr
def GetSocketCredentials(sock):
"""Returns the credentials of the foreign process connected to a socket.
......@@ -62,6 +97,39 @@ def GetSocketCredentials(sock):
return struct.unpack(_STRUCT_UCRED, peercred)
def IsValidInterface(ifname):
"""Validate an interface name.
@type ifname: string
@param ifname: Name of the network interface
@return: boolean indicating whether the interface name is valid or not.
"""
return os.path.exists(utils.PathJoin("/sys/class/net", ifname))
def GetInterfaceIpAddresses(ifname):
"""Returns the IP addresses associated to the interface.
@type ifname: string
@param ifname: Name of the network interface
@return: A dict having for keys the IP version (either
L{constants.IP4_VERSION} or L{constants.IP6_VERSION}) and for
values the lists of IP addresses of the respective version
associated to the interface
"""
result = utils.RunCmd([constants.IP_COMMAND_PATH, "-o", "addr", "show",
ifname])
if result.failed:
logging.error("Error running the ip command while getting the IP"
" addresses of %s", ifname)
return None
return _GetIpAddressesFromIpOutput(result.output)
def GetHostname(name=None, family=None):
"""Returns a Hostname object.
......@@ -366,7 +434,7 @@ class IPAddress(object):
@type address: str
@param address: ip address whose family will be returned
@rtype: int
@return: socket.AF_INET or socket.AF_INET6
@return: C{socket.AF_INET} or C{socket.AF_INET6}
@raise errors.GenericError: for invalid addresses
"""
......@@ -382,6 +450,43 @@ class IPAddress(object):
raise errors.IPAddressError("Invalid address '%s'" % address)
@staticmethod
def GetVersionFromAddressFamily(family):
"""Convert an IP address family to the corresponding IP version.
@type family: int
@param family: IP address family, one of socket.AF_INET or socket.AF_INET6
@return: an int containing the IP version, one of L{constants.IP4_VERSION}
or L{constants.IP6_VERSION}
@raise errors.ProgrammerError: for unknown families
"""
if family == socket.AF_INET:
return constants.IP4_VERSION
elif family == socket.AF_INET6:
return constants.IP6_VERSION
raise errors.ProgrammerError("%s is not a valid IP address family" % family)
@staticmethod
def GetAddressFamilyFromVersion(version):
"""Convert an IP version to the corresponding IP address family.
@type version: int
@param version: IP version, one of L{constants.IP4_VERSION} or
L{constants.IP6_VERSION}
@return: an int containing the IP address family, one of C{socket.AF_INET}
or C{socket.AF_INET6}
@raise errors.ProgrammerError: for unknown IP versions
"""
if version == constants.IP4_VERSION:
return socket.AF_INET
elif version == constants.IP6_VERSION:
return socket.AF_INET6
raise errors.ProgrammerError("%s is not a valid IP version" % version)
@classmethod
def IsLoopback(cls, address):
"""Determine whether it is a loopback address.
......
7: dummy0: <BROADCAST,NOARP> mtu 1500 qdisc noop state DOWN
link/ether 06:d2:06:24:99:dc brd ff:ff:ff:ff:ff:ff
inet 192.0.2.1/32 scope global dummy0
inet6 2001:db8:85a3::8a2e:370:7334/128 scope global
valid_lft forever preferred_lft forever
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 16436 qdisc noqueue state UNKNOWN
inet 127.0.0.1/8 scope host lo
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 16436
inet6 ::1/128 scope host
valid_lft forever preferred_lft forever
1: lo inet 127.0.0.1/8 scope host lo
1: lo inet6 ::1/128 scope host \ valid_lft forever preferred_lft forever
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 16436 qdisc noqueue state UNKNOWN \ link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
1: lo inet 127.0.0.1/8 scope host lo
1: lo inet6 ::1/128 scope host \ valid_lft forever preferred_lft forever
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 16436 qdisc noqueue state UNKNOWN
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
inet 127.0.0.1/8 scope host lo
inet6 ::1/128 scope host
valid_lft forever preferred_lft forever
......@@ -22,6 +22,7 @@
"""Script for unittesting the netutils module"""
import os
import re
import shutil
import socket
import tempfile
......@@ -171,6 +172,27 @@ class TestIPAddress(unittest.TestCase):
self.assertFalse(netutils.IPAddress.Own("192.0.2.1"),
"Should not own IP address 192.0.2.1")
def testFamilyVersionConversions(self):
# IPAddress.GetAddressFamilyFromVersion
self.assertEqual(
netutils.IPAddress.GetAddressFamilyFromVersion(constants.IP4_VERSION),
socket.AF_INET)
self.assertEqual(
netutils.IPAddress.GetAddressFamilyFromVersion(constants.IP6_VERSION),
socket.AF_INET6)
self.assertRaises(errors.ProgrammerError,
netutils.IPAddress.GetAddressFamilyFromVersion, 3)
# IPAddress.GetVersionFromAddressFamily
self.assertEqual(
netutils.IPAddress.GetVersionFromAddressFamily(socket.AF_INET),
constants.IP4_VERSION)
self.assertEqual(
netutils.IPAddress.GetVersionFromAddressFamily(socket.AF_INET6),
constants.IP6_VERSION)
self.assertRaises(errors.ProgrammerError,
netutils.IPAddress.GetVersionFromAddressFamily, socket.AF_UNIX)
class TestIP4Address(unittest.TestCase):
def testGetIPIntFromString(self):
......@@ -421,6 +443,66 @@ class TestFormatAddress(unittest.TestCase):
self.assertRaises(errors.ParameterError, netutils.FormatAddress,
("::1"), family=socket.AF_INET )
class TestIpParsing(testutils.GanetiTestCase):
"""Test the code that parses the ip command output"""
def testIp4(self):
valid_addresses = [constants.IP4_ADDRESS_ANY,
constants.IP4_ADDRESS_LOCALHOST,
"192.0.2.1", # RFC5737, IPv4 address blocks for docs
"198.51.100.1",
"203.0.113.1",
]
for addr in valid_addresses:
self.failUnless(re.search(netutils._IP_RE_TEXT, addr))
def testIp6(self):
valid_addresses = [constants.IP6_ADDRESS_ANY,
constants.IP6_ADDRESS_LOCALHOST,
"0:0:0:0:0:0:0:1", # other form for IP6_ADDRESS_LOCALHOST
"0:0:0:0:0:0:0:0", # other form for IP6_ADDRESS_ANY
"2001:db8:85a3::8a2e:370:7334", # RFC3849 IP6 docs block
"2001:0db8:85a3:0000:0000:8a2e:0370:7334",
"0:0:0:0:0:FFFF:192.0.2.1", # IPv4-compatible IPv6
"::FFFF:192.0.2.1",
"0:0:0:0:0:0:203.0.113.1", # IPv4-mapped IPv6
"::203.0.113.1",
]
for addr in valid_addresses:
self.failUnless(re.search(netutils._IP_RE_TEXT, addr))
def testParseIpCommandOutput(self):
# IPv4-only, fake loopback interface
tests = ["ip-addr-show-lo-ipv4.txt", "ip-addr-show-lo-oneline-ipv4.txt"]
for test_file in tests:
data = self._ReadTestData(test_file)
addr = netutils._GetIpAddressesFromIpOutput(data)
self.failUnless(len(addr[4]) == 1 and addr[4][0] == "127.0.0.1" and not
addr[6])
# IPv6-only, fake loopback interface
tests = ["ip-addr-show-lo-ipv6.txt", "ip-addr-show-lo-ipv6.txt"]
for test_file in tests:
data = self._ReadTestData(test_file)
addr = netutils._GetIpAddressesFromIpOutput(data)
self.failUnless(len(addr[6]) == 1 and addr[6][0] == "::1" and not addr[4])
# IPv4 and IPv6, fake loopback interface
tests = ["ip-addr-show-lo.txt", "ip-addr-show-lo-oneline.txt"]
for test_file in tests:
data = self._ReadTestData(test_file)
addr = netutils._GetIpAddressesFromIpOutput(data)
self.failUnless(len(addr[6]) == 1 and addr[6][0] == "::1" and
len(addr[4]) == 1 and addr[4][0] == "127.0.0.1")
# IPv4 and IPv6, dummy interface
data = self._ReadTestData("ip-addr-show-dummy0.txt")
addr = netutils._GetIpAddressesFromIpOutput(data)
self.failUnless(len(addr[6]) == 1 and
addr[6][0] == "2001:db8:85a3::8a2e:370:7334" and
len(addr[4]) == 1 and
addr[4][0] == "192.0.2.1")
if __name__ == "__main__":
testutils.GanetiTestProgram()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment