import tempfile
import shutil
import itertools
+import operator
from ganeti import ssh
from ganeti import utils
import ganeti.masterd.instance # pylint: disable-msg=W0611
-def _SupportsOob(cfg, node):
- """Tells if node supports OOB.
-
- @type cfg: L{config.ConfigWriter}
- @param cfg: The cluster configuration
- @type node: L{objects.Node}
- @param node: The node
- @return: The OOB script if supported or an empty string otherwise
-
- """
- return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
-
-
class ResultWithJobs:
"""Data container for LU results with jobs.
sort_by_name=self.sort_by_name)
+def _ShareAll():
+ """Returns a dict declaring all lock levels shared.
+
+ """
+ return dict.fromkeys(locking.LEVELS, 1)
+
+
+def _SupportsOob(cfg, node):
+ """Tells if node supports OOB.
+
+ @type cfg: L{config.ConfigWriter}
+ @param cfg: The cluster configuration
+ @type node: L{objects.Node}
+ @param node: The node
+ @return: The OOB script if supported or an empty string otherwise
+
+ """
+ return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
+
+
def _GetWantedNodes(lu, nodes):
"""Returns list of checked and expanded node names.
assert not lu.glm.is_owned(level), "No locks should be owned"
+def _MapInstanceDisksToNodes(instances):
+ """Creates a map from (node, volume) to instance name.
+
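+  Example (hypothetical names): an instance "inst1" with LV "xenvg/lv0" on
+  node "node1" yields {("node1", "xenvg/lv0"): "inst1"}.
+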
+ @type instances: list of L{objects.Instance}
+  @rtype: dict; tuple of (node name, volume name) as key, instance name as
+    value
+
+ """
+ return dict(((node, vol), inst.name)
+ for inst in instances
+ for (node, vols) in inst.MapLVsByNode().items()
+ for vol in vols)
+
+
def _RunPostHook(lu, node_name):
"""Runs the post-hook for an opcode on a single node.
bep = cluster.FillBE(instance)
hvp = cluster.FillHV(instance)
args = {
- 'name': instance.name,
- 'primary_node': instance.primary_node,
- 'secondary_nodes': instance.secondary_nodes,
- 'os_type': instance.os,
- 'status': instance.admin_up,
- 'memory': bep[constants.BE_MEMORY],
- 'vcpus': bep[constants.BE_VCPUS],
- 'nics': _NICListToTuple(lu, instance.nics),
- 'disk_template': instance.disk_template,
- 'disks': [(disk.size, disk.mode) for disk in instance.disks],
- 'bep': bep,
- 'hvp': hvp,
- 'hypervisor_name': instance.hypervisor,
- 'tags': instance.tags,
+ "name": instance.name,
+ "primary_node": instance.primary_node,
+ "secondary_nodes": instance.secondary_nodes,
+ "os_type": instance.os,
+ "status": instance.admin_up,
+ "memory": bep[constants.BE_MEMORY],
+ "vcpus": bep[constants.BE_VCPUS],
+ "nics": _NICListToTuple(lu, instance.nics),
+ "disk_template": instance.disk_template,
+ "disks": [(disk.size, disk.mode) for disk in instance.disks],
+ "bep": bep,
+ "hvp": hvp,
+ "hypervisor_name": instance.hypervisor,
+ "tags": instance.tags,
}
if override:
args.update(override)
@param name: OS name passed by the user, to check for validity
"""
+ variant = objects.OS.GetVariant(name)
if not os_obj.supported_variants:
+ if variant:
+ raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
+ " passed)" % (os_obj.name, variant),
+ errors.ECODE_INVAL)
return
- variant = objects.OS.GetVariant(name)
if not variant:
raise errors.OpPrereqError("OS name must include a variant",
errors.ECODE_INVAL)
locking.LEVEL_NODE: [],
}
- self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+ self.share_locks = _ShareAll()
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE:
@param all_nvinfo: RPC results
"""
- node_names = frozenset(node.name for node in nodeinfo)
+ node_names = frozenset(node.name for node in nodeinfo if not node.offline)
assert master_node in node_names
assert (len(files_all | files_all_opt | files_mc | files_vm) ==
fileinfo = dict((filename, {}) for filename in file2nodefn.keys())
for node in nodeinfo:
+ if node.offline:
+ continue
+
nresult = all_nvinfo[node.name]
if nresult.fail_msg or not nresult.payload:
# All or no nodes
errorif(missing_file and missing_file != node_names,
cls.ECLUSTERFILECHECK, None,
- "File %s is optional, but it must exist on all or no nodes (not"
- " found on %s)",
+ "File %s is optional, but it must exist on all or no"
+ " nodes (not found on %s)",
filename, utils.CommaJoin(utils.NiceSort(missing_file)))
else:
errorif(missing_file, cls.ECLUSTERFILECHECK, None,
_ErrorIf(len(os_data) > 1, self.ENODEOS, node,
"OS '%s' has multiple entries (first one shadows the rest): %s",
os_name, utils.CommaJoin([v[0] for v in os_data]))
- # this will catched in backend too
- _ErrorIf(compat.any(v >= constants.OS_API_V15 for v in f_api)
- and not f_var, self.ENODEOS, node,
- "OS %s with API at least %d does not declare any variant",
- os_name, constants.OS_API_V15)
# comparisons with the 'base' image
test = os_name not in base.oslist
_ErrorIf(test, self.ENODEOS, node,
"""Build hooks nodes.
"""
- assert self.my_node_names, ("Node list not gathered,"
- " has CheckPrereq been executed?")
return ([], self.my_node_names)
def Exec(self, feedback_fn):
"""
# This method has too many local variables. pylint: disable-msg=R0914
+
+ if not self.my_node_names:
+ # empty node group
+ feedback_fn("* Empty node group, skipping verification")
+ return True
+
self.bad = False
_ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
verbose = self.op.verbose
and hook results
"""
- # We only really run POST phase hooks, and are only interested in
- # their results
- if phase == constants.HOOKS_PHASE_POST:
+ # We only really run POST phase hooks, only for non-empty groups,
+ # and are only interested in their results
+ if not self.my_node_names:
+ # empty node group
+ pass
+ elif phase == constants.HOOKS_PHASE_POST:
# Used to change hooks' output to proper indentation
feedback_fn("* Hooks Results")
assert hooks_results, "invalid result from hooks"
self._ErrorIf(test, self.ENODEHOOKS, node_name,
"Script %s failed, output:", script)
if test:
- output = self._HOOKS_INDENT_RE.sub(' ', output)
+ output = self._HOOKS_INDENT_RE.sub(" ", output)
feedback_fn("%s" % output)
lu_result = 0
- return lu_result
+ return lu_result
class LUClusterVerifyDisks(NoHooksLU):
REQ_BGL = False
def ExpandNames(self):
+ self.share_locks = _ShareAll()
self.needed_locks = {
- locking.LEVEL_NODE: locking.ALL_SET,
- locking.LEVEL_INSTANCE: locking.ALL_SET,
- }
- self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+ locking.LEVEL_NODEGROUP: locking.ALL_SET,
+ }
+
+ def Exec(self, feedback_fn):
+ group_names = self.glm.list_owned(locking.LEVEL_NODEGROUP)
+
+ # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
+ return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
+ for group in group_names])
+
+
+class LUGroupVerifyDisks(NoHooksLU):
+ """Verifies the status of all disks in a node group.
+
+ """
+ REQ_BGL = False
+
+ def ExpandNames(self):
+ # Raises errors.OpPrereqError on its own if group can't be found
+ self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
+
+ self.share_locks = _ShareAll()
+ self.needed_locks = {
+ locking.LEVEL_INSTANCE: [],
+ locking.LEVEL_NODEGROUP: [],
+ locking.LEVEL_NODE: [],
+ }
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_INSTANCE:
+ assert not self.needed_locks[locking.LEVEL_INSTANCE]
+
+ # Lock instances optimistically, needs verification once node and group
+ # locks have been acquired
+ self.needed_locks[locking.LEVEL_INSTANCE] = \
+ self.cfg.GetNodeGroupInstances(self.group_uuid)
+
+ elif level == locking.LEVEL_NODEGROUP:
+ assert not self.needed_locks[locking.LEVEL_NODEGROUP]
+
+ self.needed_locks[locking.LEVEL_NODEGROUP] = \
+ set([self.group_uuid] +
+ # Lock all groups used by instances optimistically; this requires
+ # going via the node before it's locked, requiring verification
+ # later on
+ [group_uuid
+ for instance_name in
+ self.glm.list_owned(locking.LEVEL_INSTANCE)
+ for group_uuid in
+ self.cfg.GetInstanceNodeGroups(instance_name)])
+
+ elif level == locking.LEVEL_NODE:
+ # This will only lock the nodes in the group to be verified which contain
+ # actual instances
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+ self._LockInstancesNodes()
+
+ # Lock all nodes in group to be verified
+ assert self.group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
+ member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
+ self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
+
+ def CheckPrereq(self):
+ owned_instances = frozenset(self.glm.list_owned(locking.LEVEL_INSTANCE))
+ owned_groups = frozenset(self.glm.list_owned(locking.LEVEL_NODEGROUP))
+ owned_nodes = frozenset(self.glm.list_owned(locking.LEVEL_NODE))
+
+ assert self.group_uuid in owned_groups
+
+ # Check if locked instances are still correct
+ wanted_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)
+ if owned_instances != wanted_instances:
+ raise errors.OpPrereqError("Instances in node group %s changed since"
+ " locks were acquired, wanted %s, have %s;"
+ " retry the operation" %
+ (self.op.group_name,
+ utils.CommaJoin(wanted_instances),
+ utils.CommaJoin(owned_instances)),
+ errors.ECODE_STATE)
+
+ # Get instance information
+ self.instances = dict((name, self.cfg.GetInstanceInfo(name))
+ for name in owned_instances)
+
+ # Check if node groups for locked instances are still correct
+ for (instance_name, inst) in self.instances.items():
+ assert self.group_uuid in self.cfg.GetInstanceNodeGroups(instance_name), \
+ "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
+ assert owned_nodes.issuperset(inst.all_nodes), \
+ "Instance %s's nodes changed while we kept the lock" % instance_name
+
+ inst_groups = self.cfg.GetInstanceNodeGroups(instance_name)
+ if not owned_groups.issuperset(inst_groups):
+ raise errors.OpPrereqError("Instance %s's node groups changed since"
+                                   " locks were acquired, current groups"
+                                   " are '%s', owning groups '%s'; retry the"
+ " operation" %
+ (instance_name,
+ utils.CommaJoin(inst_groups),
+ utils.CommaJoin(owned_groups)),
+ errors.ECODE_STATE)
def Exec(self, feedback_fn):
"""Verify integrity of cluster disks.
missing volumes
"""
- result = res_nodes, res_instances, res_missing = {}, [], {}
+ res_nodes = {}
+ res_instances = set()
+ res_missing = {}
- nodes = utils.NiceSort(self.cfg.GetVmCapableNodeList())
- instances = self.cfg.GetAllInstancesInfo().values()
+ nv_dict = _MapInstanceDisksToNodes([inst
+ for inst in self.instances.values()
+ if inst.admin_up])
- nv_dict = {}
- for inst in instances:
- inst_lvs = {}
- if not inst.admin_up:
- continue
- inst.MapLVsByNode(inst_lvs)
- # transform { iname: {node: [vol,],},} to {(node, vol): iname}
- for node, vol_list in inst_lvs.iteritems():
- for vol in vol_list:
- nv_dict[(node, vol)] = inst
-
- if not nv_dict:
- return result
-
- node_lvs = self.rpc.call_lv_list(nodes, [])
- for node, node_res in node_lvs.items():
- if node_res.offline:
- continue
- msg = node_res.fail_msg
- if msg:
- logging.warning("Error enumerating LVs on node %s: %s", node, msg)
- res_nodes[node] = msg
- continue
+ if nv_dict:
+ nodes = utils.NiceSort(set(self.glm.list_owned(locking.LEVEL_NODE)) &
+ set(self.cfg.GetVmCapableNodeList()))
- lvs = node_res.payload
- for lv_name, (_, _, lv_online) in lvs.items():
- inst = nv_dict.pop((node, lv_name), None)
- if (not lv_online and inst is not None
- and inst.name not in res_instances):
- res_instances.append(inst.name)
+ node_lvs = self.rpc.call_lv_list(nodes, [])
- # any leftover items in nv_dict are missing LVs, let's arrange the
- # data better
- for key, inst in nv_dict.iteritems():
- if inst.name not in res_missing:
- res_missing[inst.name] = []
- res_missing[inst.name].append(key)
+ for (node, node_res) in node_lvs.items():
+ if node_res.offline:
+ continue
- return result
+ msg = node_res.fail_msg
+ if msg:
+ logging.warning("Error enumerating LVs on node %s: %s", node, msg)
+ res_nodes[node] = msg
+ continue
+
+ for lv_name, (_, _, lv_online) in node_res.payload.items():
+ inst = nv_dict.pop((node, lv_name), None)
+ if not (lv_online or inst is None):
+ res_instances.add(inst)
+
+ # any leftover items in nv_dict are missing LVs, let's arrange the data
+ # better
+ for key, inst in nv_dict.iteritems():
+ res_missing.setdefault(inst, []).append(key)
+
+ return (res_nodes, list(res_instances), res_missing)
class LUClusterRepairDiskSizes(NoHooksLU):
locking.LEVEL_NODE: locking.ALL_SET,
locking.LEVEL_INSTANCE: locking.ALL_SET,
}
- self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+ self.share_locks = _ShareAll()
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE and self.wanted_names is not None:
nodenames = self.glm.list_owned(locking.LEVEL_NODE)
volumes = self.rpc.call_node_volumes(nodenames)
- ilist = [self.cfg.GetInstanceInfo(iname) for iname
- in self.cfg.GetInstanceList()]
-
- lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
+ ilist = self.cfg.GetAllInstancesInfo()
+ vol2inst = _MapInstanceDisksToNodes(ilist.values())
output = []
for node in nodenames:
self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
continue
- node_vols = nresult.payload[:]
- node_vols.sort(key=lambda vol: vol['dev'])
+ node_vols = sorted(nresult.payload,
+ key=operator.itemgetter("dev"))
for vol in node_vols:
node_output = []
if field == "node":
val = node
elif field == "phys":
- val = vol['dev']
+ val = vol["dev"]
elif field == "vg":
- val = vol['vg']
+ val = vol["vg"]
elif field == "name":
- val = vol['name']
+ val = vol["name"]
elif field == "size":
- val = int(float(vol['size']))
+ val = int(float(vol["size"]))
elif field == "instance":
- for inst in ilist:
- if node not in lv_by_node[inst]:
- continue
- if vol['name'] in lv_by_node[inst][node]:
- val = inst.name
- break
- else:
- val = '-'
+ val = vol2inst.get((node, vol["vg"] + "/" + vol["name"]), "-")
else:
raise errors.ParameterError(field)
node_output.append(str(val))
def ExpandNames(self, lu):
lu.needed_locks = {}
- lu.share_locks[locking.LEVEL_INSTANCE] = 1
- lu.share_locks[locking.LEVEL_NODE] = 1
+ lu.share_locks = _ShareAll()
if self.names:
self.wanted = _GetWantedInstances(lu, self.names)
query.IQ_LIVE in self.requested_data)
if self.do_locking:
lu.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
+ lu.needed_locks[locking.LEVEL_NODEGROUP] = []
lu.needed_locks[locking.LEVEL_NODE] = []
lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+ self.do_grouplocks = (self.do_locking and
+ query.IQ_NODES in self.requested_data)
+
def DeclareLocks(self, lu, level):
- if level == locking.LEVEL_NODE and self.do_locking:
- lu._LockInstancesNodes() # pylint: disable-msg=W0212
+ if self.do_locking:
+ if level == locking.LEVEL_NODEGROUP and self.do_grouplocks:
+ assert not lu.needed_locks[locking.LEVEL_NODEGROUP]
+
+ # Lock all groups used by instances optimistically; this requires going
+ # via the node before it's locked, requiring verification later on
+ lu.needed_locks[locking.LEVEL_NODEGROUP] = \
+ set(group_uuid
+ for instance_name in
+ lu.glm.list_owned(locking.LEVEL_INSTANCE)
+ for group_uuid in
+ lu.cfg.GetInstanceNodeGroups(instance_name))
+ elif level == locking.LEVEL_NODE:
+ lu._LockInstancesNodes() # pylint: disable-msg=W0212
+
+ @staticmethod
+ def _CheckGroupLocks(lu):
+ owned_instances = frozenset(lu.glm.list_owned(locking.LEVEL_INSTANCE))
+ owned_groups = frozenset(lu.glm.list_owned(locking.LEVEL_NODEGROUP))
+
+ # Check if node groups for locked instances are still correct
+ for instance_name in owned_instances:
+ inst_groups = lu.cfg.GetInstanceNodeGroups(instance_name)
+ if not owned_groups.issuperset(inst_groups):
+ raise errors.OpPrereqError("Instance %s's node groups changed since"
+                                   " locks were acquired, current groups"
+                                   " are '%s', owning groups '%s'; retry the"
+ " operation" %
+ (instance_name,
+ utils.CommaJoin(inst_groups),
+ utils.CommaJoin(owned_groups)),
+ errors.ECODE_STATE)
def _GetQueryData(self, lu):
"""Computes the list of instances and their attributes.
"""
+ if self.do_grouplocks:
+ self._CheckGroupLocks(lu)
+
cluster = lu.cfg.GetClusterInfo()
all_info = lu.cfg.GetAllInstancesInfo()
else:
consinfo = None
+ if query.IQ_NODES in self.requested_data:
+ node_names = set(itertools.chain(*map(operator.attrgetter("all_nodes"),
+ instance_list)))
+ nodes = dict((name, lu.cfg.GetNodeInfo(name)) for name in node_names)
+ groups = dict((uuid, lu.cfg.GetNodeGroup(uuid))
+ for uuid in set(map(operator.attrgetter("group"),
+ nodes.values())))
+ else:
+ nodes = None
+ groups = None
+
return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(),
disk_usage, offline_nodes, bad_nodes,
- live_data, wrongnode_inst, consinfo)
+ live_data, wrongnode_inst, consinfo,
+ nodes, groups)
class LUQuery(NoHooksLU):
nodeinfo = lu.rpc.call_node_info([node], None, hypervisor_name)
nodeinfo[node].Raise("Can't get data from node %s" % node,
prereq=True, ecode=errors.ECODE_ENVIRON)
- free_mem = nodeinfo[node].payload.get('memory_free', None)
+ free_mem = nodeinfo[node].payload.get("memory_free", None)
if not isinstance(free_mem, int):
raise errors.OpPrereqError("Can't compute free memory on node %s, result"
" was '%s'" % (node, free_mem),
_StartInstanceDisks(self, instance, force)
result = self.rpc.call_instance_start(node_current, instance,
- self.op.hvparams, self.op.beparams)
+ self.op.hvparams, self.op.beparams,
+ self.op.startup_paused)
msg = result.fail_msg
if msg:
_ShutdownInstanceDisks(self, instance)
self.LogInfo("Instance %s was already stopped, starting now",
instance.name)
_StartInstanceDisks(self, instance, ignore_secondaries)
- result = self.rpc.call_instance_start(node_current, instance, None, None)
+ result = self.rpc.call_instance_start(node_current, instance,
+ None, None, False)
msg = result.fail_msg
if msg:
_ShutdownInstanceDisks(self, instance)
"""Recreate the disks.
"""
- # change primary node, if needed
- if self.op.nodes:
- self.instance.primary_node = self.op.nodes[0]
- self.LogWarning("Changing the instance's nodes, you will have to"
- " remove any disks left on the older nodes manually")
+ instance = self.instance
to_skip = []
- for idx, disk in enumerate(self.instance.disks):
+ mods = [] # keeps track of needed logical_id changes
+
+ for idx, disk in enumerate(instance.disks):
if idx not in self.op.disks: # disk idx has not been passed in
to_skip.append(idx)
continue
# update secondaries for disks, if needed
if self.op.nodes:
if disk.dev_type == constants.LD_DRBD8:
- # need to update the nodes
+ # need to update the nodes and minors
assert len(self.op.nodes) == 2
- logical_id = list(disk.logical_id)
- logical_id[0] = self.op.nodes[0]
- logical_id[1] = self.op.nodes[1]
- disk.logical_id = tuple(logical_id)
+ assert len(disk.logical_id) == 6 # otherwise disk internals
+ # have changed
+ (_, _, old_port, _, _, old_secret) = disk.logical_id
+ new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
+ new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
+ new_minors[0], new_minors[1], old_secret)
+ assert len(disk.logical_id) == len(new_id)
+ mods.append((idx, new_id))
+
+ # now that we have passed all asserts above, we can apply the mods
+ # in a single run (to avoid partial changes)
+ for idx, new_id in mods:
+ instance.disks[idx].logical_id = new_id
+ # change primary node, if needed
if self.op.nodes:
- self.cfg.Update(self.instance, feedback_fn)
+ instance.primary_node = self.op.nodes[0]
+ self.LogWarning("Changing the instance's nodes, you will have to"
+ " remove any disks left on the older nodes manually")
+
+ if self.op.nodes:
+ self.cfg.Update(instance, feedback_fn)
- _CreateDisks(self, self.instance, to_skip=to_skip)
+ _CreateDisks(self, instance, to_skip=to_skip)
class LUInstanceRename(LogicalUnit):
_ShutdownInstanceDisks(self, instance)
raise errors.OpExecError("Can't activate the instance's disks")
- result = self.rpc.call_instance_start(target_node, instance, None, None)
+ result = self.rpc.call_instance_start(target_node, instance,
+ None, None, False)
msg = result.fail_msg
if msg:
_ShutdownInstanceDisks(self, instance)
def ExpandNames(self):
self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
- self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+ self.share_locks = _ShareAll()
self.needed_locks = {
locking.LEVEL_NODE: [self.op.node_name],
}
self.feedback_fn("* checking disk consistency between source and target")
for dev in instance.disks:
# for drbd, these are drbd over lvm
- if not _CheckDiskConsistency(self, dev, target_node, False):
- if not self.ignore_consistency:
+ if not _CheckDiskConsistency(self.lu, dev, target_node, False):
+ if primary_node.offline:
+ self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
+ " target node %s" %
+ (primary_node.name, dev.iv_name, target_node))
+ elif not self.ignore_consistency:
raise errors.OpExecError("Disk %s is degraded on target node,"
" aborting failover" % dev.iv_name)
else:
(instance.name, source_node, msg))
self.feedback_fn("* deactivating the instance's disks on source node")
- if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
- raise errors.OpExecError("Can't shut down the instance's disks.")
+ if not _ShutdownInstanceDisks(self.lu, instance, ignore_primary=True):
+ raise errors.OpExecError("Can't shut down the instance's disks")
instance.primary_node = target_node
# distribute new instance config to the other nodes
# Only start the instance if it's marked as up
if instance.admin_up:
- self.feedback_fn("* activating the instance's disks on target node")
+ self.feedback_fn("* activating the instance's disks on target node %s" %
+ target_node)
logging.info("Starting instance %s on node %s",
instance.name, target_node)
- disks_ok, _ = _AssembleInstanceDisks(self, instance,
+ disks_ok, _ = _AssembleInstanceDisks(self.lu, instance,
ignore_secondaries=True)
if not disks_ok:
- _ShutdownInstanceDisks(self, instance)
+ _ShutdownInstanceDisks(self.lu, instance)
raise errors.OpExecError("Can't activate the instance's disks")
- self.feedback_fn("* starting the instance on the target node")
- result = self.rpc.call_instance_start(target_node, instance, None, None)
+ self.feedback_fn("* starting the instance on the target node %s" %
+ target_node)
+ result = self.rpc.call_instance_start(target_node, instance, None, None,
+ False)
msg = result.fail_msg
if msg:
- _ShutdownInstanceDisks(self, instance)
+ _ShutdownInstanceDisks(self.lu, instance)
raise errors.OpExecError("Could not start instance %s on node %s: %s" %
(instance.name, target_node, msg))
disk_images = []
for idx in range(export_disks):
- option = 'disk%d_dump' % idx
+ option = "disk%d_dump" % idx
if export_info.has_option(constants.INISECT_INS, option):
# FIXME: are the old os-es, disk sizes, etc. useful?
export_name = export_info.get(constants.INISECT_INS, option)
self.src_images = disk_images
- old_name = export_info.get(constants.INISECT_INS, 'name')
+ old_name = export_info.get(constants.INISECT_INS, "name")
try:
- exp_nic_count = export_info.getint(constants.INISECT_INS, 'nic_count')
+ exp_nic_count = export_info.getint(constants.INISECT_INS, "nic_count")
except (TypeError, ValueError), err:
raise errors.OpPrereqError("Invalid export file, nic_count is not"
" an integer: %s" % str(err),
if self.op.instance_name == old_name:
for idx, nic in enumerate(self.nics):
if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
- nic_mac_ini = 'nic%d_mac' % idx
+ nic_mac_ini = "nic%d_mac" % idx
nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
# ENDIF: self.op.mode == constants.INSTANCE_IMPORT
self.cfg.Update(iobj, feedback_fn)
logging.info("Starting instance %s on node %s", instance, pnode_name)
feedback_fn("* starting instance...")
- result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
+ result = self.rpc.call_instance_start(pnode_name, iobj,
+ None, None, False)
result.Raise("Could not start instance")
return list(iobj.all_nodes)
ial = IAllocator(lu.cfg, lu.rpc,
mode=constants.IALLOCATOR_MODE_RELOC,
name=instance_name,
- relocate_from=relocate_from)
+ relocate_from=list(relocate_from))
ial.Run(iallocator_name)
(node_name, self.instance.name))
def _CreateNewStorage(self, node_name):
+ """Create new storage on the primary or secondary node.
+
+ This is only used for same-node replaces, not for changing the
+ secondary node, hence we don't want to modify the existing disk.
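+
+    The new LVs are tracked in C{iv_names}, which maps each disk's
+    C{iv_name} to a (device, old LVs, new LVs) tuple.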
+
+ """
iv_names = {}
for idx, dev in enumerate(self.instance.disks):
logical_id=(vg_meta, names[1]))
new_lvs = [lv_data, lv_meta]
- old_lvs = dev.children
+ old_lvs = [child.Copy() for child in dev.children]
iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
# we pass force_create=True to force the LVM creation
self.lu.LogWarning("Can't remove old LV: %s" % msg,
hint="remove unused LVs manually")
- def _ExecDrbd8DiskOnly(self, feedback_fn):
+ def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable-msg=W0613
"""Replace a disk on the primary or secondary for DRBD 8.
The algorithm for replace is quite complicated:
rename_new_to_old)
result.Raise("Can't rename new LVs on node %s" % self.target_node)
+      # Intermediate steps of in-memory modifications
for old, new in zip(old_lvs, new_lvs):
new.logical_id = old.logical_id
self.cfg.SetDiskID(new, self.target_node)
+ # We need to modify old_lvs so that removal later removes the
+ # right LVs, not the newly added ones; note that old_lvs is a
+ # copy here
for disk in old_lvs:
disk.logical_id = ren_fn(disk, temp_suffix)
self.cfg.SetDiskID(disk, self.target_node)
"volumes"))
raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
- dev.children = new_lvs
-
- self.cfg.Update(self.instance, feedback_fn)
-
cstep = 5
if self.early_release:
self.lu.LogStep(cstep, steps_total, "Removing old storage")
(self.op.name, self.op.node_name))
-class LUNodeEvacStrategy(NoHooksLU):
- """Computes the node evacuation strategy.
+class LUNodeEvacuate(NoHooksLU):
+ """Evacuates instances off a list of nodes.
"""
REQ_BGL = False
_CheckIAllocatorOrNode(self, "iallocator", "remote_node")
def ExpandNames(self):
- self.op.nodes = _GetWantedNodes(self, self.op.nodes)
- self.needed_locks = locks = {}
+ self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+
+ if self.op.remote_node is not None:
+ self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
+ assert self.op.remote_node
+
+ if self.op.remote_node == self.op.node_name:
+ raise errors.OpPrereqError("Can not use evacuated node as a new"
+ " secondary node", errors.ECODE_INVAL)
+
+ if self.op.mode != constants.IALLOCATOR_NEVAC_SEC:
+ raise errors.OpPrereqError("Without the use of an iallocator only"
+ " secondary instances can be evacuated",
+ errors.ECODE_INVAL)
+
+ # Declare locks
+ self.share_locks = _ShareAll()
+ self.needed_locks = {
+ locking.LEVEL_INSTANCE: [],
+ locking.LEVEL_NODEGROUP: [],
+ locking.LEVEL_NODE: [],
+ }
+
if self.op.remote_node is None:
- locks[locking.LEVEL_NODE] = locking.ALL_SET
+ # Iallocator will choose any node(s) in the same group
+ group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_name])
else:
- self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
- locks[locking.LEVEL_NODE] = self.op.nodes + [self.op.remote_node]
+ group_nodes = frozenset([self.op.remote_node])
- def Exec(self, feedback_fn):
- instances = []
- for node in self.op.nodes:
- instances.extend(_GetNodeSecondaryInstances(self.cfg, node))
- if not instances:
- return []
+ # Determine nodes to be locked
+ self.lock_nodes = set([self.op.node_name]) | group_nodes
+
+ def _DetermineInstances(self):
+ """Builds list of instances to operate on.
+
+ """
+ assert self.op.mode in constants.IALLOCATOR_NEVAC_MODES
+
+ if self.op.mode == constants.IALLOCATOR_NEVAC_PRI:
+ # Primary instances only
+ inst_fn = _GetNodePrimaryInstances
+ assert self.op.remote_node is None, \
+ "Evacuating primary instances requires iallocator"
+ elif self.op.mode == constants.IALLOCATOR_NEVAC_SEC:
+ # Secondary instances only
+ inst_fn = _GetNodeSecondaryInstances
+ else:
+ # All instances
+ assert self.op.mode == constants.IALLOCATOR_NEVAC_ALL
+ inst_fn = _GetNodeInstances
+
+ return inst_fn(self.cfg, self.op.node_name)
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_INSTANCE:
+ # Lock instances optimistically, needs verification once node and group
+ # locks have been acquired
+ self.needed_locks[locking.LEVEL_INSTANCE] = \
+ set(i.name for i in self._DetermineInstances())
+
+ elif level == locking.LEVEL_NODEGROUP:
+ # Lock node groups optimistically, needs verification once nodes have
+ # been acquired
+ self.needed_locks[locking.LEVEL_NODEGROUP] = \
+ self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
+
+ elif level == locking.LEVEL_NODE:
+ self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
+
+ def CheckPrereq(self):
+ # Verify locks
+ owned_instances = self.glm.list_owned(locking.LEVEL_INSTANCE)
+ owned_nodes = self.glm.list_owned(locking.LEVEL_NODE)
+ owned_groups = self.glm.list_owned(locking.LEVEL_NODEGROUP)
+
+ assert owned_nodes == self.lock_nodes
+
+ wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
+ if owned_groups != wanted_groups:
+ raise errors.OpExecError("Node groups changed since locks were acquired,"
+ " current groups are '%s', used to be '%s'" %
+ (utils.CommaJoin(wanted_groups),
+ utils.CommaJoin(owned_groups)))
+
+ # Determine affected instances
+ self.instances = self._DetermineInstances()
+ self.instance_names = [i.name for i in self.instances]
+
+ if set(self.instance_names) != owned_instances:
+ raise errors.OpExecError("Instances on node '%s' changed since locks"
+ " were acquired, current instances are '%s',"
+ " used to be '%s'" %
+ (self.op.node_name,
+ utils.CommaJoin(self.instance_names),
+ utils.CommaJoin(owned_instances)))
+
+ if self.instance_names:
+ self.LogInfo("Evacuating instances from node '%s': %s",
+ self.op.node_name,
+ utils.CommaJoin(utils.NiceSort(self.instance_names)))
+ else:
+ self.LogInfo("No instances to evacuate from node '%s'",
+ self.op.node_name)
if self.op.remote_node is not None:
- result = []
- for i in instances:
+ for i in self.instances:
if i.primary_node == self.op.remote_node:
raise errors.OpPrereqError("Node %s is the primary node of"
" instance %s, cannot use it as"
" secondary" %
(self.op.remote_node, i.name),
errors.ECODE_INVAL)
- result.append([i.name, self.op.remote_node])
- else:
- ial = IAllocator(self.cfg, self.rpc,
- mode=constants.IALLOCATOR_MODE_MEVAC,
- evac_nodes=self.op.nodes)
- ial.Run(self.op.iallocator, validate=True)
+
+ def Exec(self, feedback_fn):
+ assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
+
+ if not self.instance_names:
+ # No instances to evacuate
+ jobs = []
+
+ elif self.op.iallocator is not None:
+ # TODO: Implement relocation to other group
+ ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_NODE_EVAC,
+ evac_mode=self.op.mode,
+ instances=list(self.instance_names))
+
+ ial.Run(self.op.iallocator)
+
if not ial.success:
- raise errors.OpExecError("No valid evacuation solution: %s" % ial.info,
- errors.ECODE_NORES)
- result = ial.result
- return result
+ raise errors.OpPrereqError("Can't compute node evacuation using"
+ " iallocator '%s': %s" %
+ (self.op.iallocator, ial.info),
+ errors.ECODE_NORES)
+
+ jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
+
+ elif self.op.remote_node is not None:
+ assert self.op.mode == constants.IALLOCATOR_NEVAC_SEC
+ jobs = [
+ [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
+ remote_node=self.op.remote_node,
+ disks=[],
+ mode=constants.REPLACE_DISK_CHG,
+ early_release=self.op.early_release)]
+ for instance_name in self.instance_names
+ ]
+
+ else:
+ raise errors.ProgrammerError("No iallocator or remote node")
+
+ return ResultWithJobs(jobs)
+
+
+def _SetOpEarlyRelease(early_release, op):
+ """Sets C{early_release} flag on opcodes if available.
+
+ """
+ try:
+ op.early_release = early_release
+ except AttributeError:
+ assert not isinstance(op, opcodes.OpInstanceReplaceDisks)
+
+ return op
+
+
+def _NodeEvacDest(use_nodes, group, nodes):
+ """Returns group or nodes depending on caller's choice.
+
+ """
+ if use_nodes:
+ return utils.CommaJoin(nodes)
+ else:
+ return group
+
+
+def _LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
+ """Unpacks the result of change-group and node-evacuate iallocator requests.
+
+ Iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and
+ L{constants.IALLOCATOR_MODE_CHG_GROUP}.
+
+ @type lu: L{LogicalUnit}
+ @param lu: Logical unit instance
+ @type alloc_result: tuple/list
+ @param alloc_result: Result from iallocator
+ @type early_release: bool
+ @param early_release: Whether to release locks early if possible
+ @type use_nodes: bool
+ @param use_nodes: Whether to display node names instead of groups
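+  @rtype: list
+  @return: List of jobs (one entry per job set from the iallocator), each a
+    list of loaded opcodes with C{early_release} set where supported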
+
+ """
+ (moved, failed, jobs) = alloc_result
+
+ if failed:
+ lu.LogWarning("Unable to evacuate instances %s",
+ utils.CommaJoin("%s (%s)" % (name, reason)
+ for (name, reason) in failed))
+
+ if moved:
+ lu.LogInfo("Instances to be moved: %s",
+ utils.CommaJoin("%s (to %s)" %
+ (name, _NodeEvacDest(use_nodes, group, nodes))
+ for (name, group, nodes) in moved))
+
+ return [map(compat.partial(_SetOpEarlyRelease, early_release),
+ map(opcodes.OpCode.LoadOpCode, ops))
+ for ops in jobs]
class LUInstanceGrowDisk(LogicalUnit):
self.wanted_names = None
if self.op.use_locking:
- self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+ self.share_locks = _ShareAll()
if self.wanted_names is None:
self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
self.needed_locks[locking.LEVEL_NODE] = []
- self.share_locks = dict.fromkeys(locking.LEVELS, 1)
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
def DeclareLocks(self, level):
dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
if dev.children:
- dev_children = [self._ComputeDiskStatus(instance, snode, child)
- for child in dev.children]
+ dev_children = map(compat.partial(self._ComputeDiskStatus,
+ instance, snode),
+ dev.children)
else:
dev_children = []
cluster = self.cfg.GetClusterInfo()
for instance in self.wanted_instances:
- if not self.op.static:
+ pnode = self.cfg.GetNodeInfo(instance.primary_node)
+
+ if self.op.static or pnode.offline:
+ remote_state = None
+ if pnode.offline:
+ self.LogWarning("Primary node %s is marked offline, returning static"
+ " information only for instance %s" %
+ (pnode.name, instance.name))
+ else:
remote_info = self.rpc.call_instance_info(instance.primary_node,
instance.name,
instance.hypervisor)
remote_state = "up"
else:
remote_state = "down"
- else:
- remote_state = None
+
if instance.admin_up:
config_state = "up"
else:
config_state = "down"
- disks = [self._ComputeDiskStatus(instance, None, device)
- for device in instance.disks]
+ disks = map(compat.partial(self._ComputeDiskStatus, instance, None),
+ instance.disks)
result[instance.name] = {
"name": instance.name,
raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
errors.ECODE_INVAL)
- nic_bridge = nic_dict.get('bridge', None)
+ nic_bridge = nic_dict.get("bridge", None)
nic_link = nic_dict.get(constants.INIC_LINK, None)
if nic_bridge and nic_link:
raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
" at the same time", errors.ECODE_INVAL)
elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
- nic_dict['bridge'] = None
+ nic_dict["bridge"] = None
elif nic_link and nic_link.lower() == constants.VALUE_NONE:
nic_dict[constants.INIC_LINK] = None
"""
args = dict()
if constants.BE_MEMORY in self.be_new:
- args['memory'] = self.be_new[constants.BE_MEMORY]
+ args["memory"] = self.be_new[constants.BE_MEMORY]
if constants.BE_VCPUS in self.be_new:
- args['vcpus'] = self.be_new[constants.BE_VCPUS]
+ args["vcpus"] = self.be_new[constants.BE_VCPUS]
# TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
# information at all.
if self.op.nics:
- args['nics'] = []
+ args["nics"] = []
nic_override = dict(self.op.nics)
for idx, nic in enumerate(self.instance.nics):
if idx in nic_override:
nicparams = self.cluster.SimpleFillNIC(nic.nicparams)
mode = nicparams[constants.NIC_MODE]
link = nicparams[constants.NIC_LINK]
- args['nics'].append((ip, mac, mode, link))
+ args["nics"].append((ip, mac, mode, link))
if constants.DDM_ADD in nic_override:
ip = nic_override[constants.DDM_ADD].get(constants.INIC_IP, None)
mac = nic_override[constants.DDM_ADD][constants.INIC_MAC]
nicparams = self.nic_pnew[constants.DDM_ADD]
mode = nicparams[constants.NIC_MODE]
link = nicparams[constants.NIC_LINK]
- args['nics'].append((ip, mac, mode, link))
+ args["nics"].append((ip, mac, mode, link))
elif constants.DDM_REMOVE in nic_override:
- del args['nics'][-1]
+ del args["nics"][-1]
env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
if self.op.disk_template:
# Assume the primary node is unreachable and go ahead
self.warn.append("Can't get info from primary node %s: %s" %
(pnode, msg))
- elif not isinstance(pninfo.payload.get('memory_free', None), int):
+ elif not isinstance(pninfo.payload.get("memory_free", None), int):
self.warn.append("Node data from primary node %s doesn't contain"
" free memory information" % pnode)
elif instance_info.fail_msg:
instance_info.fail_msg)
else:
if instance_info.payload:
- current_mem = int(instance_info.payload['memory'])
+ current_mem = int(instance_info.payload["memory"])
else:
# Assume instance not running
# (there is a slight race condition here, but it's not very probable,
# and we have no other way to check)
current_mem = 0
miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
- pninfo.payload['memory_free'])
+ pninfo.payload["memory_free"])
if miss_mem > 0:
raise errors.OpPrereqError("This change will prevent the instance"
" from starting, due to %d MB of memory"
continue
nres.Raise("Can't get info from secondary node %s" % node,
prereq=True, ecode=errors.ECODE_STATE)
- if not isinstance(nres.payload.get('memory_free', None), int):
+ if not isinstance(nres.payload.get("memory_free", None), int):
raise errors.OpPrereqError("Secondary node %s didn't return free"
" memory information" % node,
errors.ECODE_STATE)
- elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
+ elif be_new[constants.BE_MEMORY] > nres.payload["memory_free"]:
raise errors.OpPrereqError("This change will prevent the instance"
" from failover to its secondary node"
" %s, due to not enough memory" % node,
for key in constants.NICS_PARAMETERS
if key in nic_dict])
- if 'bridge' in nic_dict:
- update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
+ if "bridge" in nic_dict:
+ update_params_dict[constants.NIC_LINK] = nic_dict["bridge"]
new_nic_params = _GetUpdatedParams(old_nic_params,
update_params_dict)
else:
nic_ip = old_nic_ip
if nic_ip is None:
- raise errors.OpPrereqError('Cannot set the nic ip to None'
- ' on a routed nic', errors.ECODE_INVAL)
+ raise errors.OpPrereqError("Cannot set the nic ip to None"
+ " on a routed nic", errors.ECODE_INVAL)
if constants.INIC_MAC in nic_dict:
nic_mac = nic_dict[constants.INIC_MAC]
if nic_mac is None:
- raise errors.OpPrereqError('Cannot set the nic mac to None',
+ raise errors.OpPrereqError("Cannot set the nic mac to None",
errors.ECODE_INVAL)
elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
# otherwise generate the mac
not self.op.remove_instance):
assert not activate_disks
feedback_fn("Starting instance %s" % instance.name)
- result = self.rpc.call_instance_start(src_node, instance, None, None)
+ result = self.rpc.call_instance_start(src_node, instance,
+ None, None, False)
msg = result.fail_msg
if msg:
feedback_fn("Failed to start instance: %s" % msg)
return self.op.new_name
+class LUGroupEvacuate(LogicalUnit):
+ HPATH = "group-evacuate"
+ HTYPE = constants.HTYPE_GROUP
+ REQ_BGL = False
+
+ def ExpandNames(self):
+ # This raises errors.OpPrereqError on its own:
+ self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
+
+ if self.op.target_groups:
+ self.req_target_uuids = map(self.cfg.LookupNodeGroup,
+ self.op.target_groups)
+ else:
+ self.req_target_uuids = []
+
+ if self.group_uuid in self.req_target_uuids:
+ raise errors.OpPrereqError("Group to be evacuated (%s) can not be used"
+ " as a target group (targets are %s)" %
+ (self.group_uuid,
+ utils.CommaJoin(self.req_target_uuids)),
+ errors.ECODE_INVAL)
+
+ if not self.op.iallocator:
+ # Use default iallocator
+ self.op.iallocator = self.cfg.GetDefaultIAllocator()
+
+ if not self.op.iallocator:
+ raise errors.OpPrereqError("No iallocator was specified, neither in the"
+ " opcode nor as a cluster-wide default",
+ errors.ECODE_INVAL)
+
+ self.share_locks = _ShareAll()
+ self.needed_locks = {
+ locking.LEVEL_INSTANCE: [],
+ locking.LEVEL_NODEGROUP: [],
+ locking.LEVEL_NODE: [],
+ }
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_INSTANCE:
+ assert not self.needed_locks[locking.LEVEL_INSTANCE]
+
+ # Lock instances optimistically, needs verification once node and group
+ # locks have been acquired
+ self.needed_locks[locking.LEVEL_INSTANCE] = \
+ self.cfg.GetNodeGroupInstances(self.group_uuid)
+
+ elif level == locking.LEVEL_NODEGROUP:
+ assert not self.needed_locks[locking.LEVEL_NODEGROUP]
+
+ if self.req_target_uuids:
+ lock_groups = set([self.group_uuid] + self.req_target_uuids)
+
+ # Lock all groups used by instances optimistically; this requires going
+ # via the node before it's locked, requiring verification later on
+ lock_groups.update(group_uuid
+ for instance_name in
+ self.glm.list_owned(locking.LEVEL_INSTANCE)
+ for group_uuid in
+ self.cfg.GetInstanceNodeGroups(instance_name))
+ else:
+ # No target groups, need to lock all of them
+ lock_groups = locking.ALL_SET
+
+ self.needed_locks[locking.LEVEL_NODEGROUP] = lock_groups
+
+ elif level == locking.LEVEL_NODE:
+ # This will only lock the nodes in the group to be evacuated which
+ # contain actual instances
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+ self._LockInstancesNodes()
+
+ # Lock all nodes in group to be evacuated
+ assert self.group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
+ member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
+ self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
+
+ def CheckPrereq(self):
+ owned_instances = frozenset(self.glm.list_owned(locking.LEVEL_INSTANCE))
+ owned_groups = frozenset(self.glm.list_owned(locking.LEVEL_NODEGROUP))
+ owned_nodes = frozenset(self.glm.list_owned(locking.LEVEL_NODE))
+
+ assert owned_groups.issuperset(self.req_target_uuids)
+ assert self.group_uuid in owned_groups
+
+ # Check if locked instances are still correct
+ wanted_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)
+ if owned_instances != wanted_instances:
+ raise errors.OpPrereqError("Instances in node group to be evacuated (%s)"
+ " changed since locks were acquired, wanted"
+ " %s, have %s; retry the operation" %
+ (self.group_uuid,
+ utils.CommaJoin(wanted_instances),
+ utils.CommaJoin(owned_instances)),
+ errors.ECODE_STATE)
+
+ # Get instance information
+ self.instances = dict((name, self.cfg.GetInstanceInfo(name))
+ for name in owned_instances)
+
+ # Check if node groups for locked instances are still correct
+ for instance_name in owned_instances:
+ inst = self.instances[instance_name]
+ assert self.group_uuid in self.cfg.GetInstanceNodeGroups(instance_name), \
+ "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
+ assert owned_nodes.issuperset(inst.all_nodes), \
+ "Instance %s's nodes changed while we kept the lock" % instance_name
+
+ inst_groups = self.cfg.GetInstanceNodeGroups(instance_name)
+ if not owned_groups.issuperset(inst_groups):
+ raise errors.OpPrereqError("Instance %s's node groups changed since"
+ " locks were acquired, current groups"
+ " are '%s', owning groups '%s'; retry the"
+ " operation" %
+ (instance_name,
+ utils.CommaJoin(inst_groups),
+ utils.CommaJoin(owned_groups)),
+ errors.ECODE_STATE)
+
+ if self.req_target_uuids:
+ # User requested specific target groups
+ self.target_uuids = self.req_target_uuids
+ else:
+ # All groups except the one to be evacuated are potential targets
+ self.target_uuids = [group_uuid for group_uuid in owned_groups
+ if group_uuid != self.group_uuid]
+
+ if not self.target_uuids:
+ raise errors.OpExecError("There are no possible target groups")
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ """
+ return {
+ "GROUP_NAME": self.op.group_name,
+ "TARGET_GROUPS": " ".join(self.target_uuids),
+ }
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ mn = self.cfg.GetMasterNode()
+
+ assert self.group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
+
+ run_nodes = [mn] + self.cfg.GetNodeGroup(self.group_uuid).members
+
+ return (run_nodes, run_nodes)
+
+ def Exec(self, feedback_fn):
+ instances = list(self.glm.list_owned(locking.LEVEL_INSTANCE))
+
+ assert self.group_uuid not in self.target_uuids
+
+ ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_CHG_GROUP,
+ instances=instances, target_groups=self.target_uuids)
+
+ ial.Run(self.op.iallocator)
+
+ if not ial.success:
+ raise errors.OpPrereqError("Can't compute group evacuation using"
+ " iallocator '%s': %s" %
+ (self.op.iallocator, ial.info),
+ errors.ECODE_NORES)
+
+ jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, False)
+
+ self.LogInfo("Iallocator returned %s job(s) for evacuating node group %s",
+ len(jobs), self.op.group_name)
+
+ return ResultWithJobs(jobs)
+
+
class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
"""Generic tags LU.
TagsLU.ExpandNames(self)
# Share locks as this is only a read operation
- self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+ self.share_locks = _ShareAll()
def Exec(self, feedback_fn):
"""Returns the tag list.
nname)
remote_info = nresult.payload
- for attr in ['memory_total', 'memory_free', 'memory_dom0',
- 'vg_size', 'vg_free', 'cpu_total']:
+ for attr in ["memory_total", "memory_free", "memory_dom0",
+ "vg_size", "vg_free", "cpu_total"]:
if attr not in remote_info:
raise errors.OpExecError("Node '%s' didn't return attribute"
" '%s'" % (nname, attr))
if iinfo.name not in node_iinfo[nname].payload:
i_used_mem = 0
else:
- i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
+ i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"])
i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
- remote_info['memory_free'] -= max(0, i_mem_diff)
+ remote_info["memory_free"] -= max(0, i_mem_diff)
if iinfo.admin_up:
i_p_up_mem += beinfo[constants.BE_MEMORY]
# compute memory used by instances
pnr_dyn = {
- "total_memory": remote_info['memory_total'],
- "reserved_memory": remote_info['memory_dom0'],
- "free_memory": remote_info['memory_free'],
- "total_disk": remote_info['vg_size'],
- "free_disk": remote_info['vg_free'],
- "total_cpus": remote_info['cpu_total'],
+ "total_memory": remote_info["memory_total"],
+ "reserved_memory": remote_info["memory_dom0"],
+ "free_memory": remote_info["memory_free"],
+ "total_disk": remote_info["vg_size"],
+ "free_disk": remote_info["vg_free"],
+ "total_cpus": remote_info["cpu_total"],
"i_pri_memory": i_p_mem,
"i_pri_up_memory": i_p_up_mem,
}
self.in_text = serializer.Dump(self.in_data)
_STRING_LIST = ht.TListOf(ht.TString)
- _JOBSET_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
+ _JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
# pylint: disable-msg=E1101
# Class '...' has no 'OP_ID' member
"OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID,
opcodes.OpInstanceMigrate.OP_ID,
opcodes.OpInstanceReplaceDisks.OP_ID])
})))
+
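+  # Each "moved" entry is an (instance name, group, nodes) tuple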
+ _NEVAC_MOVED = \
+ ht.TListOf(ht.TAnd(ht.TIsLength(3),
+ ht.TItems([ht.TNonEmptyString,
+ ht.TNonEmptyString,
+ ht.TListOf(ht.TNonEmptyString),
+ ])))
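+  # Each "failed" entry is an (instance name, reason) tuple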
+ _NEVAC_FAILED = \
+ ht.TListOf(ht.TAnd(ht.TIsLength(2),
+ ht.TItems([ht.TNonEmptyString,
+ ht.TMaybeString,
+ ])))
+ _NEVAC_RESULT = ht.TAnd(ht.TIsLength(3),
+ ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST]))
+
_MODE_DATA = {
constants.IALLOCATOR_MODE_ALLOC:
(_AddNewInstance,
(_AddNodeEvacuate, [
("instances", _STRING_LIST),
("evac_mode", ht.TElemOf(constants.IALLOCATOR_NEVAC_MODES)),
- ], _JOBSET_LIST),
+ ], _NEVAC_RESULT),
constants.IALLOCATOR_MODE_CHG_GROUP:
(_AddChangeGroup, [
("instances", _STRING_LIST),
("target_groups", _STRING_LIST),
- ], _JOBSET_LIST),
+ ], _NEVAC_RESULT),
}
def Run(self, name, validate=True, call_fn=None):
else:
raise errors.ProgrammerError("Unhandled mode '%s'" % self.mode)
+ elif self.mode == constants.IALLOCATOR_MODE_NODE_EVAC:
+ assert self.evac_mode in constants.IALLOCATOR_NEVAC_MODES
+
self.out_data = rdict
@staticmethod
elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
fname = _ExpandInstanceName(self.cfg, self.op.name)
self.op.name = fname
- self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
+ self.relocate_from = \
+ list(self.cfg.GetInstanceInfo(fname).secondary_nodes)
elif self.op.mode == constants.IALLOCATOR_MODE_MEVAC:
if not hasattr(self.op, "evac_nodes"):
raise errors.OpPrereqError("Missing attribute 'evac_nodes' on"