from ganeti.cmdlib.base import ResultWithJobs, LogicalUnit, NoHooksLU, \
Tasklet, _QueryBase
-from ganeti.cmdlib.common import _ExpandInstanceName, _ExpandItemName, \
+from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_ONLINE, \
+ INSTANCE_NOT_RUNNING, CAN_CHANGE_INSTANCE_OFFLINE, \
+ _ExpandInstanceName, _ExpandItemName, \
_ExpandNodeName, _ShareAll, _CheckNodeGroupInstances, _GetWantedNodes, \
_GetWantedInstances, _RunPostHook, _RedistributeAncillaryFiles, \
_MergeAndVerifyHvState, _MergeAndVerifyDiskState, _GetUpdatedIPolicy, \
_ComputeIPolicyInstanceViolation, _AnnotateDiskParams, _SupportsOob, \
_ComputeIPolicySpecViolation, _GetDefaultIAllocator, \
_CheckInstancesNodeGroups, _LoadNodeEvacResult, _MapInstanceDisksToNodes, \
- _CheckInstanceNodeGroups
+ _CheckInstanceNodeGroups, _CheckParamsNotGlobal, \
+ _IsExclusiveStorageEnabledNode, _CheckInstanceState, \
+ _CheckIAllocatorOrNode, _FindFaultyInstanceDisks
from ganeti.cmdlib.cluster import LUClusterActivateMasterIp, \
LUClusterDeactivateMasterIp, LUClusterConfigQuery, LUClusterDestroy, \
from ganeti.cmdlib.group import LUGroupAdd, LUGroupAssignNodes, \
_GroupQuery, LUGroupQuery, LUGroupSetParams, LUGroupRemove, \
LUGroupRename, LUGroupEvacuate, LUGroupVerifyDisks
+from ganeti.cmdlib.node import LUNodeAdd, LUNodeSetParams, \
+ LUNodePowercycle, LUNodeEvacuate, LUNodeMigrate, LUNodeModifyStorage, \
+ _NodeQuery, LUNodeQuery, LUNodeQueryvols, LUNodeQueryStorage, \
+ LUNodeRemove, LURepairNodeStorage
from ganeti.cmdlib.tags import LUTagsGet, LUTagsSearch, LUTagsSet, LUTagsDel
from ganeti.cmdlib.network import LUNetworkAdd, LUNetworkRemove, \
LUNetworkSetParams, _NetworkQuery, LUNetworkQuery, LUNetworkConnect, \
import ganeti.masterd.instance # pylint: disable=W0611
-# States of instance
-INSTANCE_DOWN = [constants.ADMINST_DOWN]
-INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
-INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]
-
-#: Instance status in which an instance can be marked as offline/online
-CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
- constants.ADMINST_OFFLINE,
- ]))
-
-
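A minimal standalone sketch of how these state sets relate, assuming the ADMINST_* constants are the strings "up", "down" and "offline":

down = frozenset(["down"])                    # INSTANCE_DOWN
online = frozenset(["down", "up"])            # INSTANCE_ONLINE
not_running = frozenset(["down", "offline"])  # INSTANCE_NOT_RUNNING
can_change_offline = down | frozenset(["offline"])
assert "up" not in can_change_offline  # running instances stay online
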
-def _IsExclusiveStorageEnabledNode(cfg, node):
- """Whether exclusive_storage is in effect for the given node.
-
- @type cfg: L{config.ConfigWriter}
- @param cfg: The cluster configuration
- @type node: L{objects.Node}
- @param node: The node
- @rtype: bool
- @return: The effective value of exclusive_storage
-
- """
- return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE]
-
-
def _IsExclusiveStorageEnabledNodeName(cfg, nodename):
"""Whether exclusive_storage is in effect for the given node.
assert not lu.glm.is_owned(level), "No locks should be owned"
-def _CheckOutputFields(static, dynamic, selected):
- """Checks whether all selected fields are valid.
-
- @type static: L{utils.FieldSet}
- @param static: static fields set
- @type dynamic: L{utils.FieldSet}
- @param dynamic: dynamic fields set
-
- """
- f = utils.FieldSet()
- f.Extend(static)
- f.Extend(dynamic)
-
- delta = f.NonMatching(selected)
- if delta:
- raise errors.OpPrereqError("Unknown output fields selected: %s"
- % ",".join(delta), errors.ECODE_INVAL)
-
-
-def _CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
- """Make sure that none of the given paramters is global.
-
- If a global parameter is found, an L{errors.OpPrereqError} exception is
- raised. This is used to avoid setting global parameters for individual nodes.
-
- @type params: dictionary
- @param params: Parameters to check
- @type glob_pars: dictionary
- @param glob_pars: Forbidden parameters
- @type kind: string
- @param kind: Kind of parameters (e.g. "node")
- @type bad_levels: string
- @param bad_levels: Level(s) at which the parameters are forbidden (e.g.
- "instance")
- @type good_levels: strings
- @param good_levels: Level(s) at which the parameters are allowed (e.g.
- "cluster or group")
-
- """
- used_globals = glob_pars.intersection(params)
- if used_globals:
- msg = ("The following %s parameters are global and cannot"
- " be customized at %s level, please modify them at"
- " %s level: %s" %
- (kind, bad_levels, good_levels, utils.CommaJoin(used_globals)))
- raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
-
-
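A self-contained sketch of the check above; "exclusive_storage" stands in for an entry of constants.NDC_GLOBALS (an assumption, for illustration only):

glob_pars = frozenset(["exclusive_storage"])  # stand-in for NDC_GLOBALS
params = {"exclusive_storage": True, "spindle_count": 2}
used_globals = glob_pars.intersection(params)
assert used_globals == frozenset(["exclusive_storage"])
# with a non-empty intersection, _CheckParamsNotGlobal raises OpPrereqError
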
def _CheckNodeOnline(lu, node, msg=None):
"""Ensure that a given node is online.
_CheckOSVariant(result.payload, os_name)
-def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
- """Ensure that a node has the given secondary ip.
-
- @type lu: L{LogicalUnit}
- @param lu: the LU on behalf of which we make the check
- @type node: string
- @param node: the node to check
- @type secondary_ip: string
- @param secondary_ip: the ip to check
- @type prereq: boolean
- @param prereq: whether to throw a prerequisite or an execute error
- @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
- @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
-
- """
- result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
- result.Raise("Failure checking secondary ip on node %s" % node,
- prereq=prereq, ecode=errors.ECODE_ENVIRON)
- if not result.payload:
- msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
- " please fix and re-run this command" % secondary_ip)
- if prereq:
- raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
- else:
- raise errors.OpExecError(msg)
-
-
def _GetClusterDomainSecret():
"""Reads the cluster domain secret.
strict=True)
-def _CheckInstanceState(lu, instance, req_states, msg=None):
- """Ensure that an instance is in one of the required states.
-
- @param lu: the LU on behalf of which we make the check
- @param instance: the instance to check
- @param msg: if passed, should be a message to replace the default one
- @raise errors.OpPrereqError: if the instance is not in the required state
-
- """
- if msg is None:
- msg = ("can't use instance from outside %s states" %
- utils.CommaJoin(req_states))
- if instance.admin_state not in req_states:
- raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
- (instance.name, instance.admin_state, msg),
- errors.ECODE_STATE)
-
- if constants.ADMINST_UP not in req_states:
- pnode = instance.primary_node
- if not lu.cfg.GetNodeInfo(pnode).offline:
- ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
- ins_l.Raise("Can't contact node %s for instance information" % pnode,
- prereq=True, ecode=errors.ECODE_ENVIRON)
- if instance.name in ins_l.payload:
- raise errors.OpPrereqError("Instance %s is running, %s" %
- (instance.name, msg), errors.ECODE_STATE)
- else:
- lu.LogWarning("Primary node offline, ignoring check that instance"
- " is down")
-
-
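The admin-state half of this check, reduced to a standalone sketch (a plain ValueError stands in for errors.OpPrereqError):

def check_state(admin_state, req_states, name="inst1"):
  if admin_state not in req_states:
    raise ValueError("Instance '%s' is marked to be %s, expected one of %s"
                     % (name, admin_state, ", ".join(req_states)))

check_state("down", ["down"])    # passes: the instance is stopped
# check_state("up", ["down"])    # would raise: the instance is running
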
def _ComputeIPolicyInstanceSpecViolation(
ipolicy, instance_spec, disk_template,
_compute_fn=_ComputeIPolicySpecViolation):
return _BuildInstanceHookEnv(**args) # pylint: disable=W0142
-def _DecideSelfPromotion(lu, exceptions=None):
- """Decide whether I should promote myself as a master candidate.
-
- """
- cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
- mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
- # the new node will increase mc_max with one, so:
- mc_should = min(mc_should + 1, cp_size)
- return mc_now < mc_should
-
-
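A worked example of the arithmetic above, with illustrative numbers: adding a node bumps the required candidate count by one (capped at the pool size), so promotion happens whenever the pool is not yet full:

cp_size = 10                             # candidate_pool_size
mc_now, mc_should = 3, 3                 # current / required candidates
mc_should = min(mc_should + 1, cp_size)  # new node raises the target to 4
assert mc_now < mc_should                # 3 < 4: promote the new node
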
def _CheckNicsBridgesExist(lu, target_nics, target_node):
"""Check that the brigdes needed by a list of nics exist.
raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
-def _GetNodeInstancesInner(cfg, fn):
- return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
-
-
-def _GetNodeInstances(cfg, node_name):
- """Returns a list of all primary and secondary instances on a node.
-
- """
-
- return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
-
-
-def _GetNodePrimaryInstances(cfg, node_name):
- """Returns primary instances on a node.
-
- """
- return _GetNodeInstancesInner(cfg,
- lambda inst: node_name == inst.primary_node)
-
-
-def _GetNodeSecondaryInstances(cfg, node_name):
- """Returns secondary instances on a node.
-
- """
- return _GetNodeInstancesInner(cfg,
- lambda inst: node_name in inst.secondary_nodes)
-
-
-def _GetStorageTypeArgs(cfg, storage_type):
- """Returns the arguments for a storage type.
-
- """
- # Special case for file storage
- if storage_type == constants.ST_FILE:
- # storage.FileStorage wants a list of storage directories
- return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
-
- return []
-
-
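The file-storage special case, as a self-contained sketch; the paths are hypothetical stand-ins for cfg.GetFileStorageDir() and cfg.GetSharedFileStorageDir():

def storage_args(storage_type, file_dir, shared_dir):
  if storage_type == "file":  # the value of constants.ST_FILE
    # storage.FileStorage wants a list of storage directories
    return [[file_dir, shared_dir]]
  return []

assert storage_args("file", "/srv/file", "/srv/shared") == \
    [["/srv/file", "/srv/shared"]]
assert storage_args("lvm-vg", None, None) == []
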
-def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
- faulty = []
-
- for dev in instance.disks:
- cfg.SetDiskID(dev, node_name)
-
- result = rpc_runner.call_blockdev_getmirrorstatus(node_name, (instance.disks,
- instance))
- result.Raise("Failed to get disk status from node %s" % node_name,
- prereq=prereq, ecode=errors.ECODE_ENVIRON)
-
- for idx, bdev_status in enumerate(result.payload):
- if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
- faulty.append(idx)
-
- return faulty
-
-
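The index-collection step of _FindFaultyInstanceDisks in isolation: the RPC payload holds one status object per disk, in disk order, and faulty disks are reported by index (the status class and marker value are stand-ins):

class _Status(object):  # stand-in for the RPC status objects
  def __init__(self, ldisk_status):
    self.ldisk_status = ldisk_status

FAULTY = object()  # stand-in for constants.LDS_FAULTY
payload = [_Status(FAULTY), None, _Status("okay"), _Status(FAULTY)]
faulty = [idx for idx, st in enumerate(payload)
          if st and st.ldisk_status is FAULTY]
assert faulty == [0, 3]
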
-def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
- """Check the sanity of iallocator and node arguments and use the
- cluster-wide iallocator if appropriate.
-
- Check that at most one of (iallocator, node) is specified. If none is
- specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
- then the LU's opcode's iallocator slot is filled with the cluster-wide
- default iallocator.
-
- @type iallocator_slot: string
- @param iallocator_slot: the name of the opcode iallocator slot
- @type node_slot: string
- @param node_slot: the name of the opcode target node slot
-
- """
- node = getattr(lu.op, node_slot, None)
- ialloc = getattr(lu.op, iallocator_slot, None)
- if node == []:
- node = None
-
- if node is not None and ialloc is not None:
- raise errors.OpPrereqError("Do not specify both, iallocator and node",
- errors.ECODE_INVAL)
- elif ((node is None and ialloc is None) or
- ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
- default_iallocator = lu.cfg.GetDefaultIAllocator()
- if default_iallocator:
- setattr(lu.op, iallocator_slot, default_iallocator)
- else:
- raise errors.OpPrereqError("No iallocator or node given and no"
- " cluster-wide default iallocator found;"
- " please specify either an iallocator or a"
- " node, or set a cluster-wide default"
- " iallocator", errors.ECODE_INVAL)
-
-
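The same decision table as a pure function; "." stands for constants.DEFAULT_IALLOCATOR_SHORTCUT and "hail" for a configured cluster-wide default (both illustrative assumptions):

def resolve(ialloc, node, cluster_default):
  if node == []:
    node = None
  if node is not None and ialloc is not None:
    raise ValueError("do not specify both an iallocator and a node")
  if (node is None and ialloc is None) or ialloc == ".":
    if not cluster_default:
      raise ValueError("no iallocator or node, and no cluster-wide default")
    return cluster_default, None
  return ialloc, node

assert resolve(None, None, "hail") == ("hail", None)    # default applies
assert resolve(".", None, "hail") == ("hail", None)     # shortcut expands
assert resolve(None, "node2", "hail") == (None, "node2")
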
def _CheckHostnameSane(lu, name):
"""Ensures that a given hostname resolves to a 'sane' name.
return self.eq.OldStyleQuery(self)
-class LUNodeRemove(LogicalUnit):
- """Logical unit for removing a node.
-
- """
- HPATH = "node-remove"
- HTYPE = constants.HTYPE_NODE
-
- def BuildHooksEnv(self):
- """Build hooks env.
-
- """
- return {
- "OP_TARGET": self.op.node_name,
- "NODE_NAME": self.op.node_name,
- }
-
- def BuildHooksNodes(self):
- """Build hooks nodes.
-
- This doesn't run on the target node in the pre phase as a failed
- node would then be impossible to remove.
-
- """
- all_nodes = self.cfg.GetNodeList()
- try:
- all_nodes.remove(self.op.node_name)
- except ValueError:
- pass
- return (all_nodes, all_nodes)
-
- def CheckPrereq(self):
- """Check prerequisites.
-
- This checks:
- - the node exists in the configuration
- - it does not have primary or secondary instances
- - it's not the master
-
- Any errors are signaled by raising errors.OpPrereqError.
-
- """
- self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
- node = self.cfg.GetNodeInfo(self.op.node_name)
- assert node is not None
-
- masternode = self.cfg.GetMasterNode()
- if node.name == masternode:
- raise errors.OpPrereqError("Node is the master node, failover to another"
- " node is required", errors.ECODE_INVAL)
-
- for instance_name, instance in self.cfg.GetAllInstancesInfo().items():
- if node.name in instance.all_nodes:
- raise errors.OpPrereqError("Instance %s is still running on the node,"
- " please remove first" % instance_name,
- errors.ECODE_INVAL)
- self.op.node_name = node.name
- self.node = node
-
- def Exec(self, feedback_fn):
- """Removes the node from the cluster.
-
- """
- node = self.node
- logging.info("Stopping the node daemon and removing configs from node %s",
- node.name)
-
- modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
-
- assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
- "Not owning BGL"
-
- # Promote nodes to master candidate as needed
- _AdjustCandidatePool(self, exceptions=[node.name])
- self.context.RemoveNode(node.name)
-
- # Run post hooks on the node before it's removed
- _RunPostHook(self, node.name)
-
- result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
- msg = result.fail_msg
- if msg:
- self.LogWarning("Errors encountered on the remote node while leaving"
- " the cluster: %s", msg)
-
- # Remove node from our /etc/hosts
- if self.cfg.GetClusterInfo().modify_etc_hosts:
- master_node = self.cfg.GetMasterNode()
- result = self.rpc.call_etc_hosts_modify(master_node,
- constants.ETC_HOSTS_REMOVE,
- node.name, None)
- result.Raise("Can't update hosts file with new host data")
- _RedistributeAncillaryFiles(self)
-
-
-class _NodeQuery(_QueryBase):
- FIELDS = query.NODE_FIELDS
-
- def ExpandNames(self, lu):
- lu.needed_locks = {}
- lu.share_locks = _ShareAll()
-
- if self.names:
- self.wanted = _GetWantedNodes(lu, self.names)
- else:
- self.wanted = locking.ALL_SET
-
- self.do_locking = (self.use_locking and
- query.NQ_LIVE in self.requested_data)
-
- if self.do_locking:
- # If any non-static field is requested we need to lock the nodes
- lu.needed_locks[locking.LEVEL_NODE] = self.wanted
- lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
-
- def DeclareLocks(self, lu, level):
- pass
-
- def _GetQueryData(self, lu):
- """Computes the list of nodes and their attributes.
-
- """
- all_info = lu.cfg.GetAllNodesInfo()
-
- nodenames = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
-
- # Gather data as requested
- if query.NQ_LIVE in self.requested_data:
- # filter out non-vm_capable nodes
- toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]
-
- es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, toquery_nodes)
- node_data = lu.rpc.call_node_info(toquery_nodes, [lu.cfg.GetVGName()],
- [lu.cfg.GetHypervisorType()], es_flags)
- live_data = dict((name, rpc.MakeLegacyNodeInfo(nresult.payload))
- for (name, nresult) in node_data.items()
- if not nresult.fail_msg and nresult.payload)
- else:
- live_data = None
-
- if query.NQ_INST in self.requested_data:
- node_to_primary = dict([(name, set()) for name in nodenames])
- node_to_secondary = dict([(name, set()) for name in nodenames])
-
- inst_data = lu.cfg.GetAllInstancesInfo()
-
- for inst in inst_data.values():
- if inst.primary_node in node_to_primary:
- node_to_primary[inst.primary_node].add(inst.name)
- for secnode in inst.secondary_nodes:
- if secnode in node_to_secondary:
- node_to_secondary[secnode].add(inst.name)
- else:
- node_to_primary = None
- node_to_secondary = None
-
- if query.NQ_OOB in self.requested_data:
- oob_support = dict((name, bool(_SupportsOob(lu.cfg, node)))
- for name, node in all_info.iteritems())
- else:
- oob_support = None
-
- if query.NQ_GROUP in self.requested_data:
- groups = lu.cfg.GetAllNodeGroupsInfo()
- else:
- groups = {}
-
- return query.NodeQueryData([all_info[name] for name in nodenames],
- live_data, lu.cfg.GetMasterNode(),
- node_to_primary, node_to_secondary, groups,
- oob_support, lu.cfg.GetClusterInfo())
-
-
-class LUNodeQuery(NoHooksLU):
- """Logical unit for querying nodes.
-
- """
- # pylint: disable=W0142
- REQ_BGL = False
-
- def CheckArguments(self):
- self.nq = _NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
- self.op.output_fields, self.op.use_locking)
-
- def ExpandNames(self):
- self.nq.ExpandNames(self)
-
- def DeclareLocks(self, level):
- self.nq.DeclareLocks(self, level)
-
- def Exec(self, feedback_fn):
- return self.nq.OldStyleQuery(self)
-
-
-class LUNodeQueryvols(NoHooksLU):
- """Logical unit for getting volumes on node(s).
-
- """
- REQ_BGL = False
- _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
- _FIELDS_STATIC = utils.FieldSet("node")
-
- def CheckArguments(self):
- _CheckOutputFields(static=self._FIELDS_STATIC,
- dynamic=self._FIELDS_DYNAMIC,
- selected=self.op.output_fields)
-
- def ExpandNames(self):
- self.share_locks = _ShareAll()
-
- if self.op.nodes:
- self.needed_locks = {
- locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes),
- }
- else:
- self.needed_locks = {
- locking.LEVEL_NODE: locking.ALL_SET,
- locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
- }
-
- def Exec(self, feedback_fn):
- """Computes the list of nodes and their attributes.
-
- """
- nodenames = self.owned_locks(locking.LEVEL_NODE)
- volumes = self.rpc.call_node_volumes(nodenames)
-
- ilist = self.cfg.GetAllInstancesInfo()
- vol2inst = _MapInstanceDisksToNodes(ilist.values())
-
- output = []
- for node in nodenames:
- nresult = volumes[node]
- if nresult.offline:
- continue
- msg = nresult.fail_msg
- if msg:
- self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
- continue
-
- node_vols = sorted(nresult.payload,
- key=operator.itemgetter("dev"))
-
- for vol in node_vols:
- node_output = []
- for field in self.op.output_fields:
- if field == "node":
- val = node
- elif field == "phys":
- val = vol["dev"]
- elif field == "vg":
- val = vol["vg"]
- elif field == "name":
- val = vol["name"]
- elif field == "size":
- val = int(float(vol["size"]))
- elif field == "instance":
- val = vol2inst.get((node, vol["vg"] + "/" + vol["name"]), "-")
- else:
- raise errors.ParameterError(field)
- node_output.append(str(val))
-
- output.append(node_output)
-
- return output
-
-
-class LUNodeQueryStorage(NoHooksLU):
- """Logical unit for getting information on storage units on node(s).
-
- """
- _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
- REQ_BGL = False
-
- def CheckArguments(self):
- _CheckOutputFields(static=self._FIELDS_STATIC,
- dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
- selected=self.op.output_fields)
-
- def ExpandNames(self):
- self.share_locks = _ShareAll()
-
- if self.op.nodes:
- self.needed_locks = {
- locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes),
- }
- else:
- self.needed_locks = {
- locking.LEVEL_NODE: locking.ALL_SET,
- locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
- }
-
- def Exec(self, feedback_fn):
- """Computes the list of nodes and their attributes.
-
- """
- self.nodes = self.owned_locks(locking.LEVEL_NODE)
-
- # Always get name to sort by
- if constants.SF_NAME in self.op.output_fields:
- fields = self.op.output_fields[:]
- else:
- fields = [constants.SF_NAME] + self.op.output_fields
-
- # Never ask for node or type as it's only known to the LU
- for extra in [constants.SF_NODE, constants.SF_TYPE]:
- while extra in fields:
- fields.remove(extra)
-
- field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
- name_idx = field_idx[constants.SF_NAME]
-
- st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
- data = self.rpc.call_storage_list(self.nodes,
- self.op.storage_type, st_args,
- self.op.name, fields)
-
- result = []
-
- for node in utils.NiceSort(self.nodes):
- nresult = data[node]
- if nresult.offline:
- continue
-
- msg = nresult.fail_msg
- if msg:
- self.LogWarning("Can't get storage data from node %s: %s", node, msg)
- continue
-
- rows = dict([(row[name_idx], row) for row in nresult.payload])
-
- for name in utils.NiceSort(rows.keys()):
- row = rows[name]
-
- out = []
-
- for field in self.op.output_fields:
- if field == constants.SF_NODE:
- val = node
- elif field == constants.SF_TYPE:
- val = self.op.storage_type
- elif field in field_idx:
- val = row[field_idx[field]]
- else:
- raise errors.ParameterError(field)
-
- out.append(val)
-
- result.append(out)
-
- return result
-
-
class _InstanceQuery(_QueryBase):
FIELDS = query.INSTANCE_FIELDS
owned_groups = frozenset(lu.owned_locks(locking.LEVEL_NODEGROUP))
# Check if node groups for locked instances are still correct
- for instance_name in owned_instances:
- _CheckInstanceNodeGroups(lu.cfg, instance_name, owned_groups)
-
- def _GetQueryData(self, lu):
- """Computes the list of instances and their attributes.
-
- """
- if self.do_grouplocks:
- self._CheckGroupLocks(lu)
-
- cluster = lu.cfg.GetClusterInfo()
- all_info = lu.cfg.GetAllInstancesInfo()
-
- instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE)
-
- instance_list = [all_info[name] for name in instance_names]
- nodes = frozenset(itertools.chain(*(inst.all_nodes
- for inst in instance_list)))
- hv_list = list(set([inst.hypervisor for inst in instance_list]))
- bad_nodes = []
- offline_nodes = []
- wrongnode_inst = set()
-
- # Gather data as requested
- if self.requested_data & set([query.IQ_LIVE, query.IQ_CONSOLE]):
- live_data = {}
- node_data = lu.rpc.call_all_instances_info(nodes, hv_list)
- for name in nodes:
- result = node_data[name]
- if result.offline:
- # offline nodes will be in both lists
- assert result.fail_msg
- offline_nodes.append(name)
- if result.fail_msg:
- bad_nodes.append(name)
- elif result.payload:
- for inst in result.payload:
- if inst in all_info:
- if all_info[inst].primary_node == name:
- live_data.update(result.payload)
- else:
- wrongnode_inst.add(inst)
- else:
- # orphan instance; we don't list it here as we don't
- # handle this case yet in the output of instance listing
- logging.warning("Orphan instance '%s' found on node %s",
- inst, name)
- # else no instance is alive
- else:
- live_data = {}
-
- if query.IQ_DISKUSAGE in self.requested_data:
- gmi = ganeti.masterd.instance
- disk_usage = dict((inst.name,
- gmi.ComputeDiskSize(inst.disk_template,
- [{constants.IDISK_SIZE: disk.size}
- for disk in inst.disks]))
- for inst in instance_list)
- else:
- disk_usage = None
-
- if query.IQ_CONSOLE in self.requested_data:
- consinfo = {}
- for inst in instance_list:
- if inst.name in live_data:
- # Instance is running
- consinfo[inst.name] = _GetInstanceConsole(cluster, inst)
- else:
- consinfo[inst.name] = None
- assert set(consinfo.keys()) == set(instance_names)
- else:
- consinfo = None
-
- if query.IQ_NODES in self.requested_data:
- node_names = set(itertools.chain(*map(operator.attrgetter("all_nodes"),
- instance_list)))
- nodes = dict(lu.cfg.GetMultiNodeInfo(node_names))
- groups = dict((uuid, lu.cfg.GetNodeGroup(uuid))
- for uuid in set(map(operator.attrgetter("group"),
- nodes.values())))
- else:
- nodes = None
- groups = None
-
- if query.IQ_NETWORKS in self.requested_data:
- net_uuids = itertools.chain(*(lu.cfg.GetInstanceNetworks(i.name)
- for i in instance_list))
- networks = dict((uuid, lu.cfg.GetNetwork(uuid)) for uuid in net_uuids)
- else:
- networks = None
-
- return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(),
- disk_usage, offline_nodes, bad_nodes,
- live_data, wrongnode_inst, consinfo,
- nodes, groups, networks)
-
-
-class LUQuery(NoHooksLU):
- """Query for resources/items of a certain kind.
-
- """
- # pylint: disable=W0142
- REQ_BGL = False
-
- def CheckArguments(self):
- qcls = _GetQueryImplementation(self.op.what)
-
- self.impl = qcls(self.op.qfilter, self.op.fields, self.op.use_locking)
-
- def ExpandNames(self):
- self.impl.ExpandNames(self)
-
- def DeclareLocks(self, level):
- self.impl.DeclareLocks(self, level)
-
- def Exec(self, feedback_fn):
- return self.impl.NewStyleQuery(self)
-
-
-class LUQueryFields(NoHooksLU):
- """Query for resources/items of a certain kind.
-
- """
- # pylint: disable=W0142
- REQ_BGL = False
-
- def CheckArguments(self):
- self.qcls = _GetQueryImplementation(self.op.what)
-
- def ExpandNames(self):
- self.needed_locks = {}
-
- def Exec(self, feedback_fn):
- return query.QueryFields(self.qcls.FIELDS, self.op.fields)
-
-
-class LUNodeModifyStorage(NoHooksLU):
- """Logical unit for modifying a storage volume on a node.
-
- """
- REQ_BGL = False
-
- def CheckArguments(self):
- self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
-
- storage_type = self.op.storage_type
-
- try:
- modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
- except KeyError:
- raise errors.OpPrereqError("Storage units of type '%s' can not be"
- " modified" % storage_type,
- errors.ECODE_INVAL)
-
- diff = set(self.op.changes.keys()) - modifiable
- if diff:
- raise errors.OpPrereqError("The following fields can not be modified for"
- " storage units of type '%s': %r" %
- (storage_type, list(diff)),
- errors.ECODE_INVAL)
-
- def ExpandNames(self):
- self.needed_locks = {
- locking.LEVEL_NODE: self.op.node_name,
- }
-
- def Exec(self, feedback_fn):
- """Computes the list of nodes and their attributes.
-
- """
- st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
- result = self.rpc.call_storage_modify(self.op.node_name,
- self.op.storage_type, st_args,
- self.op.name, self.op.changes)
- result.Raise("Failed to modify storage unit '%s' on %s" %
- (self.op.name, self.op.node_name))
-
-
-class LUNodeAdd(LogicalUnit):
- """Logical unit for adding node to the cluster.
-
- """
- HPATH = "node-add"
- HTYPE = constants.HTYPE_NODE
- _NFLAGS = ["master_capable", "vm_capable"]
-
- def CheckArguments(self):
- self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
- # validate/normalize the node name
- self.hostname = netutils.GetHostname(name=self.op.node_name,
- family=self.primary_ip_family)
- self.op.node_name = self.hostname.name
-
- if self.op.readd and self.op.node_name == self.cfg.GetMasterNode():
- raise errors.OpPrereqError("Cannot readd the master node",
- errors.ECODE_STATE)
-
- if self.op.readd and self.op.group:
- raise errors.OpPrereqError("Cannot pass a node group when a node is"
- " being readded", errors.ECODE_INVAL)
-
- def BuildHooksEnv(self):
- """Build hooks env.
-
- This will run on all nodes before, and on all nodes + the new node after.
-
- """
- return {
- "OP_TARGET": self.op.node_name,
- "NODE_NAME": self.op.node_name,
- "NODE_PIP": self.op.primary_ip,
- "NODE_SIP": self.op.secondary_ip,
- "MASTER_CAPABLE": str(self.op.master_capable),
- "VM_CAPABLE": str(self.op.vm_capable),
- }
-
- def BuildHooksNodes(self):
- """Build hooks nodes.
-
- """
- # Exclude added node
- pre_nodes = list(set(self.cfg.GetNodeList()) - set([self.op.node_name]))
- post_nodes = pre_nodes + [self.op.node_name, ]
-
- return (pre_nodes, post_nodes)
-
- def CheckPrereq(self):
- """Check prerequisites.
-
- This checks:
- - the new node is not already in the config
- - it is resolvable
- - its parameters (single/dual homed) matches the cluster
-
- Any errors are signaled by raising errors.OpPrereqError.
-
- """
- cfg = self.cfg
- hostname = self.hostname
- node = hostname.name
- primary_ip = self.op.primary_ip = hostname.ip
- if self.op.secondary_ip is None:
- if self.primary_ip_family == netutils.IP6Address.family:
- raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
- " IPv4 address must be given as secondary",
- errors.ECODE_INVAL)
- self.op.secondary_ip = primary_ip
-
- secondary_ip = self.op.secondary_ip
- if not netutils.IP4Address.IsValid(secondary_ip):
- raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
- " address" % secondary_ip, errors.ECODE_INVAL)
-
- node_list = cfg.GetNodeList()
- if not self.op.readd and node in node_list:
- raise errors.OpPrereqError("Node %s is already in the configuration" %
- node, errors.ECODE_EXISTS)
- elif self.op.readd and node not in node_list:
- raise errors.OpPrereqError("Node %s is not in the configuration" % node,
- errors.ECODE_NOENT)
-
- self.changed_primary_ip = False
-
- for existing_node_name, existing_node in cfg.GetMultiNodeInfo(node_list):
- if self.op.readd and node == existing_node_name:
- if existing_node.secondary_ip != secondary_ip:
- raise errors.OpPrereqError("Readded node doesn't have the same IP"
- " address configuration as before",
- errors.ECODE_INVAL)
- if existing_node.primary_ip != primary_ip:
- self.changed_primary_ip = True
-
- continue
-
- if (existing_node.primary_ip == primary_ip or
- existing_node.secondary_ip == primary_ip or
- existing_node.primary_ip == secondary_ip or
- existing_node.secondary_ip == secondary_ip):
- raise errors.OpPrereqError("New node ip address(es) conflict with"
- " existing node %s" % existing_node.name,
- errors.ECODE_NOTUNIQUE)
-
- # After this 'if' block, None is no longer a valid value for the
- # _capable op attributes
- if self.op.readd:
- old_node = self.cfg.GetNodeInfo(node)
- assert old_node is not None, "Can't retrieve locked node %s" % node
- for attr in self._NFLAGS:
- if getattr(self.op, attr) is None:
- setattr(self.op, attr, getattr(old_node, attr))
- else:
- for attr in self._NFLAGS:
- if getattr(self.op, attr) is None:
- setattr(self.op, attr, True)
-
- if self.op.readd and not self.op.vm_capable:
- pri, sec = cfg.GetNodeInstances(node)
- if pri or sec:
- raise errors.OpPrereqError("Node %s being re-added with vm_capable"
- " flag set to false, but it already holds"
- " instances" % node,
- errors.ECODE_STATE)
-
- # check that the type of the node (single versus dual homed) is the
- # same as for the master
- myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
- master_singlehomed = myself.secondary_ip == myself.primary_ip
- newbie_singlehomed = secondary_ip == primary_ip
- if master_singlehomed != newbie_singlehomed:
- if master_singlehomed:
- raise errors.OpPrereqError("The master has no secondary ip but the"
- " new node has one",
- errors.ECODE_INVAL)
- else:
- raise errors.OpPrereqError("The master has a secondary ip but the"
- " new node doesn't have one",
- errors.ECODE_INVAL)
-
- # checks reachability
- if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
- raise errors.OpPrereqError("Node not reachable by ping",
- errors.ECODE_ENVIRON)
-
- if not newbie_singlehomed:
- # check reachability from my secondary ip to newbie's secondary ip
- if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
- source=myself.secondary_ip):
- raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
- " based ping to node daemon port",
- errors.ECODE_ENVIRON)
-
- if self.op.readd:
- exceptions = [node]
- else:
- exceptions = []
-
- if self.op.master_capable:
- self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
- else:
- self.master_candidate = False
-
- if self.op.readd:
- self.new_node = old_node
- else:
- node_group = cfg.LookupNodeGroup(self.op.group)
- self.new_node = objects.Node(name=node,
- primary_ip=primary_ip,
- secondary_ip=secondary_ip,
- master_candidate=self.master_candidate,
- offline=False, drained=False,
- group=node_group, ndparams={})
-
- if self.op.ndparams:
- utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
- _CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
- "node", "cluster or group")
-
- if self.op.hv_state:
- self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, None)
-
- if self.op.disk_state:
- self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None)
-
- # TODO: If we need to have multiple DnsOnlyRunner we probably should make
- # it a property on the base class.
- rpcrunner = rpc.DnsOnlyRunner()
- result = rpcrunner.call_version([node])[node]
- result.Raise("Can't get version information from node %s" % node)
- if constants.PROTOCOL_VERSION == result.payload:
- logging.info("Communication to node %s fine, sw version %s match",
- node, result.payload)
- else:
- raise errors.OpPrereqError("Version mismatch master version %s,"
- " node version %s" %
- (constants.PROTOCOL_VERSION, result.payload),
- errors.ECODE_ENVIRON)
-
- vg_name = cfg.GetVGName()
- if vg_name is not None:
- vparams = {constants.NV_PVLIST: [vg_name]}
- excl_stor = _IsExclusiveStorageEnabledNode(cfg, self.new_node)
- cname = self.cfg.GetClusterName()
- result = rpcrunner.call_node_verify_light([node], vparams, cname)[node]
- (errmsgs, _) = _CheckNodePVs(result.payload, excl_stor)
- if errmsgs:
- raise errors.OpPrereqError("Checks on node PVs failed: %s" %
- "; ".join(errmsgs), errors.ECODE_ENVIRON)
-
- def Exec(self, feedback_fn):
- """Adds the new node to the cluster.
-
- """
- new_node = self.new_node
- node = new_node.name
-
- assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
- "Not owning BGL"
-
- # We are adding a new node, so we assume it's powered
- new_node.powered = True
-
- # for re-adds, reset the offline/drained/master-candidate flags;
- # we need to reset here, otherwise offline would prevent RPC calls
- # later in the procedure; this also means that if the re-add
- # fails, we are left with a non-offlined, broken node
- if self.op.readd:
- new_node.drained = new_node.offline = False # pylint: disable=W0201
- self.LogInfo("Readding a node, the offline/drained flags were reset")
- # if we demote the node, we do cleanup later in the procedure
- new_node.master_candidate = self.master_candidate
- if self.changed_primary_ip:
- new_node.primary_ip = self.op.primary_ip
-
- # copy the master/vm_capable flags
- for attr in self._NFLAGS:
- setattr(new_node, attr, getattr(self.op, attr))
-
- # notify the user about any possible mc promotion
- if new_node.master_candidate:
- self.LogInfo("Node will be a master candidate")
-
- if self.op.ndparams:
- new_node.ndparams = self.op.ndparams
- else:
- new_node.ndparams = {}
-
- if self.op.hv_state:
- new_node.hv_state_static = self.new_hv_state
-
- if self.op.disk_state:
- new_node.disk_state_static = self.new_disk_state
-
- # Add node to our /etc/hosts, and add key to known_hosts
- if self.cfg.GetClusterInfo().modify_etc_hosts:
- master_node = self.cfg.GetMasterNode()
- result = self.rpc.call_etc_hosts_modify(master_node,
- constants.ETC_HOSTS_ADD,
- self.hostname.name,
- self.hostname.ip)
- result.Raise("Can't update hosts file with new host data")
-
- if new_node.secondary_ip != new_node.primary_ip:
- _CheckNodeHasSecondaryIP(self, new_node.name, new_node.secondary_ip,
- False)
-
- node_verify_list = [self.cfg.GetMasterNode()]
- node_verify_param = {
- constants.NV_NODELIST: ([node], {}),
- # TODO: do a node-net-test as well?
- }
-
- result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
- self.cfg.GetClusterName())
- for verifier in node_verify_list:
- result[verifier].Raise("Cannot communicate with node %s" % verifier)
- nl_payload = result[verifier].payload[constants.NV_NODELIST]
- if nl_payload:
- for failed in nl_payload:
- feedback_fn("ssh/hostname verification failed"
- " (checking from %s): %s" %
- (verifier, nl_payload[failed]))
- raise errors.OpExecError("ssh/hostname verification failed")
-
- if self.op.readd:
- _RedistributeAncillaryFiles(self)
- self.context.ReaddNode(new_node)
- # make sure we redistribute the config
- self.cfg.Update(new_node, feedback_fn)
- # and make sure the new node will not have old files around
- if not new_node.master_candidate:
- result = self.rpc.call_node_demote_from_mc(new_node.name)
- msg = result.fail_msg
- if msg:
- self.LogWarning("Node failed to demote itself from master"
- " candidate status: %s" % msg)
- else:
- _RedistributeAncillaryFiles(self, additional_nodes=[node],
- additional_vm=self.op.vm_capable)
- self.context.AddNode(new_node, self.proc.GetECId())
-
-
-class LUNodeSetParams(LogicalUnit):
- """Modifies the parameters of a node.
-
- @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
- to the node role (as _ROLE_*)
- @cvar _R2F: a dictionary from node role to tuples of flags
- @cvar _FLAGS: a list of attribute names corresponding to the flags
-
- """
- HPATH = "node-modify"
- HTYPE = constants.HTYPE_NODE
- REQ_BGL = False
- (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
- _F2R = {
- (True, False, False): _ROLE_CANDIDATE,
- (False, True, False): _ROLE_DRAINED,
- (False, False, True): _ROLE_OFFLINE,
- (False, False, False): _ROLE_REGULAR,
- }
- _R2F = dict((v, k) for k, v in _F2R.items())
- _FLAGS = ["master_candidate", "drained", "offline"]
-
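The flag-tuple to role mapping above, exercised standalone with string role names in place of the _ROLE_* integers:

F2R = {
  (True, False, False): "candidate",  # (master_candidate, drained, offline)
  (False, True, False): "drained",
  (False, False, True): "offline",
  (False, False, False): "regular",
}
R2F = dict((v, k) for k, v in F2R.items())
assert R2F["offline"] == (False, False, True)
# combinations with more than one True flag are rejected in CheckArguments
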
- def CheckArguments(self):
- self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
- all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
- self.op.master_capable, self.op.vm_capable,
- self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
- self.op.disk_state]
- if all_mods.count(None) == len(all_mods):
- raise errors.OpPrereqError("Please pass at least one modification",
- errors.ECODE_INVAL)
- if all_mods.count(True) > 1:
- raise errors.OpPrereqError("Can't set the node into more than one"
- " state at the same time",
- errors.ECODE_INVAL)
-
- # Boolean value that tells us whether we might be demoting from MC
- self.might_demote = (self.op.master_candidate is False or
- self.op.offline is True or
- self.op.drained is True or
- self.op.master_capable is False)
-
- if self.op.secondary_ip:
- if not netutils.IP4Address.IsValid(self.op.secondary_ip):
- raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
- " address" % self.op.secondary_ip,
- errors.ECODE_INVAL)
-
- self.lock_all = self.op.auto_promote and self.might_demote
- self.lock_instances = self.op.secondary_ip is not None
-
- def _InstanceFilter(self, instance):
- """Filter for getting affected instances.
-
- """
- return (instance.disk_template in constants.DTS_INT_MIRROR and
- self.op.node_name in instance.all_nodes)
-
- def ExpandNames(self):
- if self.lock_all:
- self.needed_locks = {
- locking.LEVEL_NODE: locking.ALL_SET,
-
- # Block allocations when all nodes are locked
- locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
- }
- else:
- self.needed_locks = {
- locking.LEVEL_NODE: self.op.node_name,
- }
-
- # Since modifying a node can have severe effects on currently running
- # operations, the resource lock is at least acquired in shared mode
- self.needed_locks[locking.LEVEL_NODE_RES] = \
- self.needed_locks[locking.LEVEL_NODE]
-
- # Get all locks except nodes in shared mode; they are not used for anything
- # but read-only access
- self.share_locks = _ShareAll()
- self.share_locks[locking.LEVEL_NODE] = 0
- self.share_locks[locking.LEVEL_NODE_RES] = 0
- self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
-
- if self.lock_instances:
- self.needed_locks[locking.LEVEL_INSTANCE] = \
- frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter))
-
- def BuildHooksEnv(self):
- """Build hooks env.
-
- This runs on the master node.
-
- """
- return {
- "OP_TARGET": self.op.node_name,
- "MASTER_CANDIDATE": str(self.op.master_candidate),
- "OFFLINE": str(self.op.offline),
- "DRAINED": str(self.op.drained),
- "MASTER_CAPABLE": str(self.op.master_capable),
- "VM_CAPABLE": str(self.op.vm_capable),
- }
-
- def BuildHooksNodes(self):
- """Build hooks nodes.
-
- """
- nl = [self.cfg.GetMasterNode(), self.op.node_name]
- return (nl, nl)
-
- def CheckPrereq(self):
- """Check prerequisites.
+ for instance_name in owned_instances:
+ _CheckInstanceNodeGroups(lu.cfg, instance_name, owned_groups)
- This only checks the instance list against the existing names.
+ def _GetQueryData(self, lu):
+ """Computes the list of instances and their attributes.
"""
- node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
+ if self.do_grouplocks:
+ self._CheckGroupLocks(lu)
- if self.lock_instances:
- affected_instances = \
- self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
+ cluster = lu.cfg.GetClusterInfo()
+ all_info = lu.cfg.GetAllInstancesInfo()
- # Verify instance locks
- owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
- wanted_instances = frozenset(affected_instances.keys())
- if wanted_instances - owned_instances:
- raise errors.OpPrereqError("Instances affected by changing node %s's"
- " secondary IP address have changed since"
- " locks were acquired, wanted '%s', have"
- " '%s'; retry the operation" %
- (self.op.node_name,
- utils.CommaJoin(wanted_instances),
- utils.CommaJoin(owned_instances)),
- errors.ECODE_STATE)
- else:
- affected_instances = None
-
- if (self.op.master_candidate is not None or
- self.op.drained is not None or
- self.op.offline is not None):
- # we can't change the master's node flags
- if self.op.node_name == self.cfg.GetMasterNode():
- raise errors.OpPrereqError("The master role can be changed"
- " only via master-failover",
- errors.ECODE_INVAL)
+ instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE)
- if self.op.master_candidate and not node.master_capable:
- raise errors.OpPrereqError("Node %s is not master capable, cannot make"
- " it a master candidate" % node.name,
- errors.ECODE_STATE)
+ instance_list = [all_info[name] for name in instance_names]
+ nodes = frozenset(itertools.chain(*(inst.all_nodes
+ for inst in instance_list)))
+ hv_list = list(set([inst.hypervisor for inst in instance_list]))
+ bad_nodes = []
+ offline_nodes = []
+ wrongnode_inst = set()
- if self.op.vm_capable is False:
- (ipri, isec) = self.cfg.GetNodeInstances(self.op.node_name)
- if ipri or isec:
- raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
- " the vm_capable flag" % node.name,
- errors.ECODE_STATE)
+ # Gather data as requested
+ if self.requested_data & set([query.IQ_LIVE, query.IQ_CONSOLE]):
+ live_data = {}
+ node_data = lu.rpc.call_all_instances_info(nodes, hv_list)
+ for name in nodes:
+ result = node_data[name]
+ if result.offline:
+ # offline nodes will be in both lists
+ assert result.fail_msg
+ offline_nodes.append(name)
+ if result.fail_msg:
+ bad_nodes.append(name)
+ elif result.payload:
+ for inst in result.payload:
+ if inst in all_info:
+ if all_info[inst].primary_node == name:
+ live_data.update(result.payload)
+ else:
+ wrongnode_inst.add(inst)
+ else:
+ # orphan instance; we don't list it here as we don't
+ # handle this case yet in the output of instance listing
+ logging.warning("Orphan instance '%s' found on node %s",
+ inst, name)
+ # else no instance is alive
+ else:
+ live_data = {}
- if node.master_candidate and self.might_demote and not self.lock_all:
- assert not self.op.auto_promote, "auto_promote set but lock_all not"
- # check if after removing the current node, we're missing master
- # candidates
- (mc_remaining, mc_should, _) = \
- self.cfg.GetMasterCandidateStats(exceptions=[node.name])
- if mc_remaining < mc_should:
- raise errors.OpPrereqError("Not enough master candidates, please"
- " pass auto promote option to allow"
- " promotion (--auto-promote or RAPI"
- " auto_promote=True)", errors.ECODE_STATE)
-
- self.old_flags = old_flags = (node.master_candidate,
- node.drained, node.offline)
- assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
- self.old_role = old_role = self._F2R[old_flags]
-
- # Check for ineffective changes
- for attr in self._FLAGS:
- if (getattr(self.op, attr) is False and getattr(node, attr) is False):
- self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
- setattr(self.op, attr, None)
-
- # Past this point, any flag change to False means a transition
- # away from the respective state, as only real changes are kept
-
- # TODO: We might query the real power state if it supports OOB
- if _SupportsOob(self.cfg, node):
- if self.op.offline is False and not (node.powered or
- self.op.powered is True):
- raise errors.OpPrereqError(("Node %s needs to be turned on before its"
- " offline status can be reset") %
- self.op.node_name, errors.ECODE_STATE)
- elif self.op.powered is not None:
- raise errors.OpPrereqError(("Unable to change powered state for node %s"
- " as it does not support out-of-band"
- " handling") % self.op.node_name,
- errors.ECODE_STATE)
+ if query.IQ_DISKUSAGE in self.requested_data:
+ gmi = ganeti.masterd.instance
+ disk_usage = dict((inst.name,
+ gmi.ComputeDiskSize(inst.disk_template,
+ [{constants.IDISK_SIZE: disk.size}
+ for disk in inst.disks]))
+ for inst in instance_list)
+ else:
+ disk_usage = None
- # If we're being deofflined/drained, we'll MC ourself if needed
- if (self.op.drained is False or self.op.offline is False or
- (self.op.master_capable and not node.master_capable)):
- if _DecideSelfPromotion(self):
- self.op.master_candidate = True
- self.LogInfo("Auto-promoting node to master candidate")
-
- # If we're no longer master capable, we'll demote ourselves from MC
- if self.op.master_capable is False and node.master_candidate:
- self.LogInfo("Demoting from master candidate")
- self.op.master_candidate = False
-
- # Compute new role
- assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
- if self.op.master_candidate:
- new_role = self._ROLE_CANDIDATE
- elif self.op.drained:
- new_role = self._ROLE_DRAINED
- elif self.op.offline:
- new_role = self._ROLE_OFFLINE
- elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
- # False is still in new flags, which means we're un-setting (the
- # only) True flag
- new_role = self._ROLE_REGULAR
- else: # no new flags, nothing, keep old role
- new_role = old_role
-
- self.new_role = new_role
-
- if old_role == self._ROLE_OFFLINE and new_role != old_role:
- # Trying to transition out of offline status
- result = self.rpc.call_version([node.name])[node.name]
- if result.fail_msg:
- raise errors.OpPrereqError("Node %s is being de-offlined but fails"
- " to report its version: %s" %
- (node.name, result.fail_msg),
- errors.ECODE_STATE)
- else:
- self.LogWarning("Transitioning node from offline to online state"
- " without using re-add. Please make sure the node"
- " is healthy!")
-
- # When changing the secondary ip, verify if this is a single-homed to
- # multi-homed transition or vice versa, and apply the relevant
- # restrictions.
- if self.op.secondary_ip:
- # Ok even without locking, because this can't be changed by any LU
- master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
- master_singlehomed = master.secondary_ip == master.primary_ip
- if master_singlehomed and self.op.secondary_ip != node.primary_ip:
- if self.op.force and node.name == master.name:
- self.LogWarning("Transitioning from single-homed to multi-homed"
- " cluster; all nodes will require a secondary IP"
- " address")
- else:
- raise errors.OpPrereqError("Changing the secondary ip on a"
- " single-homed cluster requires the"
- " --force option to be passed, and the"
- " target node to be the master",
- errors.ECODE_INVAL)
- elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
- if self.op.force and node.name == master.name:
- self.LogWarning("Transitioning from multi-homed to single-homed"
- " cluster; secondary IP addresses will have to be"
- " removed")
+ if query.IQ_CONSOLE in self.requested_data:
+ consinfo = {}
+ for inst in instance_list:
+ if inst.name in live_data:
+ # Instance is running
+ consinfo[inst.name] = _GetInstanceConsole(cluster, inst)
else:
- raise errors.OpPrereqError("Cannot set the secondary IP to be the"
- " same as the primary IP on a multi-homed"
- " cluster, unless the --force option is"
- " passed, and the target node is the"
- " master", errors.ECODE_INVAL)
-
- assert not (frozenset(affected_instances) -
- self.owned_locks(locking.LEVEL_INSTANCE))
-
- if node.offline:
- if affected_instances:
- msg = ("Cannot change secondary IP address: offline node has"
- " instances (%s) configured to use it" %
- utils.CommaJoin(affected_instances.keys()))
- raise errors.OpPrereqError(msg, errors.ECODE_STATE)
- else:
- # On online nodes, check that no instances are running, and that
- # the node has the new ip and we can reach it.
- for instance in affected_instances.values():
- _CheckInstanceState(self, instance, INSTANCE_DOWN,
- msg="cannot change secondary ip")
-
- _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True)
- if master.name != node.name:
- # check reachability from master secondary ip to new secondary ip
- if not netutils.TcpPing(self.op.secondary_ip,
- constants.DEFAULT_NODED_PORT,
- source=master.secondary_ip):
- raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
- " based ping to node daemon port",
- errors.ECODE_ENVIRON)
-
- if self.op.ndparams:
- new_ndparams = _GetUpdatedParams(self.node.ndparams, self.op.ndparams)
- utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
- _CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
- "node", "cluster or group")
- self.new_ndparams = new_ndparams
-
- if self.op.hv_state:
- self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state,
- self.node.hv_state_static)
-
- if self.op.disk_state:
- self.new_disk_state = \
- _MergeAndVerifyDiskState(self.op.disk_state,
- self.node.disk_state_static)
-
- def Exec(self, feedback_fn):
- """Modifies a node.
-
- """
- node = self.node
- old_role = self.old_role
- new_role = self.new_role
-
- result = []
-
- if self.op.ndparams:
- node.ndparams = self.new_ndparams
+ consinfo[inst.name] = None
+ assert set(consinfo.keys()) == set(instance_names)
+ else:
+ consinfo = None
- if self.op.powered is not None:
- node.powered = self.op.powered
+ if query.IQ_NODES in self.requested_data:
+ node_names = set(itertools.chain(*map(operator.attrgetter("all_nodes"),
+ instance_list)))
+ nodes = dict(lu.cfg.GetMultiNodeInfo(node_names))
+ groups = dict((uuid, lu.cfg.GetNodeGroup(uuid))
+ for uuid in set(map(operator.attrgetter("group"),
+ nodes.values())))
+ else:
+ nodes = None
+ groups = None
- if self.op.hv_state:
- node.hv_state_static = self.new_hv_state
+ if query.IQ_NETWORKS in self.requested_data:
+ net_uuids = itertools.chain(*(lu.cfg.GetInstanceNetworks(i.name)
+ for i in instance_list))
+ networks = dict((uuid, lu.cfg.GetNetwork(uuid)) for uuid in net_uuids)
+ else:
+ networks = None
- if self.op.disk_state:
- node.disk_state_static = self.new_disk_state
+ return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(),
+ disk_usage, offline_nodes, bad_nodes,
+ live_data, wrongnode_inst, consinfo,
+ nodes, groups, networks)
- for attr in ["master_capable", "vm_capable"]:
- val = getattr(self.op, attr)
- if val is not None:
- setattr(node, attr, val)
- result.append((attr, str(val)))
- if new_role != old_role:
- # Tell the node to demote itself, if no longer MC and not offline
- if old_role == self._ROLE_CANDIDATE and new_role != self._ROLE_OFFLINE:
- msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
- if msg:
- self.LogWarning("Node failed to demote itself: %s", msg)
+class LUQuery(NoHooksLU):
+ """Query for resources/items of a certain kind.
- new_flags = self._R2F[new_role]
- for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
- if of != nf:
- result.append((desc, str(nf)))
- (node.master_candidate, node.drained, node.offline) = new_flags
+ """
+ # pylint: disable=W0142
+ REQ_BGL = False
- # we locked all nodes, we adjust the CP before updating this node
- if self.lock_all:
- _AdjustCandidatePool(self, [node.name])
+ def CheckArguments(self):
+ qcls = _GetQueryImplementation(self.op.what)
- if self.op.secondary_ip:
- node.secondary_ip = self.op.secondary_ip
- result.append(("secondary_ip", self.op.secondary_ip))
+ self.impl = qcls(self.op.qfilter, self.op.fields, self.op.use_locking)
- # this will trigger configuration file update, if needed
- self.cfg.Update(node, feedback_fn)
+ def ExpandNames(self):
+ self.impl.ExpandNames(self)
- # this will trigger job queue propagation or cleanup if the mc
- # flag changed
- if [old_role, new_role].count(self._ROLE_CANDIDATE) == 1:
- self.context.ReaddNode(node)
+ def DeclareLocks(self, level):
+ self.impl.DeclareLocks(self, level)
- return result
+ def Exec(self, feedback_fn):
+ return self.impl.NewStyleQuery(self)
-class LUNodePowercycle(NoHooksLU):
- """Powercycles a node.
+class LUQueryFields(NoHooksLU):
+ """Query for resources/items of a certain kind.
"""
+ # pylint: disable=W0142
REQ_BGL = False
def CheckArguments(self):
- self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
- if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
- raise errors.OpPrereqError("The node is the master and the force"
- " parameter was not set",
- errors.ECODE_INVAL)
+ self.qcls = _GetQueryImplementation(self.op.what)
def ExpandNames(self):
- """Locking for PowercycleNode.
-
- This is a last-resort option and shouldn't block on other
- jobs. Therefore, we grab no locks.
-
- """
self.needed_locks = {}
def Exec(self, feedback_fn):
- """Reboots a node.
-
- """
- result = self.rpc.call_node_powercycle(self.op.node_name,
- self.cfg.GetHypervisorType())
- result.Raise("Failed to schedule the reboot")
- return result.payload
+ return query.QueryFields(self.qcls.FIELDS, self.op.fields)
class LUInstanceActivateDisks(NoHooksLU):
(instance.name, target_node, msg))
-class LUNodeMigrate(LogicalUnit):
- """Migrate all instances from a node.
-
- """
- HPATH = "node-migrate"
- HTYPE = constants.HTYPE_NODE
- REQ_BGL = False
-
- def CheckArguments(self):
- pass
-
- def ExpandNames(self):
- self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
-
- self.share_locks = _ShareAll()
- self.needed_locks = {
- locking.LEVEL_NODE: [self.op.node_name],
- }
-
- def BuildHooksEnv(self):
- """Build hooks env.
-
- This runs on the master, the primary and all the secondaries.
-
- """
- return {
- "NODE_NAME": self.op.node_name,
- "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
- }
-
- def BuildHooksNodes(self):
- """Build hooks nodes.
-
- """
- nl = [self.cfg.GetMasterNode()]
- return (nl, nl)
-
- def CheckPrereq(self):
- pass
-
- def Exec(self, feedback_fn):
- # Prepare jobs for migration instances
- allow_runtime_changes = self.op.allow_runtime_changes
- jobs = [
- [opcodes.OpInstanceMigrate(instance_name=inst.name,
- mode=self.op.mode,
- live=self.op.live,
- iallocator=self.op.iallocator,
- target_node=self.op.target_node,
- allow_runtime_changes=allow_runtime_changes,
- ignore_ipolicy=self.op.ignore_ipolicy)]
- for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name)]
-
- # TODO: Run iallocator in this opcode and pass correct placement options to
- # OpInstanceMigrate. Since other jobs can modify the cluster between
- # running the iallocator and the actual migration, a good consistency model
- # will have to be found.
-
- assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
- frozenset([self.op.node_name]))
-
- return ResultWithJobs(jobs)
-
-
class TLMigrateInstance(Tasklet):
"""Tasklet class for instance migration.
self._RemoveOldStorage(self.target_node, iv_names)
-class LURepairNodeStorage(NoHooksLU):
- """Repairs the volume group on a node.
-
- """
- REQ_BGL = False
-
- def CheckArguments(self):
- self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
-
- storage_type = self.op.storage_type
-
- if (constants.SO_FIX_CONSISTENCY not in
- constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
- raise errors.OpPrereqError("Storage units of type '%s' can not be"
- " repaired" % storage_type,
- errors.ECODE_INVAL)
-
- def ExpandNames(self):
- self.needed_locks = {
- locking.LEVEL_NODE: [self.op.node_name],
- }
-
- def _CheckFaultyDisks(self, instance, node_name):
- """Ensure faulty disks abort the opcode or at least warn."""
- try:
- if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
- node_name, True):
- raise errors.OpPrereqError("Instance '%s' has faulty disks on"
- " node '%s'" % (instance.name, node_name),
- errors.ECODE_STATE)
- except errors.OpPrereqError, err:
- if self.op.ignore_consistency:
- self.LogWarning(str(err.args[0]))
- else:
- raise
-
- def CheckPrereq(self):
- """Check prerequisites.
-
- """
- # Check whether any instance on this node has faulty disks
- for inst in _GetNodeInstances(self.cfg, self.op.node_name):
- if inst.admin_state != constants.ADMINST_UP:
- continue
- check_nodes = set(inst.all_nodes)
- check_nodes.discard(self.op.node_name)
- for inst_node_name in check_nodes:
- self._CheckFaultyDisks(inst, inst_node_name)
-
- def Exec(self, feedback_fn):
- feedback_fn("Repairing storage unit '%s' on %s ..." %
- (self.op.name, self.op.node_name))
-
- st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
- result = self.rpc.call_storage_execute(self.op.node_name,
- self.op.storage_type, st_args,
- self.op.name,
- constants.SO_FIX_CONSISTENCY)
- result.Raise("Failed to repair storage unit '%s' on %s" %
- (self.op.name, self.op.node_name))
-
-
-class LUNodeEvacuate(NoHooksLU):
- """Evacuates instances off a list of nodes.
-
- """
- REQ_BGL = False
-
- _MODE2IALLOCATOR = {
- constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
- constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
- constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
- }
- assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
- assert (frozenset(_MODE2IALLOCATOR.values()) ==
- constants.IALLOCATOR_NEVAC_MODES)
-
- def CheckArguments(self):
- _CheckIAllocatorOrNode(self, "iallocator", "remote_node")
-
- def ExpandNames(self):
- self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
-
- if self.op.remote_node is not None:
- self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
- assert self.op.remote_node
-
- if self.op.remote_node == self.op.node_name:
- raise errors.OpPrereqError("Can not use evacuated node as a new"
- " secondary node", errors.ECODE_INVAL)
-
- if self.op.mode != constants.NODE_EVAC_SEC:
- raise errors.OpPrereqError("Without the use of an iallocator only"
- " secondary instances can be evacuated",
- errors.ECODE_INVAL)
-
- # Declare locks
- self.share_locks = _ShareAll()
- self.needed_locks = {
- locking.LEVEL_INSTANCE: [],
- locking.LEVEL_NODEGROUP: [],
- locking.LEVEL_NODE: [],
- }
-
- # Determine nodes (via group) optimistically, needs verification once locks
- # have been acquired
- self.lock_nodes = self._DetermineNodes()
-
- def _DetermineNodes(self):
- """Gets the list of nodes to operate on.
-
- """
- if self.op.remote_node is None:
- # Iallocator will choose any node(s) in the same group
- group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_name])
- else:
- group_nodes = frozenset([self.op.remote_node])
-
- # Determine nodes to be locked
- return set([self.op.node_name]) | group_nodes
-
- def _DetermineInstances(self):
- """Builds list of instances to operate on.
-
- """
- assert self.op.mode in constants.NODE_EVAC_MODES
-
- if self.op.mode == constants.NODE_EVAC_PRI:
- # Primary instances only
- inst_fn = _GetNodePrimaryInstances
- assert self.op.remote_node is None, \
- "Evacuating primary instances requires iallocator"
- elif self.op.mode == constants.NODE_EVAC_SEC:
- # Secondary instances only
- inst_fn = _GetNodeSecondaryInstances
- else:
- # All instances
- assert self.op.mode == constants.NODE_EVAC_ALL
- inst_fn = _GetNodeInstances
- # TODO: In 2.6, change the iallocator interface to take an evacuation mode
- # per instance
- raise errors.OpPrereqError("Due to an issue with the iallocator"
- " interface it is not possible to evacuate"
- " all instances at once; specify explicitly"
- " whether to evacuate primary or secondary"
- " instances",
- errors.ECODE_INVAL)
-
- return inst_fn(self.cfg, self.op.node_name)
-
- def DeclareLocks(self, level):
- if level == locking.LEVEL_INSTANCE:
- # Lock instances optimistically, needs verification once node and group
- # locks have been acquired
- self.needed_locks[locking.LEVEL_INSTANCE] = \
- set(i.name for i in self._DetermineInstances())
-
- elif level == locking.LEVEL_NODEGROUP:
- # Lock node groups for all potential target nodes optimistically, needs
- # verification once nodes have been acquired
- self.needed_locks[locking.LEVEL_NODEGROUP] = \
- self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
-
- elif level == locking.LEVEL_NODE:
- self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
-
- def CheckPrereq(self):
- # Verify locks
- owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
- owned_nodes = self.owned_locks(locking.LEVEL_NODE)
- owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
-
- need_nodes = self._DetermineNodes()
-
- if not owned_nodes.issuperset(need_nodes):
- raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
- " locks were acquired, current nodes are"
- " are '%s', used to be '%s'; retry the"
- " operation" %
- (self.op.node_name,
- utils.CommaJoin(need_nodes),
- utils.CommaJoin(owned_nodes)),
- errors.ECODE_STATE)
-
- wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
- if owned_groups != wanted_groups:
- raise errors.OpExecError("Node groups changed since locks were acquired,"
- " current groups are '%s', used to be '%s';"
- " retry the operation" %
- (utils.CommaJoin(wanted_groups),
- utils.CommaJoin(owned_groups)))
-
- # Determine affected instances
- self.instances = self._DetermineInstances()
- self.instance_names = [i.name for i in self.instances]
-
- if set(self.instance_names) != owned_instances:
- raise errors.OpExecError("Instances on node '%s' changed since locks"
- " were acquired, current instances are '%s',"
- " used to be '%s'; retry the operation" %
- (self.op.node_name,
- utils.CommaJoin(self.instance_names),
- utils.CommaJoin(owned_instances)))
-
- if self.instance_names:
- self.LogInfo("Evacuating instances from node '%s': %s",
- self.op.node_name,
- utils.CommaJoin(utils.NiceSort(self.instance_names)))
- else:
- self.LogInfo("No instances to evacuate from node '%s'",
- self.op.node_name)
-
- if self.op.remote_node is not None:
- for i in self.instances:
- if i.primary_node == self.op.remote_node:
- raise errors.OpPrereqError("Node %s is the primary node of"
- " instance %s, cannot use it as"
- " secondary" %
- (self.op.remote_node, i.name),
- errors.ECODE_INVAL)
-
- def Exec(self, feedback_fn):
- assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
-
- if not self.instance_names:
- # No instances to evacuate
- jobs = []
-
- elif self.op.iallocator is not None:
- # TODO: Implement relocation to other group
- evac_mode = self._MODE2IALLOCATOR[self.op.mode]
- req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
- instances=list(self.instance_names))
- ial = iallocator.IAllocator(self.cfg, self.rpc, req)
-
- ial.Run(self.op.iallocator)
-
- if not ial.success:
- raise errors.OpPrereqError("Can't compute node evacuation using"
- " iallocator '%s': %s" %
- (self.op.iallocator, ial.info),
- errors.ECODE_NORES)
-
- jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
-
- elif self.op.remote_node is not None:
- assert self.op.mode == constants.NODE_EVAC_SEC
- jobs = [
- [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
- remote_node=self.op.remote_node,
- disks=[],
- mode=constants.REPLACE_DISK_CHG,
- early_release=self.op.early_release)]
- for instance_name in self.instance_names]
-
- else:
- raise errors.ProgrammerError("No iallocator or remote node")
-
- return ResultWithJobs(jobs)
-
-
def _DiskSizeInBytesToMebibytes(lu, size):
"""Converts a disk size in bytes to mebibytes.
--- /dev/null
+#
+#
+
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+
+"""Logical units dealing with nodes."""
+
+import logging
+import operator
+
+from ganeti import constants
+from ganeti import errors
+from ganeti import locking
+from ganeti import netutils
+from ganeti import objects
+from ganeti import opcodes
+from ganeti import qlang
+from ganeti import query
+from ganeti import rpc
+from ganeti import utils
+from ganeti.masterd import iallocator
+
+from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, _QueryBase, \
+ ResultWithJobs
+from ganeti.cmdlib.common import _CheckParamsNotGlobal, \
+ _MergeAndVerifyHvState, _MergeAndVerifyDiskState, \
+ _IsExclusiveStorageEnabledNode, _CheckNodePVs, \
+ _RedistributeAncillaryFiles, _ExpandNodeName, _ShareAll, _SupportsOob, \
+ _CheckInstanceState, INSTANCE_DOWN, _GetUpdatedParams, \
+ _AdjustCandidatePool, _CheckIAllocatorOrNode, _LoadNodeEvacResult, \
+ _GetWantedNodes, _MapInstanceDisksToNodes, _RunPostHook, \
+ _FindFaultyInstanceDisks
+
+
+def _DecideSelfPromotion(lu, exceptions=None):
+ """Decide whether I should promote myself as a master candidate.
+
+ """
+ cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
+ mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
+ # the new node will increase mc_max by one, so:
+ mc_should = min(mc_should + 1, cp_size)
+ return mc_now < mc_should
+
+
+def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
+ """Ensure that a node has the given secondary ip.
+
+ @type lu: L{LogicalUnit}
+ @param lu: the LU on behalf of which we make the check
+ @type node: string
+ @param node: the node to check
+ @type secondary_ip: string
+ @param secondary_ip: the ip to check
+ @type prereq: boolean
+ @param prereq: whether to throw a prerequisite or an execute error
+ @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
+ @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
+
+ """
+ result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
+ result.Raise("Failure checking secondary ip on node %s" % node,
+ prereq=prereq, ecode=errors.ECODE_ENVIRON)
+ if not result.payload:
+ msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
+ " please fix and re-run this command" % secondary_ip)
+ if prereq:
+ raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
+ else:
+ raise errors.OpExecError(msg)
+
+
+class LUNodeAdd(LogicalUnit):
+ """Logical unit for adding node to the cluster.
+
+ """
+ HPATH = "node-add"
+ HTYPE = constants.HTYPE_NODE
+ _NFLAGS = ["master_capable", "vm_capable"]
+
+ def CheckArguments(self):
+ self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
+ # validate/normalize the node name
+ self.hostname = netutils.GetHostname(name=self.op.node_name,
+ family=self.primary_ip_family)
+ self.op.node_name = self.hostname.name
+
+ if self.op.readd and self.op.node_name == self.cfg.GetMasterNode():
+ raise errors.OpPrereqError("Cannot readd the master node",
+ errors.ECODE_STATE)
+
+ if self.op.readd and self.op.group:
+ raise errors.OpPrereqError("Cannot pass a node group when a node is"
+ " being readded", errors.ECODE_INVAL)
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ This will run on all nodes before, and on all nodes + the new node after.
+
+ """
+ return {
+ "OP_TARGET": self.op.node_name,
+ "NODE_NAME": self.op.node_name,
+ "NODE_PIP": self.op.primary_ip,
+ "NODE_SIP": self.op.secondary_ip,
+ "MASTER_CAPABLE": str(self.op.master_capable),
+ "VM_CAPABLE": str(self.op.vm_capable),
+ }
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ # Exclude added node
+ pre_nodes = list(set(self.cfg.GetNodeList()) - set([self.op.node_name]))
+ post_nodes = pre_nodes + [self.op.node_name, ]
+
+ return (pre_nodes, post_nodes)
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ This checks:
+ - the new node is not already in the config
+ - it is resolvable
+ - its parameters (single/dual homed) matches the cluster
+
+ Any errors are signaled by raising errors.OpPrereqError.
+
+ """
+ cfg = self.cfg
+ hostname = self.hostname
+ node = hostname.name
+ primary_ip = self.op.primary_ip = hostname.ip
+ if self.op.secondary_ip is None:
+ if self.primary_ip_family == netutils.IP6Address.family:
+ raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
+ " IPv4 address must be given as secondary",
+ errors.ECODE_INVAL)
+ self.op.secondary_ip = primary_ip
+
+ secondary_ip = self.op.secondary_ip
+ if not netutils.IP4Address.IsValid(secondary_ip):
+ raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
+ " address" % secondary_ip, errors.ECODE_INVAL)
+
+ node_list = cfg.GetNodeList()
+ if not self.op.readd and node in node_list:
+ raise errors.OpPrereqError("Node %s is already in the configuration" %
+ node, errors.ECODE_EXISTS)
+ elif self.op.readd and node not in node_list:
+ raise errors.OpPrereqError("Node %s is not in the configuration" % node,
+ errors.ECODE_NOENT)
+
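+ # Track whether a readd changes the node's primary IP; if so, the new
+ # address is applied to the node object in Exec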
+ self.changed_primary_ip = False
+
+ for existing_node_name, existing_node in cfg.GetMultiNodeInfo(node_list):
+ if self.op.readd and node == existing_node_name:
+ if existing_node.secondary_ip != secondary_ip:
+ raise errors.OpPrereqError("Readded node doesn't have the same IP"
+ " address configuration as before",
+ errors.ECODE_INVAL)
+ if existing_node.primary_ip != primary_ip:
+ self.changed_primary_ip = True
+
+ continue
+
+ if (existing_node.primary_ip == primary_ip or
+ existing_node.secondary_ip == primary_ip or
+ existing_node.primary_ip == secondary_ip or
+ existing_node.secondary_ip == secondary_ip):
+ raise errors.OpPrereqError("New node ip address(es) conflict with"
+ " existing node %s" % existing_node.name,
+ errors.ECODE_NOTUNIQUE)
+
+ # After this 'if' block, None is no longer a valid value for the
+ # _capable op attributes
+ if self.op.readd:
+ old_node = self.cfg.GetNodeInfo(node)
+ assert old_node is not None, "Can't retrieve locked node %s" % node
+ for attr in self._NFLAGS:
+ if getattr(self.op, attr) is None:
+ setattr(self.op, attr, getattr(old_node, attr))
+ else:
+ for attr in self._NFLAGS:
+ if getattr(self.op, attr) is None:
+ setattr(self.op, attr, True)
+
+ if self.op.readd and not self.op.vm_capable:
+ pri, sec = cfg.GetNodeInstances(node)
+ if pri or sec:
+ raise errors.OpPrereqError("Node %s being re-added with vm_capable"
+ " flag set to false, but it already holds"
+ " instances" % node,
+ errors.ECODE_STATE)
+
+ # check that the type of the node (single versus dual homed) is the
+ # same as for the master
+ myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
+ master_singlehomed = myself.secondary_ip == myself.primary_ip
+ newbie_singlehomed = secondary_ip == primary_ip
+ if master_singlehomed != newbie_singlehomed:
+ if master_singlehomed:
+ raise errors.OpPrereqError("The master has no secondary ip but the"
+ " new node has one",
+ errors.ECODE_INVAL)
+ else:
+ raise errors.OpPrereqError("The master has a secondary ip but the"
+ " new node doesn't have one",
+ errors.ECODE_INVAL)
+
+ # checks reachability
+ if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
+ raise errors.OpPrereqError("Node not reachable by ping",
+ errors.ECODE_ENVIRON)
+
+ if not newbie_singlehomed:
+ # check reachability from my secondary ip to newbie's secondary ip
+ if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
+ source=myself.secondary_ip):
+ raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
+ " based ping to node daemon port",
+ errors.ECODE_ENVIRON)
+
+ if self.op.readd:
+ exceptions = [node]
+ else:
+ exceptions = []
+
+ if self.op.master_capable:
+ self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
+ else:
+ self.master_candidate = False
+
+ if self.op.readd:
+ self.new_node = old_node
+ else:
+ node_group = cfg.LookupNodeGroup(self.op.group)
+ self.new_node = objects.Node(name=node,
+ primary_ip=primary_ip,
+ secondary_ip=secondary_ip,
+ master_candidate=self.master_candidate,
+ offline=False, drained=False,
+ group=node_group, ndparams={})
+
+ if self.op.ndparams:
+ utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
+ _CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
+ "node", "cluster or group")
+
+ if self.op.hv_state:
+ self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, None)
+
+ if self.op.disk_state:
+ self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None)
+
+ # TODO: If we need to have multiple DnsOnlyRunner we probably should make
+ # it a property on the base class.
+ rpcrunner = rpc.DnsOnlyRunner()
+ result = rpcrunner.call_version([node])[node]
+ result.Raise("Can't get version information from node %s" % node)
+ if constants.PROTOCOL_VERSION == result.payload:
+ logging.info("Communication to node %s fine, sw version %s match",
+ node, result.payload)
+ else:
+ raise errors.OpPrereqError("Version mismatch master version %s,"
+ " node version %s" %
+ (constants.PROTOCOL_VERSION, result.payload),
+ errors.ECODE_ENVIRON)
+
+ vg_name = cfg.GetVGName()
+ if vg_name is not None:
+ vparams = {constants.NV_PVLIST: [vg_name]}
+ excl_stor = _IsExclusiveStorageEnabledNode(cfg, self.new_node)
+ cname = self.cfg.GetClusterName()
+ result = rpcrunner.call_node_verify_light([node], vparams, cname)[node]
+ (errmsgs, _) = _CheckNodePVs(result.payload, excl_stor)
+ if errmsgs:
+ raise errors.OpPrereqError("Checks on node PVs failed: %s" %
+ "; ".join(errmsgs), errors.ECODE_ENVIRON)
+
+ def Exec(self, feedback_fn):
+ """Adds the new node to the cluster.
+
+ """
+ new_node = self.new_node
+ node = new_node.name
+
+ assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
+ "Not owning BGL"
+
+ # We're adding a new node, so we assume it's powered
+ new_node.powered = True
+
+ # for re-adds, reset the offline/drained/master-candidate flags;
+ # we need to reset here, otherwise offline would prevent RPC calls
+ # later in the procedure; this also means that if the re-add
+ # fails, we are left with a non-offlined, broken node
+ if self.op.readd:
+ new_node.drained = new_node.offline = False # pylint: disable=W0201
+ self.LogInfo("Readding a node, the offline/drained flags were reset")
+ # if we demote the node, we do cleanup later in the procedure
+ new_node.master_candidate = self.master_candidate
+ if self.changed_primary_ip:
+ new_node.primary_ip = self.op.primary_ip
+
+ # copy the master/vm_capable flags
+ for attr in self._NFLAGS:
+ setattr(new_node, attr, getattr(self.op, attr))
+
+ # notify the user about any possible mc promotion
+ if new_node.master_candidate:
+ self.LogInfo("Node will be a master candidate")
+
+ if self.op.ndparams:
+ new_node.ndparams = self.op.ndparams
+ else:
+ new_node.ndparams = {}
+
+ if self.op.hv_state:
+ new_node.hv_state_static = self.new_hv_state
+
+ if self.op.disk_state:
+ new_node.disk_state_static = self.new_disk_state
+
+ # Add node to our /etc/hosts, and add key to known_hosts
+ if self.cfg.GetClusterInfo().modify_etc_hosts:
+ master_node = self.cfg.GetMasterNode()
+ result = self.rpc.call_etc_hosts_modify(master_node,
+ constants.ETC_HOSTS_ADD,
+ self.hostname.name,
+ self.hostname.ip)
+ result.Raise("Can't update hosts file with new host data")
+
+ if new_node.secondary_ip != new_node.primary_ip:
+ _CheckNodeHasSecondaryIP(self, new_node.name, new_node.secondary_ip,
+ False)
+
+ node_verify_list = [self.cfg.GetMasterNode()]
+ node_verify_param = {
+ constants.NV_NODELIST: ([node], {}),
+ # TODO: do a node-net-test as well?
+ }
+
+ result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
+ self.cfg.GetClusterName())
+ for verifier in node_verify_list:
+ result[verifier].Raise("Cannot communicate with node %s" % verifier)
+ nl_payload = result[verifier].payload[constants.NV_NODELIST]
+ if nl_payload:
+ for failed in nl_payload:
+ feedback_fn("ssh/hostname verification failed"
+ " (checking from %s): %s" %
+ (verifier, nl_payload[failed]))
+ raise errors.OpExecError("ssh/hostname verification failed")
+
+ if self.op.readd:
+ _RedistributeAncillaryFiles(self)
+ self.context.ReaddNode(new_node)
+ # make sure we redistribute the config
+ self.cfg.Update(new_node, feedback_fn)
+ # and make sure the new node will not have old files around
+ if not new_node.master_candidate:
+ result = self.rpc.call_node_demote_from_mc(new_node.name)
+ msg = result.fail_msg
+ if msg:
+ self.LogWarning("Node failed to demote itself from master"
+ " candidate status: %s" % msg)
+ else:
+ _RedistributeAncillaryFiles(self, additional_nodes=[node],
+ additional_vm=self.op.vm_capable)
+ self.context.AddNode(new_node, self.proc.GetECId())
+
+
+class LUNodeSetParams(LogicalUnit):
+ """Modifies the parameters of a node.
+
+ @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
+ to the node role (as _ROLE_*)
+ @cvar _R2F: a dictionary from node role to tuples of flags
+ @cvar _FLAGS: a list of attribute names corresponding to the flags
+
+ """
+ HPATH = "node-modify"
+ HTYPE = constants.HTYPE_NODE
+ REQ_BGL = False
+ (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
+ _F2R = {
+ (True, False, False): _ROLE_CANDIDATE,
+ (False, True, False): _ROLE_DRAINED,
+ (False, False, True): _ROLE_OFFLINE,
+ (False, False, False): _ROLE_REGULAR,
+ }
+ _R2F = dict((v, k) for k, v in _F2R.items())
+ _FLAGS = ["master_candidate", "drained", "offline"]
+
+ def CheckArguments(self):
+ self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+ all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
+ self.op.master_capable, self.op.vm_capable,
+ self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
+ self.op.disk_state]
+ if all_mods.count(None) == len(all_mods):
+ raise errors.OpPrereqError("Please pass at least one modification",
+ errors.ECODE_INVAL)
+ if all_mods.count(True) > 1:
+ raise errors.OpPrereqError("Can't set the node into more than one"
+ " state at the same time",
+ errors.ECODE_INVAL)
+
+ # Boolean value that tells us whether we might be demoting from MC
+ self.might_demote = (self.op.master_candidate is False or
+ self.op.offline is True or
+ self.op.drained is True or
+ self.op.master_capable is False)
+
+ if self.op.secondary_ip:
+ if not netutils.IP4Address.IsValid(self.op.secondary_ip):
+ raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
+ " address" % self.op.secondary_ip,
+ errors.ECODE_INVAL)
+
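+ # Demoting with auto_promote may promote any other node, so all node
+ # locks are needed; changing the secondary IP affects the configuration
+ # of mirrored (DRBD) instances, which must therefore be locked too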
+ self.lock_all = self.op.auto_promote and self.might_demote
+ self.lock_instances = self.op.secondary_ip is not None
+
+ def _InstanceFilter(self, instance):
+ """Filter for getting affected instances.
+
+ """
+ return (instance.disk_template in constants.DTS_INT_MIRROR and
+ self.op.node_name in instance.all_nodes)
+
+ def ExpandNames(self):
+ if self.lock_all:
+ self.needed_locks = {
+ locking.LEVEL_NODE: locking.ALL_SET,
+
+ # Block allocations when all nodes are locked
+ locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
+ }
+ else:
+ self.needed_locks = {
+ locking.LEVEL_NODE: self.op.node_name,
+ }
+
+ # Since modifying a node can have severe effects on currently running
+ # operations the resource lock is at least acquired in shared mode
+ self.needed_locks[locking.LEVEL_NODE_RES] = \
+ self.needed_locks[locking.LEVEL_NODE]
+
+ # Get all locks except nodes in shared mode; they are not used for anything
+ # but read-only access
+ self.share_locks = _ShareAll()
+ self.share_locks[locking.LEVEL_NODE] = 0
+ self.share_locks[locking.LEVEL_NODE_RES] = 0
+ self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
+
+ if self.lock_instances:
+ self.needed_locks[locking.LEVEL_INSTANCE] = \
+ frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter))
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ This runs on the master node.
+
+ """
+ return {
+ "OP_TARGET": self.op.node_name,
+ "MASTER_CANDIDATE": str(self.op.master_candidate),
+ "OFFLINE": str(self.op.offline),
+ "DRAINED": str(self.op.drained),
+ "MASTER_CAPABLE": str(self.op.master_capable),
+ "VM_CAPABLE": str(self.op.vm_capable),
+ }
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ nl = [self.cfg.GetMasterNode(), self.op.node_name]
+ return (nl, nl)
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ This checks the requested node role and parameter changes for validity.
+
+ """
+ node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
+
+ if self.lock_instances:
+ affected_instances = \
+ self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
+
+ # Verify instance locks
+ owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
+ wanted_instances = frozenset(affected_instances.keys())
+ if wanted_instances - owned_instances:
+ raise errors.OpPrereqError("Instances affected by changing node %s's"
+ " secondary IP address have changed since"
+ " locks were acquired, wanted '%s', have"
+ " '%s'; retry the operation" %
+ (self.op.node_name,
+ utils.CommaJoin(wanted_instances),
+ utils.CommaJoin(owned_instances)),
+ errors.ECODE_STATE)
+ else:
+ affected_instances = None
+
+ if (self.op.master_candidate is not None or
+ self.op.drained is not None or
+ self.op.offline is not None):
+ # we can't change the master's node flags
+ if self.op.node_name == self.cfg.GetMasterNode():
+ raise errors.OpPrereqError("The master role can be changed"
+ " only via master-failover",
+ errors.ECODE_INVAL)
+
+ if self.op.master_candidate and not node.master_capable:
+ raise errors.OpPrereqError("Node %s is not master capable, cannot make"
+ " it a master candidate" % node.name,
+ errors.ECODE_STATE)
+
+ if self.op.vm_capable is False:
+ (ipri, isec) = self.cfg.GetNodeInstances(self.op.node_name)
+ if ipri or isec:
+ raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
+ " the vm_capable flag" % node.name,
+ errors.ECODE_STATE)
+
+ if node.master_candidate and self.might_demote and not self.lock_all:
+ assert not self.op.auto_promote, "auto_promote set but lock_all not"
+ # check if after removing the current node, we're missing master
+ # candidates
+ (mc_remaining, mc_should, _) = \
+ self.cfg.GetMasterCandidateStats(exceptions=[node.name])
+ if mc_remaining < mc_should:
+ raise errors.OpPrereqError("Not enough master candidates, please"
+ " pass auto promote option to allow"
+ " promotion (--auto-promote or RAPI"
+ " auto_promote=True)", errors.ECODE_STATE)
+
+ self.old_flags = old_flags = (node.master_candidate,
+ node.drained, node.offline)
+ assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
+ self.old_role = old_role = self._F2R[old_flags]
+
+ # Check for ineffective changes
+ for attr in self._FLAGS:
+ if (getattr(self.op, attr) is False and getattr(node, attr) is False):
+ self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
+ setattr(self.op, attr, None)
+
+ # Past this point, any flag change to False means a transition
+ # away from the respective state, as only real changes are kept
+
+ # TODO: We might query the real power state if it supports OOB
+ if _SupportsOob(self.cfg, node):
+ if self.op.offline is False and not (node.powered or
+ self.op.powered is True):
+ raise errors.OpPrereqError(("Node %s needs to be turned on before its"
+ " offline status can be reset") %
+ self.op.node_name, errors.ECODE_STATE)
+ elif self.op.powered is not None:
+ raise errors.OpPrereqError(("Unable to change powered state for node %s"
+ " as it does not support out-of-band"
+ " handling") % self.op.node_name,
+ errors.ECODE_STATE)
+
+ # If we're being deofflined/drained, we'll MC ourself if needed
+ if (self.op.drained is False or self.op.offline is False or
+ (self.op.master_capable and not node.master_capable)):
+ if _DecideSelfPromotion(self):
+ self.op.master_candidate = True
+ self.LogInfo("Auto-promoting node to master candidate")
+
+ # If we're no longer master capable, we'll demote ourselves from MC
+ if self.op.master_capable is False and node.master_candidate:
+ self.LogInfo("Demoting from master candidate")
+ self.op.master_candidate = False
+
+ # Compute new role
+ assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
+ if self.op.master_candidate:
+ new_role = self._ROLE_CANDIDATE
+ elif self.op.drained:
+ new_role = self._ROLE_DRAINED
+ elif self.op.offline:
+ new_role = self._ROLE_OFFLINE
+ elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
+ # False is still in new flags, which means we're un-setting (the
+ # only) True flag
+ new_role = self._ROLE_REGULAR
+ else: # no new flags, nothing, keep old role
+ new_role = old_role
+
+ self.new_role = new_role
+
+ if old_role == self._ROLE_OFFLINE and new_role != old_role:
+ # Trying to transition out of offline status
+ result = self.rpc.call_version([node.name])[node.name]
+ if result.fail_msg:
+ raise errors.OpPrereqError("Node %s is being de-offlined but fails"
+ " to report its version: %s" %
+ (node.name, result.fail_msg),
+ errors.ECODE_STATE)
+ else:
+ self.LogWarning("Transitioning node from offline to online state"
+ " without using re-add. Please make sure the node"
+ " is healthy!")
+
+ # When changing the secondary ip, verify if this is a single-homed to
+ # multi-homed transition or vice versa, and apply the relevant
+ # restrictions.
+ if self.op.secondary_ip:
+ # Ok even without locking, because this can't be changed by any LU
+ master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
+ master_singlehomed = master.secondary_ip == master.primary_ip
+ if master_singlehomed and self.op.secondary_ip != node.primary_ip:
+ if self.op.force and node.name == master.name:
+ self.LogWarning("Transitioning from single-homed to multi-homed"
+ " cluster; all nodes will require a secondary IP"
+ " address")
+ else:
+ raise errors.OpPrereqError("Changing the secondary ip on a"
+ " single-homed cluster requires the"
+ " --force option to be passed, and the"
+ " target node to be the master",
+ errors.ECODE_INVAL)
+ elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
+ if self.op.force and node.name == master.name:
+ self.LogWarning("Transitioning from multi-homed to single-homed"
+ " cluster; secondary IP addresses will have to be"
+ " removed")
+ else:
+ raise errors.OpPrereqError("Cannot set the secondary IP to be the"
+ " same as the primary IP on a multi-homed"
+ " cluster, unless the --force option is"
+ " passed, and the target node is the"
+ " master", errors.ECODE_INVAL)
+
+ assert not (frozenset(affected_instances) -
+ self.owned_locks(locking.LEVEL_INSTANCE))
+
+ if node.offline:
+ if affected_instances:
+ msg = ("Cannot change secondary IP address: offline node has"
+ " instances (%s) configured to use it" %
+ utils.CommaJoin(affected_instances.keys()))
+ raise errors.OpPrereqError(msg, errors.ECODE_STATE)
+ else:
+ # On online nodes, check that no instances are running, and that
+ # the node has the new ip and we can reach it.
+ for instance in affected_instances.values():
+ _CheckInstanceState(self, instance, INSTANCE_DOWN,
+ msg="cannot change secondary ip")
+
+ _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True)
+ if master.name != node.name:
+ # check reachability from master secondary ip to new secondary ip
+ if not netutils.TcpPing(self.op.secondary_ip,
+ constants.DEFAULT_NODED_PORT,
+ source=master.secondary_ip):
+ raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
+ " based ping to node daemon port",
+ errors.ECODE_ENVIRON)
+
+ if self.op.ndparams:
+ new_ndparams = _GetUpdatedParams(self.node.ndparams, self.op.ndparams)
+ utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
+ _CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
+ "node", "cluster or group")
+ self.new_ndparams = new_ndparams
+
+ if self.op.hv_state:
+ self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state,
+ self.node.hv_state_static)
+
+ if self.op.disk_state:
+ self.new_disk_state = \
+ _MergeAndVerifyDiskState(self.op.disk_state,
+ self.node.disk_state_static)
+
+ def Exec(self, feedback_fn):
+ """Modifies a node.
+
+ """
+ node = self.node
+ old_role = self.old_role
+ new_role = self.new_role
+
+ result = []
+
+ if self.op.ndparams:
+ node.ndparams = self.new_ndparams
+
+ if self.op.powered is not None:
+ node.powered = self.op.powered
+
+ if self.op.hv_state:
+ node.hv_state_static = self.new_hv_state
+
+ if self.op.disk_state:
+ node.disk_state_static = self.new_disk_state
+
+ for attr in ["master_capable", "vm_capable"]:
+ val = getattr(self.op, attr)
+ if val is not None:
+ setattr(node, attr, val)
+ result.append((attr, str(val)))
+
+ if new_role != old_role:
+ # Tell the node to demote itself, if no longer MC and not offline
+ if old_role == self._ROLE_CANDIDATE and new_role != self._ROLE_OFFLINE:
+ msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
+ if msg:
+ self.LogWarning("Node failed to demote itself: %s", msg)
+
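+ # Record each flag that actually changes, then apply the new flag tuple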
+ new_flags = self._R2F[new_role]
+ for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
+ if of != nf:
+ result.append((desc, str(nf)))
+ (node.master_candidate, node.drained, node.offline) = new_flags
+
+ # we locked all nodes, we adjust the CP before updating this node
+ if self.lock_all:
+ _AdjustCandidatePool(self, [node.name])
+
+ if self.op.secondary_ip:
+ node.secondary_ip = self.op.secondary_ip
+ result.append(("secondary_ip", self.op.secondary_ip))
+
+ # this will trigger configuration file update, if needed
+ self.cfg.Update(node, feedback_fn)
+
+ # this will trigger job queue propagation or cleanup if the mc
+ # flag changed
+ if [old_role, new_role].count(self._ROLE_CANDIDATE) == 1:
+ self.context.ReaddNode(node)
+
+ return result
+
+
+class LUNodePowercycle(NoHooksLU):
+ """Powercycles a node.
+
+ """
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+ if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
+ raise errors.OpPrereqError("The node is the master and the force"
+ " parameter was not set",
+ errors.ECODE_INVAL)
+
+ def ExpandNames(self):
+ """Locking for PowercycleNode.
+
+ This is a last-resort option and shouldn't block on other
+ jobs. Therefore, we grab no locks.
+
+ """
+ self.needed_locks = {}
+
+ def Exec(self, feedback_fn):
+ """Reboots a node.
+
+ """
+ result = self.rpc.call_node_powercycle(self.op.node_name,
+ self.cfg.GetHypervisorType())
+ result.Raise("Failed to schedule the reboot")
+ return result.payload
+
+
+def _GetNodeInstancesInner(cfg, fn):
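+ """Returns the instances from the configuration matching the filter fn.
+
+ """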
+ return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
+
+
+def _GetNodePrimaryInstances(cfg, node_name):
+ """Returns primary instances on a node.
+
+ """
+ return _GetNodeInstancesInner(cfg,
+ lambda inst: node_name == inst.primary_node)
+
+
+def _GetNodeSecondaryInstances(cfg, node_name):
+ """Returns secondary instances on a node.
+
+ """
+ return _GetNodeInstancesInner(cfg,
+ lambda inst: node_name in inst.secondary_nodes)
+
+
+def _GetNodeInstances(cfg, node_name):
+ """Returns a list of all primary and secondary instances on a node.
+
+ """
+ return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
+
+
+class LUNodeEvacuate(NoHooksLU):
+ """Evacuates instances off a list of nodes.
+
+ """
+ REQ_BGL = False
+
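+ # Maps the opcode's node evacuation modes to iallocator request modes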
+ _MODE2IALLOCATOR = {
+ constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
+ constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
+ constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
+ }
+ assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
+ assert (frozenset(_MODE2IALLOCATOR.values()) ==
+ constants.IALLOCATOR_NEVAC_MODES)
+
+ def CheckArguments(self):
+ _CheckIAllocatorOrNode(self, "iallocator", "remote_node")
+
+ def ExpandNames(self):
+ self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+
+ if self.op.remote_node is not None:
+ self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
+ assert self.op.remote_node
+
+ if self.op.remote_node == self.op.node_name:
+ raise errors.OpPrereqError("Can not use evacuated node as a new"
+ " secondary node", errors.ECODE_INVAL)
+
+ if self.op.mode != constants.NODE_EVAC_SEC:
+ raise errors.OpPrereqError("Without the use of an iallocator only"
+ " secondary instances can be evacuated",
+ errors.ECODE_INVAL)
+
+ # Declare locks
+ self.share_locks = _ShareAll()
+ self.needed_locks = {
+ locking.LEVEL_INSTANCE: [],
+ locking.LEVEL_NODEGROUP: [],
+ locking.LEVEL_NODE: [],
+ }
+
+ # Determine nodes (via group) optimistically, needs verification once locks
+ # have been acquired
+ self.lock_nodes = self._DetermineNodes()
+
+ def _DetermineNodes(self):
+ """Gets the list of nodes to operate on.
+
+ """
+ if self.op.remote_node is None:
+ # Iallocator will choose any node(s) in the same group
+ group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_name])
+ else:
+ group_nodes = frozenset([self.op.remote_node])
+
+ # Determine nodes to be locked
+ return set([self.op.node_name]) | group_nodes
+
+ def _DetermineInstances(self):
+ """Builds list of instances to operate on.
+
+ """
+ assert self.op.mode in constants.NODE_EVAC_MODES
+
+ if self.op.mode == constants.NODE_EVAC_PRI:
+ # Primary instances only
+ inst_fn = _GetNodePrimaryInstances
+ assert self.op.remote_node is None, \
+ "Evacuating primary instances requires iallocator"
+ elif self.op.mode == constants.NODE_EVAC_SEC:
+ # Secondary instances only
+ inst_fn = _GetNodeSecondaryInstances
+ else:
+ # All instances
+ assert self.op.mode == constants.NODE_EVAC_ALL
+ inst_fn = _GetNodeInstances
+ # TODO: In 2.6, change the iallocator interface to take an evacuation mode
+ # per instance
+ raise errors.OpPrereqError("Due to an issue with the iallocator"
+ " interface it is not possible to evacuate"
+ " all instances at once; specify explicitly"
+ " whether to evacuate primary or secondary"
+ " instances",
+ errors.ECODE_INVAL)
+
+ return inst_fn(self.cfg, self.op.node_name)
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_INSTANCE:
+ # Lock instances optimistically, needs verification once node and group
+ # locks have been acquired
+ self.needed_locks[locking.LEVEL_INSTANCE] = \
+ set(i.name for i in self._DetermineInstances())
+
+ elif level == locking.LEVEL_NODEGROUP:
+ # Lock node groups for all potential target nodes optimistically, needs
+ # verification once nodes have been acquired
+ self.needed_locks[locking.LEVEL_NODEGROUP] = \
+ self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
+
+ elif level == locking.LEVEL_NODE:
+ self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
+
+ def CheckPrereq(self):
+ # Verify locks
+ owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
+ owned_nodes = self.owned_locks(locking.LEVEL_NODE)
+ owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
+
+ need_nodes = self._DetermineNodes()
+
+ if not owned_nodes.issuperset(need_nodes):
+ raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
+ " locks were acquired, current nodes are"
+ " are '%s', used to be '%s'; retry the"
+ " operation" %
+ (self.op.node_name,
+ utils.CommaJoin(need_nodes),
+ utils.CommaJoin(owned_nodes)),
+ errors.ECODE_STATE)
+
+ wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
+ if owned_groups != wanted_groups:
+ raise errors.OpExecError("Node groups changed since locks were acquired,"
+ " current groups are '%s', used to be '%s';"
+ " retry the operation" %
+ (utils.CommaJoin(wanted_groups),
+ utils.CommaJoin(owned_groups)))
+
+ # Determine affected instances
+ self.instances = self._DetermineInstances()
+ self.instance_names = [i.name for i in self.instances]
+
+ if set(self.instance_names) != owned_instances:
+ raise errors.OpExecError("Instances on node '%s' changed since locks"
+ " were acquired, current instances are '%s',"
+ " used to be '%s'; retry the operation" %
+ (self.op.node_name,
+ utils.CommaJoin(self.instance_names),
+ utils.CommaJoin(owned_instances)))
+
+ if self.instance_names:
+ self.LogInfo("Evacuating instances from node '%s': %s",
+ self.op.node_name,
+ utils.CommaJoin(utils.NiceSort(self.instance_names)))
+ else:
+ self.LogInfo("No instances to evacuate from node '%s'",
+ self.op.node_name)
+
+ if self.op.remote_node is not None:
+ for i in self.instances:
+ if i.primary_node == self.op.remote_node:
+ raise errors.OpPrereqError("Node %s is the primary node of"
+ " instance %s, cannot use it as"
+ " secondary" %
+ (self.op.remote_node, i.name),
+ errors.ECODE_INVAL)
+
+ def Exec(self, feedback_fn):
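+ # CheckArguments guarantees exactly one of iallocator/remote_node is set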
+ assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
+
+ if not self.instance_names:
+ # No instances to evacuate
+ jobs = []
+
+ elif self.op.iallocator is not None:
+ # TODO: Implement relocation to other group
+ evac_mode = self._MODE2IALLOCATOR[self.op.mode]
+ req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
+ instances=list(self.instance_names))
+ ial = iallocator.IAllocator(self.cfg, self.rpc, req)
+
+ ial.Run(self.op.iallocator)
+
+ if not ial.success:
+ raise errors.OpPrereqError("Can't compute node evacuation using"
+ " iallocator '%s': %s" %
+ (self.op.iallocator, ial.info),
+ errors.ECODE_NORES)
+
+ jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
+
+ elif self.op.remote_node is not None:
+ assert self.op.mode == constants.NODE_EVAC_SEC
+ jobs = [
+ [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
+ remote_node=self.op.remote_node,
+ disks=[],
+ mode=constants.REPLACE_DISK_CHG,
+ early_release=self.op.early_release)]
+ for instance_name in self.instance_names]
+
+ else:
+ raise errors.ProgrammerError("No iallocator or remote node")
+
+ return ResultWithJobs(jobs)
+
+
+class LUNodeMigrate(LogicalUnit):
+ """Migrate all instances from a node.
+
+ """
+ HPATH = "node-migrate"
+ HTYPE = constants.HTYPE_NODE
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ pass
+
+ def ExpandNames(self):
+ self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+
+ self.share_locks = _ShareAll()
+ self.needed_locks = {
+ locking.LEVEL_NODE: [self.op.node_name],
+ }
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ This runs on the master node only.
+
+ """
+ return {
+ "NODE_NAME": self.op.node_name,
+ "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
+ }
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ nl = [self.cfg.GetMasterNode()]
+ return (nl, nl)
+
+ def CheckPrereq(self):
+ pass
+
+ def Exec(self, feedback_fn):
+ # Prepare one migration job per primary instance on the node
+ allow_runtime_changes = self.op.allow_runtime_changes
+ jobs = [
+ [opcodes.OpInstanceMigrate(instance_name=inst.name,
+ mode=self.op.mode,
+ live=self.op.live,
+ iallocator=self.op.iallocator,
+ target_node=self.op.target_node,
+ allow_runtime_changes=allow_runtime_changes,
+ ignore_ipolicy=self.op.ignore_ipolicy)]
+ for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name)]
+
+ # TODO: Run iallocator in this opcode and pass correct placement options to
+ # OpInstanceMigrate. Since other jobs can modify the cluster between
+ # running the iallocator and the actual migration, a good consistency model
+ # will have to be found.
+
+ assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
+ frozenset([self.op.node_name]))
+
+ return ResultWithJobs(jobs)
+
+
+def _GetStorageTypeArgs(cfg, storage_type):
+ """Returns the arguments for a storage type.
+
+ """
+ # Special case for file storage
+ if storage_type == constants.ST_FILE:
+ # storage.FileStorage wants a list of storage directories
+ return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
+
+ return []
+
+
+class LUNodeModifyStorage(NoHooksLU):
+ """Logical unit for modifying a storage volume on a node.
+
+ """
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+
+ storage_type = self.op.storage_type
+
+ try:
+ modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
+ except KeyError:
+ raise errors.OpPrereqError("Storage units of type '%s' can not be"
+ " modified" % storage_type,
+ errors.ECODE_INVAL)
+
+ diff = set(self.op.changes.keys()) - modifiable
+ if diff:
+ raise errors.OpPrereqError("The following fields can not be modified for"
+ " storage units of type '%s': %r" %
+ (storage_type, list(diff)),
+ errors.ECODE_INVAL)
+
+ def ExpandNames(self):
+ self.needed_locks = {
+ locking.LEVEL_NODE: self.op.node_name,
+ }
+
+ def Exec(self, feedback_fn):
+ """Computes the list of nodes and their attributes.
+
+ """
+ st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
+ result = self.rpc.call_storage_modify(self.op.node_name,
+ self.op.storage_type, st_args,
+ self.op.name, self.op.changes)
+ result.Raise("Failed to modify storage unit '%s' on %s" %
+ (self.op.name, self.op.node_name))
+
+
+class _NodeQuery(_QueryBase):
+ FIELDS = query.NODE_FIELDS
+
+ def ExpandNames(self, lu):
+ lu.needed_locks = {}
+ lu.share_locks = _ShareAll()
+
+ if self.names:
+ self.wanted = _GetWantedNodes(lu, self.names)
+ else:
+ self.wanted = locking.ALL_SET
+
+ self.do_locking = (self.use_locking and
+ query.NQ_LIVE in self.requested_data)
+
+ if self.do_locking:
+ # If any non-static field is requested we need to lock the nodes
+ lu.needed_locks[locking.LEVEL_NODE] = self.wanted
+ lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
+
+ def DeclareLocks(self, lu, level):
+ pass
+
+ def _GetQueryData(self, lu):
+ """Computes the list of nodes and their attributes.
+
+ """
+ all_info = lu.cfg.GetAllNodesInfo()
+
+ nodenames = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
+
+ # Gather data as requested
+ if query.NQ_LIVE in self.requested_data:
+ # filter out non-vm_capable nodes
+ toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]
+
+ es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, toquery_nodes)
+ node_data = lu.rpc.call_node_info(toquery_nodes, [lu.cfg.GetVGName()],
+ [lu.cfg.GetHypervisorType()], es_flags)
+ live_data = dict((name, rpc.MakeLegacyNodeInfo(nresult.payload))
+ for (name, nresult) in node_data.items()
+ if not nresult.fail_msg and nresult.payload)
+ else:
+ live_data = None
+
+ if query.NQ_INST in self.requested_data:
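+ # Build reverse maps: node name -> set of primary/secondary instances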
+ node_to_primary = dict([(name, set()) for name in nodenames])
+ node_to_secondary = dict([(name, set()) for name in nodenames])
+
+ inst_data = lu.cfg.GetAllInstancesInfo()
+
+ for inst in inst_data.values():
+ if inst.primary_node in node_to_primary:
+ node_to_primary[inst.primary_node].add(inst.name)
+ for secnode in inst.secondary_nodes:
+ if secnode in node_to_secondary:
+ node_to_secondary[secnode].add(inst.name)
+ else:
+ node_to_primary = None
+ node_to_secondary = None
+
+ if query.NQ_OOB in self.requested_data:
+ oob_support = dict((name, bool(_SupportsOob(lu.cfg, node)))
+ for name, node in all_info.iteritems())
+ else:
+ oob_support = None
+
+ if query.NQ_GROUP in self.requested_data:
+ groups = lu.cfg.GetAllNodeGroupsInfo()
+ else:
+ groups = {}
+
+ return query.NodeQueryData([all_info[name] for name in nodenames],
+ live_data, lu.cfg.GetMasterNode(),
+ node_to_primary, node_to_secondary, groups,
+ oob_support, lu.cfg.GetClusterInfo())
+
+
+class LUNodeQuery(NoHooksLU):
+ """Logical unit for querying nodes.
+
+ """
+ # pylint: disable=W0142
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ self.nq = _NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
+ self.op.output_fields, self.op.use_locking)
+
+ def ExpandNames(self):
+ self.nq.ExpandNames(self)
+
+ def DeclareLocks(self, level):
+ self.nq.DeclareLocks(self, level)
+
+ def Exec(self, feedback_fn):
+ return self.nq.OldStyleQuery(self)
+
+
+def _CheckOutputFields(static, dynamic, selected):
+ """Checks whether all selected fields are valid.
+
+ @type static: L{utils.FieldSet}
+ @param static: static fields set
+ @type dynamic: L{utils.FieldSet}
+ @param dynamic: dynamic fields set
+
+ """
+ f = utils.FieldSet()
+ f.Extend(static)
+ f.Extend(dynamic)
+
+ delta = f.NonMatching(selected)
+ if delta:
+ raise errors.OpPrereqError("Unknown output fields selected: %s"
+ % ",".join(delta), errors.ECODE_INVAL)
+
+
+class LUNodeQueryvols(NoHooksLU):
+ """Logical unit for getting volumes on node(s).
+
+ """
+ REQ_BGL = False
+ _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
+ _FIELDS_STATIC = utils.FieldSet("node")
+
+ def CheckArguments(self):
+ _CheckOutputFields(static=self._FIELDS_STATIC,
+ dynamic=self._FIELDS_DYNAMIC,
+ selected=self.op.output_fields)
+
+ def ExpandNames(self):
+ self.share_locks = _ShareAll()
+
+ if self.op.nodes:
+ self.needed_locks = {
+ locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes),
+ }
+ else:
+ self.needed_locks = {
+ locking.LEVEL_NODE: locking.ALL_SET,
+ locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
+ }
+
+ def Exec(self, feedback_fn):
+ """Computes the list of nodes and their attributes.
+
+ """
+ nodenames = self.owned_locks(locking.LEVEL_NODE)
+ volumes = self.rpc.call_node_volumes(nodenames)
+
+ ilist = self.cfg.GetAllInstancesInfo()
+ vol2inst = _MapInstanceDisksToNodes(ilist.values())
+
+ output = []
+ for node in nodenames:
+ nresult = volumes[node]
+ if nresult.offline:
+ continue
+ msg = nresult.fail_msg
+ if msg:
+ self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
+ continue
+
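+ # Sort volumes by their physical device name for stable output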
+ node_vols = sorted(nresult.payload,
+ key=operator.itemgetter("dev"))
+
+ for vol in node_vols:
+ node_output = []
+ for field in self.op.output_fields:
+ if field == "node":
+ val = node
+ elif field == "phys":
+ val = vol["dev"]
+ elif field == "vg":
+ val = vol["vg"]
+ elif field == "name":
+ val = vol["name"]
+ elif field == "size":
+ val = int(float(vol["size"]))
+ elif field == "instance":
+ val = vol2inst.get((node, vol["vg"] + "/" + vol["name"]), "-")
+ else:
+ raise errors.ParameterError(field)
+ node_output.append(str(val))
+
+ output.append(node_output)
+
+ return output
+
+
+class LUNodeQueryStorage(NoHooksLU):
+ """Logical unit for getting information on storage units on node(s).
+
+ """
+ _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ _CheckOutputFields(static=self._FIELDS_STATIC,
+ dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
+ selected=self.op.output_fields)
+
+ def ExpandNames(self):
+ self.share_locks = _ShareAll()
+
+ if self.op.nodes:
+ self.needed_locks = {
+ locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes),
+ }
+ else:
+ self.needed_locks = {
+ locking.LEVEL_NODE: locking.ALL_SET,
+ locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
+ }
+
+ def Exec(self, feedback_fn):
+ """Computes the list of nodes and their attributes.
+
+ """
+ self.nodes = self.owned_locks(locking.LEVEL_NODE)
+
+ # Always get name to sort by
+ if constants.SF_NAME in self.op.output_fields:
+ fields = self.op.output_fields[:]
+ else:
+ fields = [constants.SF_NAME] + self.op.output_fields
+
+ # Never ask for node or type as it's only known to the LU
+ for extra in [constants.SF_NODE, constants.SF_TYPE]:
+ while extra in fields:
+ fields.remove(extra)
+
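+ # Map each requested field to its column index in the RPC result rows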
+ field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
+ name_idx = field_idx[constants.SF_NAME]
+
+ st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
+ data = self.rpc.call_storage_list(self.nodes,
+ self.op.storage_type, st_args,
+ self.op.name, fields)
+
+ result = []
+
+ for node in utils.NiceSort(self.nodes):
+ nresult = data[node]
+ if nresult.offline:
+ continue
+
+ msg = nresult.fail_msg
+ if msg:
+ self.LogWarning("Can't get storage data from node %s: %s", node, msg)
+ continue
+
+ rows = dict([(row[name_idx], row) for row in nresult.payload])
+
+ for name in utils.NiceSort(rows.keys()):
+ row = rows[name]
+
+ out = []
+
+ for field in self.op.output_fields:
+ if field == constants.SF_NODE:
+ val = node
+ elif field == constants.SF_TYPE:
+ val = self.op.storage_type
+ elif field in field_idx:
+ val = row[field_idx[field]]
+ else:
+ raise errors.ParameterError(field)
+
+ out.append(val)
+
+ result.append(out)
+
+ return result
+
+
+class LUNodeRemove(LogicalUnit):
+ """Logical unit for removing a node.
+
+ """
+ HPATH = "node-remove"
+ HTYPE = constants.HTYPE_NODE
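+ # REQ_BGL is left at its default, so this LU runs with the BGL held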
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ """
+ return {
+ "OP_TARGET": self.op.node_name,
+ "NODE_NAME": self.op.node_name,
+ }
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ This doesn't run on the target node in the pre phase as a failed
+ node would then be impossible to remove.
+
+ """
+ all_nodes = self.cfg.GetNodeList()
+ try:
+ all_nodes.remove(self.op.node_name)
+ except ValueError:
+ pass
+ return (all_nodes, all_nodes)
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ This checks:
+ - the node exists in the configuration
+ - it does not have primary or secondary instances
+ - it's not the master
+
+ Any errors are signaled by raising errors.OpPrereqError.
+
+ """
+ self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+ node = self.cfg.GetNodeInfo(self.op.node_name)
+ assert node is not None
+
+ masternode = self.cfg.GetMasterNode()
+ if node.name == masternode:
+ raise errors.OpPrereqError("Node is the master node, failover to another"
+ " node is required", errors.ECODE_INVAL)
+
+ for instance_name, instance in self.cfg.GetAllInstancesInfo().items():
+ if node.name in instance.all_nodes:
+ raise errors.OpPrereqError("Instance %s is still running on the node,"
+ " please remove first" % instance_name,
+ errors.ECODE_INVAL)
+ self.op.node_name = node.name
+ self.node = node
+
+ def Exec(self, feedback_fn):
+ """Removes the node from the cluster.
+
+ """
+ node = self.node
+ logging.info("Stopping the node daemon and removing configs from node %s",
+ node.name)
+
+ modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
+
+ assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
+ "Not owning BGL"
+
+ # Promote nodes to master candidate as needed
+ _AdjustCandidatePool(self, exceptions=[node.name])
+ self.context.RemoveNode(node.name)
+
+ # Run post hooks on the node before it's removed
+ _RunPostHook(self, node.name)
+
+ result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
+ msg = result.fail_msg
+ if msg:
+ self.LogWarning("Errors encountered on the remote node while leaving"
+ " the cluster: %s", msg)
+
+ # Remove node from our /etc/hosts
+ if self.cfg.GetClusterInfo().modify_etc_hosts:
+ master_node = self.cfg.GetMasterNode()
+ result = self.rpc.call_etc_hosts_modify(master_node,
+ constants.ETC_HOSTS_REMOVE,
+ node.name, None)
+ result.Raise("Can't update hosts file with new host data")
+ _RedistributeAncillaryFiles(self)
+
+
+class LURepairNodeStorage(NoHooksLU):
+ """Repairs the volume group on a node.
+
+ """
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+
+ storage_type = self.op.storage_type
+
+ if (constants.SO_FIX_CONSISTENCY not in
+ constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
+ raise errors.OpPrereqError("Storage units of type '%s' can not be"
+ " repaired" % storage_type,
+ errors.ECODE_INVAL)
+
+ def ExpandNames(self):
+ self.needed_locks = {
+ locking.LEVEL_NODE: [self.op.node_name],
+ }
+
+ def _CheckFaultyDisks(self, instance, node_name):
+ """Ensure faulty disks abort the opcode or at least warn."""
+ try:
+ if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
+ node_name, True):
+ raise errors.OpPrereqError("Instance '%s' has faulty disks on"
+ " node '%s'" % (instance.name, node_name),
+ errors.ECODE_STATE)
+ except errors.OpPrereqError, err:
+ if self.op.ignore_consistency:
+ self.LogWarning(str(err.args[0]))
+ else:
+ raise
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ """
+ # Check whether any instance on this node has faulty disks
+ for inst in _GetNodeInstances(self.cfg, self.op.node_name):
+ if inst.admin_state != constants.ADMINST_UP:
+ continue
+ check_nodes = set(inst.all_nodes)
+ check_nodes.discard(self.op.node_name)
+ for inst_node_name in check_nodes:
+ self._CheckFaultyDisks(inst, inst_node_name)
+
+ def Exec(self, feedback_fn):
+ feedback_fn("Repairing storage unit '%s' on %s ..." %
+ (self.op.name, self.op.node_name))
+
+ st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
+ result = self.rpc.call_storage_execute(self.op.node_name,
+ self.op.storage_type, st_args,
+ self.op.name,
+ constants.SO_FIX_CONSISTENCY)
+ result.Raise("Failed to repair storage unit '%s' on %s" %
+ (self.op.name, self.op.node_name))