Implement utils.RunParts and use it for hooks
[ganeti-local] / lib / config.py
index 661e2e8..47de2b5 100644 (file)
@@ -32,7 +32,6 @@ much memory.
 """
 
 import os
-import tempfile
 import random
 import logging
 import time
@@ -48,6 +47,9 @@ from ganeti import serializer
 
 _config_lock = locking.SharedLock()
 
+# job id used for resource management at config upgrade time
+_UPGRADE_CONFIG_JID = "jid-cfg-upgrade"
+
 
 def _ValidateConfig(data):
   """Verifies that a configuration objects looks valid.
@@ -65,6 +67,61 @@ def _ValidateConfig(data):
                                      constants.CONFIG_VERSION))
 
 
+class TemporaryReservationManager:
+  """A temporary resource reservation manager.
+
+  This is used to reserve resources in a job, before using them, making sure
+  other jobs cannot get them in the meantime.
+
+  """
+  def __init__(self):
+    self._ec_reserved = {}
+
+  def Reserved(self, resource):
+    for holder_reserved in self._ec_reserved.items():
+      if resource in holder_reserved:
+        return True
+    return False
+
+  def Reserve(self, ec_id, resource):
+    if self.Reserved(resource):
+      raise errors.ReservationError("Duplicate reservation for resource: %s." %
+                                    (resource))
+    if ec_id not in self._ec_reserved:
+      self._ec_reserved[ec_id] = set([resource])
+    else:
+      self._ec_reserved[ec_id].add(resource)
+
+  def DropECReservations(self, ec_id):
+    if ec_id in self._ec_reserved:
+      del self._ec_reserved[ec_id]
+
+  def GetReserved(self):
+    all_reserved = set()
+    for holder_reserved in self._ec_reserved.values():
+      all_reserved.update(holder_reserved)
+    return all_reserved
+
+  def Generate(self, existing, generate_one_fn, ec_id):
+    """Generate a new resource of this type
+
+    """
+    assert callable(generate_one_fn)
+
+    all_elems = self.GetReserved()
+    all_elems.update(existing)
+    retries = 64
+    while retries > 0:
+      new_resource = generate_one_fn()
+      if new_resource is not None and new_resource not in all_elems:
+        break
+    else:
+      raise errors.ConfigurationError("Not able generate new resource"
+                                      " (last tried: %s)" % new_resource)
+    self.Reserve(ec_id, new_resource)
+    return new_resource
+
+
 class ConfigWriter:
   """The interface to the cluster configuration.
 
@@ -78,9 +135,10 @@ class ConfigWriter:
       self._cfg_file = constants.CLUSTER_CONF_FILE
     else:
       self._cfg_file = cfg_file
-    self._temporary_ids = set()
+    self._temporary_ids = TemporaryReservationManager()
     self._temporary_drbds = {}
-    self._temporary_macs = set()
+    self._temporary_macs = TemporaryReservationManager()
+    self._temporary_secrets = TemporaryReservationManager()
     # Note: in order to prevent errors when resolving our name in
     # _DistributeConfig, we compute it here once and reuse it; it's
     # better to raise an error before starting to modify the config
@@ -97,57 +155,51 @@ class ConfigWriter:
     """
     return os.path.exists(constants.CLUSTER_CONF_FILE)
 
+  def _GenerateOneMAC(self):
+    """Generate one mac address
+
+    """
+    prefix = self._config_data.cluster.mac_prefix
+    byte1 = random.randrange(0, 256)
+    byte2 = random.randrange(0, 256)
+    byte3 = random.randrange(0, 256)
+    mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
+    return mac
+
   @locking.ssynchronized(_config_lock, shared=1)
-  def GenerateMAC(self):
+  def GenerateMAC(self, ec_id):
     """Generate a MAC for an instance.
 
     This should check the current instances for duplicates.
 
     """
