Introduce utils.IsValidIP{4,6}()
author Manuel Franceschini <livewire@google.com>
Thu, 24 Jun 2010 09:49:54 +0000 (11:49 +0200)
committer Manuel Franceschini <livewire@google.com>
Wed, 30 Jun 2010 11:32:42 +0000 (13:32 +0200)
This patch introduces functions to check for valid IPv4 and IPv6
addresses and converts IsValidIP() to return True if the given address
is either an IPv4 or an IPv6 address.

For now we do not change the functional behavior: existing callers of
IsValidIP are switched to IsValidIP4. This might change in the future.

Signed-off-by: Manuel Franceschini <livewire@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>

lib/bootstrap.py
lib/cmdlib.py
lib/hypervisor/hv_kvm.py
lib/hypervisor/hv_xen.py
lib/utils.py
scripts/gnt-instance
test/ganeti.utils_unittest.py

index 57b29cc..94ad63e 100644 (file)
@@ -259,7 +259,7 @@ def InitCluster(cluster_name, mac_prefix,
                                errors.ECODE_NOTUNIQUE)
 
   if secondary_ip:
                                errors.ECODE_NOTUNIQUE)
 
   if secondary_ip:
-    if not utils.IsValidIP(secondary_ip):
+    if not utils.IsValidIP4(secondary_ip):
       raise errors.OpPrereqError("Invalid secondary ip given",
                                  errors.ECODE_INVAL)
     if (secondary_ip != hostname.ip and
       raise errors.OpPrereqError("Invalid secondary ip given",
                                  errors.ECODE_INVAL)
     if (secondary_ip != hostname.ip and
index 60f9aed..a8f9d17 100644 (file)
@@ -3594,7 +3594,7 @@ class LUAddNode(LogicalUnit):
     primary_ip = self.op.primary_ip = dns_data.ip
     if self.op.secondary_ip is None:
       self.op.secondary_ip = primary_ip
     primary_ip = self.op.primary_ip = dns_data.ip
     if self.op.secondary_ip is None:
       self.op.secondary_ip = primary_ip
-    if not utils.IsValidIP(self.op.secondary_ip):
+    if not utils.IsValidIP4(self.op.secondary_ip):
       raise errors.OpPrereqError("Invalid secondary IP given",
                                  errors.ECODE_INVAL)
     secondary_ip = self.op.secondary_ip
       raise errors.OpPrereqError("Invalid secondary IP given",
                                  errors.ECODE_INVAL)
     secondary_ip = self.op.secondary_ip
@@ -6839,7 +6839,7 @@ class LUCreateInstance(LogicalUnit):
                                      errors.ECODE_INVAL)
         nic_ip = self.hostname1.ip
       else:
                                      errors.ECODE_INVAL)
         nic_ip = self.hostname1.ip
       else:
-        if not utils.IsValidIP(ip):
+        if not utils.IsValidIP4(ip):
           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
                                      " like a valid IP" % ip,
                                      errors.ECODE_INVAL)
           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
                                      " like a valid IP" % ip,
                                      errors.ECODE_INVAL)
@@ -8513,7 +8513,7 @@ class LUSetInstanceParams(LogicalUnit):
         if nic_ip.lower() == constants.VALUE_NONE:
           nic_dict['ip'] = None
         else:
         if nic_ip.lower() == constants.VALUE_NONE:
           nic_dict['ip'] = None
         else:
-          if not utils.IsValidIP(nic_ip):
+          if not utils.IsValidIP4(nic_ip):
             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
                                        errors.ECODE_INVAL)
 
             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
                                        errors.ECODE_INVAL)
 
index 67d83cc..400cd85 100644 (file)
@@ -71,7 +71,7 @@ class KVMHypervisor(hv_base.BaseHypervisor):
     constants.HV_ACPI: hv_base.NO_CHECK,
     constants.HV_SERIAL_CONSOLE: hv_base.NO_CHECK,
     constants.HV_VNC_BIND_ADDRESS:
     constants.HV_ACPI: hv_base.NO_CHECK,
     constants.HV_SERIAL_CONSOLE: hv_base.NO_CHECK,
     constants.HV_VNC_BIND_ADDRESS:
