self._cfg_file = cfg_file
self._temporary_ids = set()
self._temporary_drbds = {}
+ self._temporary_macs = set()
# Note: in order to prevent errors when resolving our name in
# _DistributeConfig, we compute it here once and reuse it; it's
# better to raise an error before starting to modify the config
byte2 = random.randrange(0, 256)
byte3 = random.randrange(0, 256)
mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
- if mac not in all_macs:
+ if mac not in all_macs and mac not in self._temporary_macs:
break
retries -= 1
else:
raise errors.ConfigurationError("Can't generate unique MAC")
+ self._temporary_macs.add(mac)
return mac
@locking.ssynchronized(_config_lock, shared=1)
"""
all_macs = self._AllMACs()
- return mac in all_macs
+ return mac in all_macs or mac in self._temporary_macs
@locking.ssynchronized(_config_lock, shared=1)
def GenerateDRBDSecret(self):
raise errors.ConfigurationError("Can't generate unique DRBD secret")
return secret
- def _ComputeAllLVs(self):
+ def _AllLVs(self):
"""Compute the list of all LVs.
"""
lvnames.update(lv_list)
return lvnames
+  def _AllIDs(self, include_temporary):
+    """Compute the list of all UUIDs and names we have.
+
+    @type include_temporary: boolean
+    @param include_temporary: whether to include the _temporary_ids set
+    @rtype: set
+    @return: a set of IDs
+
+    """
+    existing = set()
+    if include_temporary:
+      existing.update(self._temporary_ids)
+    # LV names, instance names and node names all share one namespace
+    # for uniqueness purposes, hence the single combined set
+    existing.update(self._AllLVs())
+    existing.update(self._config_data.instances.keys())
+    existing.update(self._config_data.nodes.keys())
+    return existing
+
@locking.ssynchronized(_config_lock, shared=1)
def GenerateUniqueID(self, exceptions=None):
"""Generate an unique disk name.
@return: the unique id
"""
- existing = set()
- existing.update(self._temporary_ids)
- existing.update(self._ComputeAllLVs())
- existing.update(self._config_data.instances.keys())
- existing.update(self._config_data.nodes.keys())
+ existing = self._AllIDs(include_temporary=True)
if exceptions is not None:
existing.update(exceptions)
retries = 64
self._temporary_ids.add(unique_id)
return unique_id
+  def _CleanupTemporaryIDs(self):
+    """Cleans up the _temporary_ids structure.
+
+    Discards from the reservation set any ID that has meanwhile been
+    committed to the configuration (as an LV, instance or node name),
+    so the set does not grow without bound.
+
+    """
+    existing = self._AllIDs(include_temporary=False)
+    self._temporary_ids = self._temporary_ids - existing
+
def _AllMACs(self):
"""Return all MACs present in the config.
return result
- @locking.ssynchronized(_config_lock, shared=1)
- def VerifyConfig(self):
+  def _CheckDiskIDs(self, disk, l_ids, p_ids):
+    """Compute duplicate disk IDs
+
+    @type disk: L{objects.Disk}
+    @param disk: the disk at which to start searching
+    @type l_ids: list
+    @param l_ids: list of current logical ids
+    @type p_ids: list
+    @param p_ids: list of current physical ids
+    @rtype: list
+    @return: a list of error messages
+
+    @note: l_ids and p_ids are updated in place (newly-seen ids are
+        appended), so duplicates accumulate across successive calls
+
+    """
+    result = []
+    if disk.logical_id is not None:
+      if disk.logical_id in l_ids:
+        result.append("duplicate logical id %s" % str(disk.logical_id))
+      else:
+        l_ids.append(disk.logical_id)
+    if disk.physical_id is not None:
+      if disk.physical_id in p_ids:
+        result.append("duplicate physical id %s" % str(disk.physical_id))
+      else:
+        p_ids.append(disk.physical_id)
+
+    # recurse into child disks (e.g. DRBD over LVM)
+    if disk.children:
+      for child in disk.children:
+        result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
+    return result
+
+ def _UnlockedVerifyConfig(self):
"""Verify function.
+ @rtype: list
+ @return: a list of error messages; a non-empty list signifies
+ configuration errors
+
"""
result = []
seen_macs = []
ports = {}
data = self._config_data
+ seen_lids = []
+ seen_pids = []
+
+ # global cluster checks
+ if not data.cluster.enabled_hypervisors:
+ result.append("enabled hypervisors list doesn't have any entries")
+ invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
+ if invalid_hvs:
+ result.append("enabled hypervisors contains invalid entries: %s" %
+ invalid_hvs)
+
+ if data.cluster.master_node not in data.nodes:
+ result.append("cluster has invalid primary node '%s'" %
+ data.cluster.master_node)
+
+ # per-instance checks
for instance_name in data.instances:
instance = data.instances[instance_name]
if instance.primary_node not in data.nodes:
ports[net_port] = []
ports[net_port].append((instance.name, "network port"))
+ # instance disk verify
+ for idx, disk in enumerate(instance.disks):
+ result.extend(["instance '%s' disk %d error: %s" %
+ (instance.name, idx, msg) for msg in disk.Verify()])
+ result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
+
# cluster-wide pool of free ports
for free_port in data.cluster.tcpudp_port_pool:
if free_port not in ports:
if not data.nodes[data.cluster.master_node].master_candidate:
result.append("Master node is not a master candidate")
+ # master candidate checks
mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
if mc_now < mc_max:
result.append("Not enough master candidates: actual %d, target %d" %
(mc_now, mc_max))
+    # node checks
+    for node in data.nodes.values():
+      # a node may be at most one of master_candidate/drained/offline
+      if [node.master_candidate, node.drained, node.offline].count(True) > 1:
+        result.append("Node %s state is invalid: master_candidate=%s,"
+                      " drain=%s, offline=%s" %
+                      (node.name, node.master_candidate, node.drained,
+                       node.offline))
+
+ # drbd minors check
+ d_map, duplicates = self._UnlockedComputeDRBDMap()
+ for node, minor, instance_a, instance_b in duplicates:
+ result.append("DRBD minor %d on node %s is assigned twice to instances"
+ " %s and %s" % (minor, node, instance_a, instance_b))
+
return result
+  @locking.ssynchronized(_config_lock, shared=1)
+  def VerifyConfig(self):
+    """Verify function.
+
+    This is just a wrapper over L{_UnlockedVerifyConfig}, taking the
+    config lock in shared mode (verification is presumably read-only;
+    confirm L{_UnlockedVerifyConfig} has no side effects).
+
+    @rtype: list
+    @return: a list of error messages; a non-empty list signifies
+        configuration errors
+
+    """
+    return self._UnlockedVerifyConfig()
+
def _UnlockedSetDiskID(self, disk, node_name):
"""Convert the unique ID to the ID needed on the target nodes.
def _UnlockedComputeDRBDMap(self):
"""Compute the used DRBD minor/nodes.
+ @rtype: (dict, list)
@return: dictionary of node_name: dict of minor: instance_name;
the returned dict will have all the nodes in it (even if with
- an empty list).
+ an empty list), and a list of duplicates; if the duplicates
+ list is not empty, the configuration is corrupted and its caller
+ should raise an exception
"""
def _AppendUsedPorts(instance_name, disk, used):
+ duplicates = []
if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
- nodeA, nodeB, dummy, minorA, minorB = disk.logical_id[:5]
- for node, port in ((nodeA, minorA), (nodeB, minorB)):
- assert node in used, "Instance node not found in node list"
+ node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
+ for node, port in ((node_a, minor_a), (node_b, minor_b)):
+ assert node in used, ("Node '%s' of instance '%s' not found"
+ " in node list" % (node, instance_name))
if port in used[node]:
- raise errors.ProgrammerError("DRBD minor already used:"
- " %s/%s, %s/%s" %
- (node, port, instance_name,
- used[node][port]))
-
- used[node][port] = instance_name
+ duplicates.append((node, port, instance_name, used[node][port]))
+ else:
+ used[node][port] = instance_name
if disk.children:
for child in disk.children:
- _AppendUsedPorts(instance_name, child, used)
+ duplicates.extend(_AppendUsedPorts(instance_name, child, used))
+ return duplicates
+ duplicates = []
my_dict = dict((node, {}) for node in self._config_data.nodes)
- for (node, minor), instance in self._temporary_drbds.iteritems():
- my_dict[node][minor] = instance
for instance in self._config_data.instances.itervalues():
for disk in instance.disks:
- _AppendUsedPorts(instance.name, disk, my_dict)
- return my_dict
+ duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
+ for (node, minor), instance in self._temporary_drbds.iteritems():
+ if minor in my_dict[node] and my_dict[node][minor] != instance:
+ duplicates.append((node, minor, instance, my_dict[node][minor]))
+ else:
+ my_dict[node][minor] = instance
+ return my_dict, duplicates
@locking.ssynchronized(_config_lock)
def ComputeDRBDMap(self):
an empty list).
"""
- return self._UnlockedComputeDRBDMap()
+ d_map, duplicates = self._UnlockedComputeDRBDMap()
+ if duplicates:
+ raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
+ str(duplicates))
+ return d_map
@locking.ssynchronized(_config_lock)
def AllocateDRBDMinor(self, nodes, instance):
"""
assert isinstance(instance, basestring), \
- "Invalid argument passed to AllocateDRBDMinor"
+ "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
- d_map = self._UnlockedComputeDRBDMap()
+ d_map, duplicates = self._UnlockedComputeDRBDMap()
+ if duplicates:
+ raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
+ str(duplicates))
result = []
for nname in nodes:
ndata = d_map[nname]
minor = keys[-1] + 1
else:
minor = ffree
- result.append(minor)
+ # double-check minor against current instances
+ assert minor not in d_map[nname], \
+ ("Attempt to reuse allocated DRBD minor %d on node %s,"
+ " already allocated to instance %s" %
+ (minor, nname, d_map[nname][minor]))
ndata[minor] = instance
- assert (nname, minor) not in self._temporary_drbds, \
- "Attempt to reuse reserved DRBD minor"
- self._temporary_drbds[(nname, minor)] = instance
+ # double-check minor against reservation
+ r_key = (nname, minor)
+ assert r_key not in self._temporary_drbds, \
+ ("Attempt to reuse reserved DRBD minor %d on node %s,"
+ " reserved for instance %s" %
+ (minor, nname, self._temporary_drbds[r_key]))
+ self._temporary_drbds[r_key] = instance
+ result.append(minor)
logging.debug("Request to allocate drbd minors, input: %s, returning %s",
nodes, result)
return result
all_lvs = instance.MapLVsByNode()
logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
+ all_macs = self._AllMACs()
+ for nic in instance.nics:
+ if nic.mac in all_macs:
+ raise errors.ConfigurationError("Cannot add instance %s:"
+ " MAC address '%s' already in use." % (instance.name, nic.mac))
+
instance.serial_no = 1
self._config_data.instances[instance.name] = instance
+ self._config_data.cluster.serial_no += 1
self._UnlockedReleaseDRBDMinors(instance.name)
+ for nic in instance.nics:
+ self._temporary_macs.discard(nic.mac)
self._WriteConfig()
def _SetInstanceStatus(self, instance_name, status):
if instance_name not in self._config_data.instances:
raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
del self._config_data.instances[instance_name]
+ self._config_data.cluster.serial_no += 1
self._WriteConfig()
@locking.ssynchronized(_config_lock)
self._config_data.instances.keys())
def _UnlockedGetInstanceInfo(self, instance_name):
- """Returns informations about an instance.
+ """Returns information about an instance.
This function is for internal use, when the config lock is already held.
@locking.ssynchronized(_config_lock, shared=1)
def GetInstanceInfo(self, instance_name):
- """Returns informations about an instance.
+ """Returns information about an instance.
- It takes the information from the configuration file. Other informations of
+ It takes the information from the configuration file. Other information of
an instance are taken from the live systems.
@param instance_name: name of the instance, e.g.
"""Get the configuration of all instances.
@rtype: dict
- @returns: dict of (instance, instance_info), where instance_info is what
+ @return: dict of (instance, instance_info), where instance_info is what
would GetInstanceInfo return for the node
"""
for node in self._UnlockedGetNodeList()])
return my_dict
- def _UnlockedGetMasterCandidateStats(self):
+ def _UnlockedGetMasterCandidateStats(self, exceptions=None):
"""Get the number of current and maximum desired and possible candidates.
+ @type exceptions: list
+ @param exceptions: if passed, list of nodes that should be ignored
@rtype: tuple
@return: tuple of (current, desired and possible)
"""
mc_now = mc_max = 0
- for node in self._config_data.nodes.itervalues():
- if not node.offline:
+ for node in self._config_data.nodes.values():
+ if exceptions and node.name in exceptions:
+ continue
+ if not (node.offline or node.drained):
mc_max += 1
if node.master_candidate:
mc_now += 1
return (mc_now, mc_max)
@locking.ssynchronized(_config_lock, shared=1)
- def GetMasterCandidateStats(self):
+ def GetMasterCandidateStats(self, exceptions=None):
"""Get the number of current and maximum possible candidates.
This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
+ @type exceptions: list
+ @param exceptions: if passed, list of nodes that should be ignored
@rtype: tuple
@return: tuple of (current, max)
"""
- return self._UnlockedGetMasterCandidateStats()
+ return self._UnlockedGetMasterCandidateStats(exceptions)
@locking.ssynchronized(_config_lock)
def MaintainCandidatePool(self):
if mc_now >= mc_max:
break
node = self._config_data.nodes[name]
- if node.master_candidate or node.offline:
+ if node.master_candidate or node.offline or node.drained:
continue
mod_list.append(node)
node.master_candidate = True
not hasattr(data.cluster, 'rsahostkeypub')):
raise errors.ConfigurationError("Incomplete configuration"
" (missing cluster.rsahostkeypub)")
+
+ # Upgrade configuration if needed
+ data.UpgradeConfig()
+
self._config_data = data
# reset the last serial as -1 so that the next write will cause
# ssconf update
"""Write the configuration data to persistent storage.
"""
+ # first, cleanup the _temporary_ids set, if an ID is now in the
+ # other objects it should be discarded to prevent unbounded growth
+ # of that structure
+ self._CleanupTemporaryIDs()
+ config_errors = self._UnlockedVerifyConfig()
+ if config_errors:
+ raise errors.ConfigurationError("Configuration data is not"
+ " consistent: %s" %
+ (", ".join(config_errors)))
if destination is None:
destination = self._cfg_file
self._BumpSerialNo()
"""
fn = "\n".join
+ instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
node_names = utils.NiceSort(self._UnlockedGetNodeList())
node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
+ instance_data = fn(instance_names)
off_data = fn(node.name for node in node_info if node.offline)
+ on_data = fn(node.name for node in node_info if not node.offline)
mc_data = fn(node.name for node in node_info if node.master_candidate)
node_data = fn(node_names)
cluster = self._config_data.cluster
+ cluster_tags = fn(cluster.GetTags())
return {
constants.SS_CLUSTER_NAME: cluster.cluster_name,
+ constants.SS_CLUSTER_TAGS: cluster_tags,
constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
constants.SS_MASTER_CANDIDATES: mc_data,
constants.SS_MASTER_IP: cluster.master_ip,
constants.SS_MASTER_NODE: cluster.master_node,
constants.SS_NODE_LIST: node_data,
constants.SS_OFFLINE_NODES: off_data,
+ constants.SS_ONLINE_NODES: on_data,
+ constants.SS_INSTANCE_LIST: instance_data,
constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
}
- @locking.ssynchronized(_config_lock)
- def InitConfig(self, version, cluster_config, master_node_config):
- """Create the initial cluster configuration.
-
- It will contain the current node, which will also be the master
- node, and no instances.
-
- @type version: int
- @param version: Configuration version
- @type cluster_config: objects.Cluster
- @param cluster_config: Cluster configuration
- @type master_node_config: objects.Node
- @param master_node_config: Master node configuration
-
- """
- nodes = {
- master_node_config.name: master_node_config,
- }
-
- self._config_data = objects.ConfigData(version=version,
- cluster=cluster_config,
- nodes=nodes,
- instances={},
- serial_no=1)
- self._WriteConfig()
-
@locking.ssynchronized(_config_lock, shared=1)
def GetVGName(self):
"""Return the volume group name.
@locking.ssynchronized(_config_lock, shared=1)
def GetClusterInfo(self):
- """Returns informations about the cluster
+ """Returns information about the cluster
@rtype: L{objects.Cluster}
@return: the cluster object
if isinstance(target, objects.Instance):
self._UnlockedReleaseDRBDMinors(target.name)
+ for nic in target.nics:
+ self._temporary_macs.discard(nic.mac)
self._WriteConfig()