-    prefix = self._config_data.cluster.mac_prefix
-    all_macs = self._AllMACs()
-    retries = 64
-    while retries > 0:
-      byte1 = random.randrange(0, 256)
-      byte2 = random.randrange(0, 256)
-      byte3 = random.randrange(0, 256)
-      mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
-      if mac not in all_macs and mac not in self._temporary_macs:
-        break
-      retries -= 1
-    else:
-      raise errors.ConfigurationError("Can't generate unique MAC")
-    self._temporary_macs.add(mac)
-    return mac
+    existing = self._AllMACs()
+    return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)
 
   @locking.ssynchronized(_config_lock, shared=1)
-  def IsMacInUse(self, mac):
-    """Predicate: check if the specified MAC is in use in the Ganeti cluster.
+  def ReserveMAC(self, mac, ec_id):
+    """Reserve a MAC for an instance.
 
     This only checks instances managed by this cluster, it does not
     check for potential collisions elsewhere.
 
     """
     all_macs = self._AllMACs()
-    return mac in all_macs or mac in self._temporary_macs
+    if mac in all_macs:
+      raise errors.ReservationError("mac already in use")
+    else:
+      self._temporary_macs.Reserve(mac, ec_id)
 
   @locking.ssynchronized(_config_lock, shared=1)
-  def GenerateDRBDSecret(self):
+  def GenerateDRBDSecret(self, ec_id):
     """Generate a DRBD secret.
 
     This checks the current disks for duplicates.
 
     """
-    all_secrets = self._AllDRBDSecrets()
-    retries = 64
-    while retries > 0:
-      secret = utils.GenerateSecret()
-      if secret not in all_secrets:
-        break
-      retries -= 1
-    else:
-      raise errors.ConfigurationError("Can't generate unique DRBD secret")
-    return secret
+    return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
+                                            utils.GenerateSecret,
+                                            ec_id)
 
   def _AllLVs(self):
     """Compute the list of all LVs.
@@ -171,57 +223,37 @@ class ConfigWriter:
     """
     existing = set()
     if include_temporary:
-      existing.update(self._temporary_ids)
+      existing.update(self._temporary_ids.GetReserved())
     existing.update(self._AllLVs())
     existing.update(self._config_data.instances.keys())
     existing.update(self._config_data.nodes.keys())
     existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
     return existing
 
-  def _GenerateUniqueID(self, exceptions=None):
+  def _GenerateUniqueID(self, ec_id):
     """Generate an unique UUID.
 
     This checks the current node, instances and disk names for
     duplicates.
 
-    @param exceptions: a list with some other names which should be
-        checked for uniqueness (used for example when you want to get
-        more than one id at one time without adding each one in turn
-        to the config file)
-
     @rtype: string
     @return: the unique id
 
     """
-    existing = self._AllIDs(include_temporary=True)
-    if exceptions is not None:
-      existing.update(exceptions)
-    retries = 64
-    while retries > 0:
-      unique_id = utils.NewUUID()
-      if unique_id not in existing and unique_id is not None:
-        break
-    else:
-      raise errors.ConfigurationError("Not able generate an unique ID"
-                                      " (last tried ID: %s" % unique_id)
-    self._temporary_ids.add(unique_id)
-    return unique_id
+    existing = self._AllIDs(include_temporary=False)
+    return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)
 
   @locking.ssynchronized(_config_lock, shared=1)
-  def GenerateUniqueID(self, exceptions=None):
+  def GenerateUniqueID(self, ec_id):
     """Generate an unique ID.
 
     This is just a wrapper over the unlocked version.
 
-    """
-    return self._GenerateUniqueID(exceptions=exceptions)
-
-  def _CleanupTemporaryIDs(self):
-    """Cleanups the _temporary_ids structure.
+    @type ec_id: string
+    @param ec_id: unique id for the job to reserve the id to
 
     """
-    existing = self._AllIDs(include_temporary=False)
-    self._temporary_ids = self._temporary_ids - existing
+    return self._GenerateUniqueID(ec_id)
 
   def _AllMACs(self):
     """Return all MACs present in the config.