-      (False, lambda x: (utils.IsValidIP(x) or utils.IsNormAbsPath(x)),
+      (False, lambda x: (utils.IsValidIP4(x) or utils.IsNormAbsPath(x)),
        "the VNC bind address must be either a valid IP address or an absolute"
        " pathname", None, None),
     constants.HV_VNC_TLS: hv_base.NO_CHECK,
        "the VNC bind address must be either a valid IP address or an absolute"
        " pathname", None, None),
     constants.HV_VNC_TLS: hv_base.NO_CHECK,
@@ -514,7 +514,7 @@ class KVMHypervisor(hv_base.BaseHypervisor):
 
     vnc_bind_address = hvp[constants.HV_VNC_BIND_ADDRESS]
     if vnc_bind_address:
 
     vnc_bind_address = hvp[constants.HV_VNC_BIND_ADDRESS]
     if vnc_bind_address:
-      if utils.IsValidIP(vnc_bind_address):
+      if utils.IsValidIP4(vnc_bind_address):
         if instance.network_port > constants.VNC_BASE_PORT:
           display = instance.network_port - constants.VNC_BASE_PORT
           if vnc_bind_address == constants.IP4_ADDRESS_ANY:
         if instance.network_port > constants.VNC_BASE_PORT:
           display = instance.network_port - constants.VNC_BASE_PORT
           if vnc_bind_address == constants.IP4_ADDRESS_ANY:
index acf5e0b..876e072 100644 (file)
@@ -549,7 +549,7 @@ class XenHvmHypervisor(XenHypervisor):
       hv_base.ParamInSet(True, constants.HT_HVM_VALID_NIC_TYPES),
     constants.HV_PAE: hv_base.NO_CHECK,
     constants.HV_VNC_BIND_ADDRESS:
       hv_base.ParamInSet(True, constants.HT_HVM_VALID_NIC_TYPES),
     constants.HV_PAE: hv_base.NO_CHECK,
     constants.HV_VNC_BIND_ADDRESS:
-      (False, utils.IsValidIP,
+      (False, utils.IsValidIP4,
        "VNC bind address is not a valid IP address", None, None),
     constants.HV_KERNEL_PATH: hv_base.REQ_FILE_CHECK,
     constants.HV_DEVICE_MODEL: hv_base.REQ_FILE_CHECK,
        "VNC bind address is not a valid IP address", None, None),
     constants.HV_KERNEL_PATH: hv_base.REQ_FILE_CHECK,
     constants.HV_DEVICE_MODEL: hv_base.REQ_FILE_CHECK,
index ebbde57..2f6d04e 100644 (file)
@@ -1292,22 +1292,64 @@ def TryConvert(fn, val):
   return nv
 
 
   return nv
 
 
+def _GenericIsValidIP(family, ip):
+  """Generic internal version of ip validation.
+
+  @type family: int
+  @param family: socket.AF_INET | socket.AF_INET6
+  @type ip: str
+  @param ip: the address to be checked
+  @rtype: boolean
+  @return: True if ip is valid, False otherwise
+
+  """
+  try:
+    socket.inet_pton(family, ip)
+    return True
+  except socket.error:
+    return False
+
+
+def IsValidIP4(ip):
+  """Verifies an IPv4 address.
+
+  This function checks if the given address is a valid IPv4 address.
+
+  @type ip: str
+  @param ip: the address to be checked
+  @rtype: boolean
+  @return: True if ip is valid, False otherwise
+
+  """
+  return _GenericIsValidIP(socket.AF_INET, ip)
+
+
+def IsValidIP6(ip):
+  """Verifies an IPv6 address.
+
+  This function checks if the given address is a valid IPv6 address.
+
+  @type ip: str
+  @param ip: the address to be checked
+  @rtype: boolean
+  @return: True if ip is valid, False otherwise
+
+  """
+  return _GenericIsValidIP(socket.AF_INET6, ip)
+
+
 def IsValidIP(ip):
 def IsValidIP(ip):
-  """Verifies the syntax of an IPv4 address.
+  """Verifies an IP address.
 
 
-  This function checks if the IPv4 address passes is valid or not based
-  on syntax (not IP range, class calculations, etc.).
+  This function checks if the given IP address (both IPv4 and IPv6) is valid.
 
   @type ip: str
   @param ip: the address to be checked
 
   @type ip: str
   @param ip: the address to be checked
-  @rtype: a regular expression match object
-  @return: a regular expression match object, or None if the
-      address is not valid
+  @rtype: boolean
+  @return: True if ip is valid, False otherwise
 
   """
 
   """
-  unit = "(0|[1-9]\d{0,2})"
-  #TODO: convert and return only boolean
-  return re.match("^%s\.%s\.%s\.%s$" % (unit, unit, unit, unit), ip)
+  return IsValidIP4(ip) or IsValidIP6(ip)
 
 
 def IsValidShellParam(word):
 
 
 def IsValidShellParam(word):
index 521be30..82abc4d 100755 (executable)
@@ -1191,7 +1191,7 @@ def ShowInstanceConfig(opts, args):
         vnc_console_port = "%s:%s (display %s)" % (instance["pnode"],
                                                    port,
                                                    display)
         vnc_console_port = "%s:%s (display %s)" % (instance["pnode"],
                                                    port,
                                                    display)
-      elif display > 0 and utils.IsValidIP(vnc_bind_address):
+      elif display > 0 and utils.IsValidIP4(vnc_bind_address):
         vnc_console_port = ("%s:%s (node %s) (display %s)" %
                              (vnc_bind_address, port,
                               instance["pnode"], display))
         vnc_console_port = ("%s:%s (node %s) (display %s)" %
                              (vnc_bind_address, port,
                               instance["pnode"], display))
index 50ed4e9..e0cdd0d 100755 (executable)
@@ -2426,5 +2426,43 @@ class RunIgnoreProcessNotFound(unittest.TestCase):
     self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0))
 
 
     self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0))
 
 
+class TestIsValidIP4(unittest.TestCase):
+  def test(self):
+    self.assert_(utils.IsValidIP4("127.0.0.1"))
+    self.assert_(utils.IsValidIP4("0.0.0.0"))
+    self.assert_(utils.IsValidIP4("255.255.255.255"))
+    self.assertFalse(utils.IsValidIP4("0"))
+    self.assertFalse(utils.IsValidIP4("1"))
+    self.assertFalse(utils.IsValidIP4("1.1.1"))
+    self.assertFalse(utils.IsValidIP4("255.255.255.256"))
+    self.assertFalse(utils.IsValidIP4("::1"))
+
+
+class TestIsValidIP6(unittest.TestCase):
+  def test(self):
+    self.assert_(utils.IsValidIP6("::"))
+    self.assert_(utils.IsValidIP6("::1"))
+    self.assert_(utils.IsValidIP6("1" + (":1" * 7)))
+    self.assert_(utils.IsValidIP6("ffff" + (":ffff" * 7)))
+    self.assertFalse(utils.IsValidIP6("0"))
+    self.assertFalse(utils.IsValidIP6(":1"))
+    self.assertFalse(utils.IsValidIP6("f" + (":f" * 6)))
+    self.assertFalse(utils.IsValidIP6("fffg" + (":ffff" * 7)))
+    self.assertFalse(utils.IsValidIP6("fffff" + (":ffff" * 7)))
+    self.assertFalse(utils.IsValidIP6("1" + (":1" * 8)))
+    self.assertFalse(utils.IsValidIP6("127.0.0.1"))
+
+
+class TestIsValidIP(unittest.TestCase):
+  def test(self):
+    self.assert_(utils.IsValidIP("0.0.0.0"))
+    self.assert_(utils.IsValidIP("127.0.0.1"))
+    self.assert_(utils.IsValidIP("::"))
+    self.assert_(utils.IsValidIP("::1"))
+    self.assertFalse(utils.IsValidIP("0"))
+    self.assertFalse(utils.IsValidIP("1.1.1.256"))
+    self.assertFalse(utils.IsValidIP("a:g::1"))
+
+
 if __name__ == '__main__':
   testutils.GanetiTestProgram()
 if __name__ == '__main__':
   testutils.GanetiTestProgram()