@@ -365,7 +397,7 @@ class ConfigWriter:
     for pnum in keys:
       pdata = ports[pnum]
       if len(pdata) > 1:
-        txt = ", ".join(["%s/%s" % val for val in pdata])
+        txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
         result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
 
     # highest used tcp port check
@@ -392,11 +424,49 @@ class ConfigWriter:
                        node.offline))
 
     # drbd minors check
-    d_map, duplicates = self._UnlockedComputeDRBDMap()
+    _, duplicates = self._UnlockedComputeDRBDMap()
     for node, minor, instance_a, instance_b in duplicates:
       result.append("DRBD minor %d on node %s is assigned twice to instances"
                     " %s and %s" % (minor, node, instance_a, instance_b))
 
+    # IP checks
+    default_nicparams = data.cluster.nicparams[constants.PP_DEFAULT]
+    ips = {}
+
+    def _AddIpAddress(ip, name):
+      ips.setdefault(ip, []).append(name)
+
+    _AddIpAddress(data.cluster.master_ip, "cluster_ip")
+
+    for node in data.nodes.values():
+      _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
+      if node.secondary_ip != node.primary_ip:
+        _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)
+
+    for instance in data.instances.values():
+      for idx, nic in enumerate(instance.nics):
+        if nic.ip is None:
+          continue
+
+        nicparams = objects.FillDict(default_nicparams, nic.nicparams)
+        nic_mode = nicparams[constants.NIC_MODE]
+        nic_link = nicparams[constants.NIC_LINK]
+
+        if nic_mode == constants.NIC_MODE_BRIDGED:
+          link = "bridge:%s" % nic_link
+        elif nic_mode == constants.NIC_MODE_ROUTED:
+          link = "route:%s" % nic_link
+        else:
+          raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)
+
+        _AddIpAddress("%s/%s" % (link, nic.ip),
+                      "instance:%s/nic:%d" % (instance.name, idx))
+
+    for ip, owners in ips.items():
+      if len(owners) > 1:
+        result.append("IP address %s is used by multiple owners: %s" %
+                      (ip, utils.CommaJoin(owners)))
+
     return result
 
   @locking.ssynchronized(_config_lock, shared=1)
@@ -715,7 +785,7 @@ class ConfigWriter:
     return self._config_data.cluster.rsahostkeypub
 
   @locking.ssynchronized(_config_lock)
-  def AddInstance(self, instance):
+  def AddInstance(self, instance, ec_id):
     """Add an instance to the config.
 
     This should be used after creating a new instance.
@@ -738,28 +808,27 @@ class ConfigWriter:
                                         " MAC address '%s' already in use." %
                                         (instance.name, nic.mac))
 
-    self._EnsureUUID(instance)
+    self._EnsureUUID(instance, ec_id)
 
     instance.serial_no = 1
     instance.ctime = instance.mtime = time.time()
     self._config_data.instances[instance.name] = instance
     self._config_data.cluster.serial_no += 1
     self._UnlockedReleaseDRBDMinors(instance.name)
-    for nic in instance.nics:
-      self._temporary_macs.discard(nic.mac)
     self._WriteConfig()
 
-  def _EnsureUUID(self, item):
+  def _EnsureUUID(self, item, ec_id):
     """Ensures a given object has a valid UUID.
 
     @param item: the instance or node to be checked
+    @param ec_id: the execution context id for the uuid reservation
 
     """
     if not item.uuid:
-      item.uuid = self._GenerateUniqueID()
-    elif item.uuid in self._AllIDs(temporary=True):
-      raise errors.ConfigurationError("Cannot add '%s': UUID already in use" %
-                                      (item.name, item.uuid))
+      item.uuid = self._GenerateUniqueID(ec_id)
+    elif item.uuid in self._AllIDs(include_temporary=True):
+      raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
+                                      " in use" % (item.name, item.uuid))
 
   def _SetInstanceStatus(self, instance_name, status):
     """Set the instance's status to a given value.
@@ -898,16 +967,16 @@ class ConfigWriter:
     return my_dict
 
   @locking.ssynchronized(_config_lock)
-  def AddNode(self, node):
+  def AddNode(self, node, ec_id):
     """Add a node to the configuration.
 
     @type node: L{objects.Node}
     @param node: a Node instance
 
     """
-    logging.info("Adding node %s to configuration" % node.name)
+    logging.info("Adding node %s to configuration", node.name)
 
-    self._EnsureUUID(node)
+    self._EnsureUUID(node, ec_id)
 
     node.serial_no = 1
     node.ctime = node.mtime = time.time()
@@ -920,7 +989,7 @@ class ConfigWriter:
     """Remove a node from the configuration.
 
     """
-    logging.info("Removing node %s from configuration" % node_name)
+    logging.info("Removing node %s from configuration", node_name)
 
     if node_name not in self._config_data.nodes:
       raise errors.ConfigurationError("Unknown node '%s'" % node_name)
@@ -955,7 +1024,6 @@ class ConfigWriter:
 
     return self._config_data.nodes[node_name]
 
-
   @locking.ssynchronized(_config_lock, shared=1)
   def GetNodeInfo(self, node_name):
     """Get the configuration of a node, as stored in the config.
@@ -981,7 +1049,6 @@ class ConfigWriter:
     """
     return self._config_data.nodes.keys()
 
-
   @locking.ssynchronized(_config_lock, shared=1)
   def GetNodeList(self):
     """Return the list of nodes which are in the configuration.
@@ -1140,10 +1207,13 @@ class ConfigWriter:
     modified = False
     for item in self._AllUUIDObjects():
       if item.uuid is None:
-        item.uuid = self._GenerateUniqueID()
+        item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
         modified = True
     if modified:
       self._WriteConfig()
+      # This is ok even if it acquires the internal lock, as _UpgradeConfig is
+      # only called at config init time, without the lock held
+      self.DropECReservations(_UPGRADE_CONFIG_JID)
 
   def _DistributeConfig(self, feedback_fn):
     """Distribute the configuration to the other nodes.
@@ -1195,11 +1265,6 @@ class ConfigWriter:
     """
     assert feedback_fn is None or callable(feedback_fn)
 
-    # First, cleanup the _temporary_ids set, if an ID is now in the
-    # other objects it should be discarded to prevent unbounded growth
-    # of that structure
-    self._CleanupTemporaryIDs()
-
     # Warn on config errors, but don't abort the save - the
     # configuration has already been modified, and we can't revert;
     # the best we can do is to warn the user and save as is, leaving
@@ -1207,7 +1272,7 @@ class ConfigWriter:
     config_errors = self._UnlockedVerifyConfig()
     if config_errors:
       errmsg = ("Configuration data is not consistent: %s" %
-                (", ".join(config_errors)))
+                (utils.CommaJoin(config_errors)))
       logging.critical(errmsg)
       if feedback_fn:
         feedback_fn(errmsg)
@@ -1366,7 +1431,14 @@ class ConfigWriter:
 
     if isinstance(target, objects.Instance):
       self._UnlockedReleaseDRBDMinors(target.name)
-      for nic in target.nics:
-        self._temporary_macs.discard(nic.mac)
 
     self._WriteConfig(feedback_fn=feedback_fn)
+
+  @locking.ssynchronized(_config_lock)
+  def DropECReservations(self, ec_id):
+    """Drop per-execution-context reservations
+
+    """
+    self._temporary_ids.DropECReservations(ec_id)
+    self._temporary_macs.DropECReservations(ec_id)
+    self._temporary_secrets.DropECReservations(ec_id)