return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
-# End types
+class ResultWithJobs:
+ """Data container for LU results with jobs.
+
+ Instances of this class returned from L{LogicalUnit.Exec} will be recognized
+ by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
+ contained in the C{jobs} attribute and include the job IDs in the opcode
+ result.
+
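+  A minimal illustrative example (C{op1} and C{op2} stand for arbitrary
+  L{opcodes.OpCode} instances; the keyword argument becomes an additional
+  return value)::
+
+    return ResultWithJobs([[op1], [op2]], some_key="some value")
+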
+ """
+ def __init__(self, jobs, **kwargs):
+ """Initializes this class.
+
+ Additional return values can be specified as keyword arguments.
+
+    @type jobs: list of lists of L{opcodes.OpCode}
+ @param jobs: A list of lists of opcode objects
+
+ """
+ self.jobs = jobs
+ self.other = kwargs
+
+
class LogicalUnit(object):
"""Logical Unit base class.
- implement CheckPrereq (except when tasklets are used)
- implement Exec (except when tasklets are used)
- implement BuildHooksEnv
+ - implement BuildHooksNodes
- redefine HPATH and HTYPE
- optionally redefine their run requirements:
REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
def BuildHooksEnv(self):
"""Build hooks environment for this LU.
- This method should return a three-node tuple consisting of: a dict
- containing the environment that will be used for running the
- specific hook for this LU, a list of node names on which the hook
- should run before the execution, and a list of node names on which
- the hook should run after the execution.
+ @rtype: dict
+ @return: Dictionary containing the environment that will be used for
+ running the hooks for this LU. The keys of the dict must not be prefixed
+ with "GANETI_"--that'll be added by the hooks runner. The hooks runner
+ will extend the environment with additional variables. If no environment
+ should be defined, an empty dictionary should be returned (not C{None}).
+ @note: If the C{HPATH} attribute of the LU class is C{None}, this function
+ will not be called.
- The keys of the dict must not have 'GANETI_' prefixed as this will
- be handled in the hooks runner. Also note additional keys will be
- added by the hooks runner. If the LU doesn't define any
- environment, an empty dict (and not None) should be returned.
+ """
+ raise NotImplementedError
- No nodes should be returned as an empty list (and not None).
+ def BuildHooksNodes(self):
+ """Build list of nodes to run LU's hooks.
- Note that if the HPATH for a LU class is None, this function will
- not be called.
+ @rtype: tuple; (list, list)
+ @return: Tuple containing a list of node names on which the hook
+ should run before the execution and a list of node names on which the
+      hook should run after the execution. "No nodes" must be expressed as
+      an empty list (and not C{None}).
+ @note: If the C{HPATH} attribute of the LU class is C{None}, this function
+ will not be called.
"""
raise NotImplementedError
This just raises an error.
"""
- assert False, "BuildHooksEnv called for NoHooksLUs"
+ raise AssertionError("BuildHooksEnv called for NoHooksLUs")
+
+ def BuildHooksNodes(self):
+ """Empty BuildHooksNodes for NoHooksLU.
+
+ """
+ raise AssertionError("BuildHooksNodes called for NoHooksLU")
class Tasklet:
#: Attribute holding field definitions
FIELDS = None
- def __init__(self, names, fields, use_locking):
+ def __init__(self, filter_, fields, use_locking):
"""Initializes this class.
"""
- self.names = names
self.use_locking = use_locking
- self.query = query.Query(self.FIELDS, fields)
+ self.query = query.Query(self.FIELDS, fields, filter_=filter_,
+ namefield="name")
self.requested_data = self.query.RequestedData()
+ self.names = self.query.RequestedNames()
+
+ # Sort only if no names were requested
+ self.sort_by_name = not self.names
self.do_locking = None
self.wanted = None
# Return expanded names
return self.wanted
- @classmethod
- def FieldsQuery(cls, fields):
- """Returns list of available fields.
-
- @return: List of L{objects.QueryFieldDefinition}
-
- """
- return query.QueryFields(cls.FIELDS, fields)
-
def ExpandNames(self, lu):
"""Expand names for this query.
"""Collect data and execute query.
"""
- return query.GetQueryResponse(self.query, self._GetQueryData(lu))
+ return query.GetQueryResponse(self.query, self._GetQueryData(lu),
+ sort_by_name=self.sort_by_name)
def OldStyleQuery(self, lu):
"""Collect data and execute query.
"""
- return self.query.OldStyleQuery(self._GetQueryData(lu))
+ return self.query.OldStyleQuery(self._GetQueryData(lu),
+ sort_by_name=self.sort_by_name)
def _GetWantedNodes(lu, nodes):
return params_copy
+def _RunPostHook(lu, node_name):
+ """Runs the post-hook for an opcode on a single node.
+
+ """
+ hm = lu.proc.hmclass(lu.rpc.call_hooks_runner, lu)
+ try:
+ hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
+ except:
+ # pylint: disable-msg=W0702
+ lu.LogWarning("Errors occurred running hooks on %s" % node_name)
+
+
def _CheckOutputFields(static, dynamic, selected):
"""Checks whether all selected fields are valid.
# Special case for file storage
if storage_type == constants.ST_FILE:
# storage.FileStorage wants a list of storage directories
- return [[cfg.GetFileStorageDir()]]
+ return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
return []
"""Build hooks env.
"""
- env = {"OP_TARGET": self.cfg.GetClusterName()}
- mn = self.cfg.GetMasterNode()
- return env, [], [mn]
+ return {
+ "OP_TARGET": self.cfg.GetClusterName(),
+ }
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ return ([], [self.cfg.GetMasterNode()])
def Exec(self, feedback_fn):
"""Nothing to do.
"""Build hooks env.
"""
- env = {"OP_TARGET": self.cfg.GetClusterName()}
- return env, [], []
+ return {
+ "OP_TARGET": self.cfg.GetClusterName(),
+ }
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ return ([], [])
def CheckPrereq(self):
"""Check prerequisites.
master = self.cfg.GetMasterNode()
# Run post hooks on master node before it's removed
- hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
- try:
- hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
- except:
- # pylint: disable-msg=W0702
- self.LogWarning("Errors occurred running hooks on %s" % master)
+ _RunPostHook(self, master)
result = self.rpc.call_node_stop_master(master, False)
result.Raise("Could not disable the master role")
ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
+ ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK")
EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
@ivar instances: a list of running instances (runtime)
@ivar pinst: list of configured primary instances (config)
@ivar sinst: list of configured secondary instances (config)
- @ivar sbp: diction of {secondary-node: list of instances} of all peers
- of this node (config)
+ @ivar sbp: dictionary of {primary-node: list of instances} for all
+ instances for which this node is secondary (config)
@ivar mfree: free memory, as reported by hypervisor (runtime)
@ivar dfree: free disk, as reported by the node (runtime)
@ivar offline: the offline status (config)
node_current)
for node, n_img in node_image.items():
- if (not node == node_current):
+ if node != node_current:
test = instance in n_img.instances
_ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
"instance should not run on node %s", node)
for idx, (success, status) in enumerate(disks)]
for nname, success, bdev_status, idx in diskdata:
- _ErrorIf(instanceconfig.admin_up and not success,
+ # the 'ghost node' construction in Exec() ensures that we have a
+ # node here
+ snode = node_image[nname]
+ bad_snode = snode.ghost or snode.offline
+ _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
self.EINSTANCEFAULTYDISK, instance,
"couldn't retrieve status for disk/%s on %s: %s",
idx, nname, bdev_status)
instances it was primary for.
"""
+ cluster_info = self.cfg.GetClusterInfo()
for node, n_img in node_image.items():
# This code checks that every node which is now listed as
# secondary has enough memory to host all instances it is
for prinode, instances in n_img.sbp.items():
needed_mem = 0
for instance in instances:
- bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
+ bep = cluster_info.FillBE(instance_cfg[instance])
if bep[constants.BE_AUTO_BALANCE]:
needed_mem += bep[constants.BE_MEMORY]
test = n_img.mfree < needed_mem
self._ErrorIf(test, self.ENODEN1, node,
"not enough memory to accomodate instance failovers"
- " should node %s fail", prinode)
+ " should node %s fail (%dMiB needed, %dMiB available)",
+ prinode, needed_mem, n_img.mfree)
- def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum,
- master_files):
- """Verifies and computes the node required file checksums.
+ @classmethod
+ def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
+ (files_all, files_all_opt, files_mc, files_vm)):
+ """Verifies file checksums collected from all nodes.
- @type ninfo: L{objects.Node}
- @param ninfo: the node to check
- @param nresult: the remote results for the node
- @param file_list: required list of files
- @param local_cksum: dictionary of local files and their checksums
- @param master_files: list of files that only masters should have
+ @param errorif: Callback for reporting errors
+ @param nodeinfo: List of L{objects.Node} objects
+ @param master_node: Name of master node
+ @param all_nvinfo: RPC results
"""
- node = ninfo.name
- _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+ node_names = frozenset(node.name for node in nodeinfo)
- remote_cksum = nresult.get(constants.NV_FILELIST, None)
- test = not isinstance(remote_cksum, dict)
- _ErrorIf(test, self.ENODEFILECHECK, node,
- "node hasn't returned file checksum data")
- if test:
- return
+ assert master_node in node_names
+ assert (len(files_all | files_all_opt | files_mc | files_vm) ==
+ sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
+ "Found file listed in more than one file list"
+
+ # Define functions determining which nodes to consider for a file
+ file2nodefn = dict([(filename, fn)
+ for (files, fn) in [(files_all, None),
+ (files_all_opt, None),
+ (files_mc, lambda node: (node.master_candidate or
+ node.name == master_node)),
+ (files_vm, lambda node: node.vm_capable)]
+ for filename in files])
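+    # A callback of None means the file is relevant for every node; otherwise
+    # the callback decides, per node, whether the file must be considered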
- for file_name in file_list:
- node_is_mc = ninfo.master_candidate
- must_have = (file_name not in master_files) or node_is_mc
- # missing
- test1 = file_name not in remote_cksum
- # invalid checksum
- test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
- # existing and good
- test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
- _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
- "file '%s' missing", file_name)
- _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
- "file '%s' has wrong checksum", file_name)
- # not candidate and this is not a must-have file
- _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
- "file '%s' should not exist on non master"
- " candidates (and the file is outdated)", file_name)
- # all good, except non-master/non-must have combination
- _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
- "file '%s' should not exist"
- " on non master candidates", file_name)
+ fileinfo = dict((filename, {}) for filename in file2nodefn.keys())
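+    # fileinfo maps filename to a dict of {checksum: set of reporting nodes}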
+
+ for node in nodeinfo:
+ nresult = all_nvinfo[node.name]
+
+ if nresult.fail_msg or not nresult.payload:
+ node_files = None
+ else:
+ node_files = nresult.payload.get(constants.NV_FILELIST, None)
+
+ test = not (node_files and isinstance(node_files, dict))
+ errorif(test, cls.ENODEFILECHECK, node.name,
+ "Node did not return file checksum data")
+ if test:
+ continue
+
+ for (filename, checksum) in node_files.items():
+ # Check if the file should be considered for a node
+ fn = file2nodefn[filename]
+ if fn is None or fn(node):
+ fileinfo[filename].setdefault(checksum, set()).add(node.name)
+
+ for (filename, checksums) in fileinfo.items():
+ assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
+
+ # Nodes having the file
+ with_file = frozenset(node_name
+ for nodes in fileinfo[filename].values()
+ for node_name in nodes)
+
+ # Nodes missing file
+ missing_file = node_names - with_file
+
+ if filename in files_all_opt:
+ # All or no nodes
+ errorif(missing_file and missing_file != node_names,
+ cls.ECLUSTERFILECHECK, None,
+ "File %s is optional, but it must exist on all or no nodes (not"
+ " found on %s)",
+ filename, utils.CommaJoin(utils.NiceSort(missing_file)))
+ else:
+ errorif(missing_file, cls.ECLUSTERFILECHECK, None,
+ "File %s is missing from node(s) %s", filename,
+ utils.CommaJoin(utils.NiceSort(missing_file)))
+
+ # See if there are multiple versions of the file
+ test = len(checksums) > 1
+ if test:
+ variants = ["variant %s on %s" %
+ (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
+ for (idx, (checksum, nodes)) in
+ enumerate(sorted(checksums.items()))]
+ else:
+ variants = []
+
+ errorif(test, cls.ECLUSTERFILECHECK, None,
+ "File %s found with %s different checksums (%s)",
+ filename, len(checksums), "; ".join(variants))
def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
drbd_map):
except errors.GenericError, err:
self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))
-
def BuildHooksEnv(self):
"""Build hooks env.
the output be logged in the verify output and the verification to fail.
"""
- all_nodes = self.cfg.GetNodeList()
+ cfg = self.cfg
+
env = {
- "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
+ "CLUSTER_TAGS": " ".join(cfg.GetClusterInfo().GetTags())
}
- for node in self.cfg.GetAllNodesInfo().values():
- env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
- return env, [], all_nodes
+ env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
+ for node in cfg.GetAllNodesInfo().values())
+
+ return env
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ return ([], self.cfg.GetNodeList())
def Exec(self, feedback_fn):
"""Verify integrity of cluster, performing various test on nodes.
node_vol_should = {}
# FIXME: verify OS list
+
+ # File verification
+ filemap = _ComputeAncillaryFiles(cluster, False)
+
# do local checksums
- master_files = [constants.CLUSTER_CONF_FILE]
master_node = self.master_node = self.cfg.GetMasterNode()
master_ip = self.cfg.GetMasterIP()
- file_names = ssconf.SimpleStore().GetFileList()
- file_names.extend(constants.ALL_CERT_FILES)
- file_names.extend(master_files)
- if cluster.modify_etc_hosts:
- file_names.append(constants.ETC_HOSTS)
-
- local_checksums = utils.FingerprintFiles(file_names)
-
# Compute the set of hypervisor parameters
hvp_data = []
for hv_name in hypervisors:
feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
node_verify_param = {
- constants.NV_FILELIST: file_names,
+ constants.NV_FILELIST:
+ utils.UniqueSequence(filename
+ for files in filemap
+ for filename in files),
constants.NV_NODELIST: [node.name for node in nodeinfo
if not node.offline],
constants.NV_HYPERVISOR: hypervisors,
feedback_fn("* Gathering disk information (%s nodes)" % len(nodelist))
instdisk = self._CollectDiskInfo(nodelist, node_image, instanceinfo)
+ feedback_fn("* Verifying configuration file consistency")
+ self._VerifyFiles(_ErrorIf, nodeinfo, master_node, all_nvinfo, filemap)
+
feedback_fn("* Verifying node status")
refos_img = None
nimg.call_ok = self._VerifyNode(node_i, nresult)
self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
self._VerifyNodeNetwork(node_i, nresult)
- self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums,
- master_files)
-
self._VerifyOob(node_i, nresult)
if nimg.vm_capable:
self.ENODERPC, pnode, "instance %s, connection to"
" primary node failed", instance)
- if pnode_img.offline:
- inst_nodes_offline.append(pnode)
+ _ErrorIf(inst_config.admin_up and pnode_img.offline,
+ self.EINSTANCEBADNODE, instance,
+ "instance is marked as running and lives on offline node %s",
+ inst_config.primary_node)
# If the instance is non-redundant we cannot survive losing its primary
# node, so we are not N+1 compliant. On the other hand we have no disk
utils.CommaJoin(inst_config.secondary_nodes),
code=self.ETYPE_WARNING)
- if inst_config.disk_template in constants.DTS_NET_MIRROR:
+ if inst_config.disk_template in constants.DTS_INT_MIRROR:
pnode = inst_config.primary_node
instance_nodes = utils.NiceSort(inst_config.all_nodes)
instance_groups = {}
# warn that the instance lives on offline nodes
_ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
- "instance lives on offline node(s) %s",
+ "instance has offline secondary node(s) %s",
utils.CommaJoin(inst_nodes_offline))
# ... or ghost/non-vm_capable nodes
for node in inst_config.all_nodes:
newl = [v[2].Copy() for v in dskl]
for dsk in newl:
self.cfg.SetDiskID(dsk, node)
- result = self.rpc.call_blockdev_getsizes(node, newl)
+ result = self.rpc.call_blockdev_getsize(node, newl)
if result.fail_msg:
- self.LogWarning("Failure in blockdev_getsizes call to node"
+ self.LogWarning("Failure in blockdev_getsize call to node"
" %s, ignoring", node)
continue
- if len(result.data) != len(dskl):
+ if len(result.payload) != len(dskl):
+ logging.warning("Invalid result from node %s: len(dksl)=%d,"
+ " result.payload=%s", node, len(dskl), result.payload)
self.LogWarning("Invalid result from node %s, ignoring node results",
node)
continue
- for ((instance, idx, disk), size) in zip(dskl, result.data):
+ for ((instance, idx, disk), size) in zip(dskl, result.payload):
if size is None:
self.LogWarning("Disk %d of instance %s did not return size"
" information, ignoring", idx, instance.name)
"""Build hooks env.
"""
- env = {
+ return {
"OP_TARGET": self.cfg.GetClusterName(),
"NEW_NAME": self.op.name,
}
- mn = self.cfg.GetMasterNode()
- all_nodes = self.cfg.GetNodeList()
- return env, [mn], all_nodes
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
def CheckPrereq(self):
"""Verify that the passed name is a valid one.
"""Build hooks env.
"""
- env = {
+ return {
"OP_TARGET": self.cfg.GetClusterName(),
"NEW_VG_NAME": self.op.vg_name,
}
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
mn = self.cfg.GetMasterNode()
- return env, [mn], [mn]
+ return ([mn], [mn])
def CheckPrereq(self):
"""Check prerequisites.
utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
+ # TODO: we need a more general way to handle resetting
+ # cluster-level parameters to default values
+ if self.new_ndparams["oob_program"] == "":
+ self.new_ndparams["oob_program"] = \
+ constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
+
if self.op.nicparams:
utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
lu.proc.LogWarning(msg)
+def _ComputeAncillaryFiles(cluster, redist):
+ """Compute files external to Ganeti which need to be consistent.
+
+ @type redist: boolean
+ @param redist: Whether to include files which need to be redistributed
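+  @rtype: tuple
+  @return: Tuple of (files_all, files_all_opt, files_mc, files_vm): files
+    common to all nodes, files which must exist on all nodes or on none,
+    files for master candidates only and files for VM-capable nodes only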
+
+ """
+ # Compute files for all nodes
+ files_all = set([
+ constants.SSH_KNOWN_HOSTS_FILE,
+ constants.CONFD_HMAC_KEY,
+ constants.CLUSTER_DOMAIN_SECRET_FILE,
+ ])
+
+ if not redist:
+ files_all.update(constants.ALL_CERT_FILES)
+ files_all.update(ssconf.SimpleStore().GetFileList())
+
+ if cluster.modify_etc_hosts:
+ files_all.add(constants.ETC_HOSTS)
+
+ # Files which must either exist on all nodes or on none
+ files_all_opt = set([
+ constants.RAPI_USERS_FILE,
+ ])
+
+ # Files which should only be on master candidates
+ files_mc = set()
+ if not redist:
+ files_mc.add(constants.CLUSTER_CONF_FILE)
+
+ # Files which should only be on VM-capable nodes
+ files_vm = set(filename
+ for hv_name in cluster.enabled_hypervisors
+ for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles())
+
+ # Filenames must be unique
+ assert (len(files_all | files_all_opt | files_mc | files_vm) ==
+ sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
+ "Found file listed in more than one file list"
+
+ return (files_all, files_all_opt, files_mc, files_vm)
+
+
def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
"""Distribute additional files which are part of the cluster configuration.
@param additional_vm: whether the additional nodes are vm-capable or not
"""
- # 1. Gather target nodes
- myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
- dist_nodes = lu.cfg.GetOnlineNodeList()
- nvm_nodes = lu.cfg.GetNonVmCapableNodeList()
- vm_nodes = [name for name in dist_nodes if name not in nvm_nodes]
+ # Gather target nodes
+ cluster = lu.cfg.GetClusterInfo()
+ master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
+
+ online_nodes = lu.cfg.GetOnlineNodeList()
+ vm_nodes = lu.cfg.GetVmCapableNodeList()
+
if additional_nodes is not None:
- dist_nodes.extend(additional_nodes)
+ online_nodes.extend(additional_nodes)
if additional_vm:
vm_nodes.extend(additional_nodes)
- if myself.name in dist_nodes:
- dist_nodes.remove(myself.name)
- if myself.name in vm_nodes:
- vm_nodes.remove(myself.name)
-
- # 2. Gather files to distribute
- dist_files = set([constants.ETC_HOSTS,
- constants.SSH_KNOWN_HOSTS_FILE,
- constants.RAPI_CERT_FILE,
- constants.RAPI_USERS_FILE,
- constants.CONFD_HMAC_KEY,
- constants.CLUSTER_DOMAIN_SECRET_FILE,
- ])
-
- vm_files = set()
- enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
- for hv_name in enabled_hypervisors:
- hv_class = hypervisor.GetHypervisor(hv_name)
- vm_files.update(hv_class.GetAncillaryFiles())
-
- # 3. Perform the files upload
- for fname in dist_files:
- _UploadHelper(lu, dist_nodes, fname)
- for fname in vm_files:
- _UploadHelper(lu, vm_nodes, fname)
+
+ # Never distribute to master node
+ for nodelist in [online_nodes, vm_nodes]:
+ if master_info.name in nodelist:
+ nodelist.remove(master_info.name)
+
+ # Gather file lists
+ (files_all, files_all_opt, files_mc, files_vm) = \
+ _ComputeAncillaryFiles(cluster, True)
+
+ # Never re-distribute configuration file from here
+ assert not (constants.CLUSTER_CONF_FILE in files_all or
+ constants.CLUSTER_CONF_FILE in files_vm)
+ assert not files_mc, "Master candidates not handled in this function"
+
+ filemap = [
+ (online_nodes, files_all),
+ (online_nodes, files_all_opt),
+ (vm_nodes, files_vm),
+ ]
+
+ # Upload the files
+ for (node_list, files) in filemap:
+ for fname in files:
+ _UploadHelper(lu, node_list, fname)
class LUClusterRedistConf(NoHooksLU):
"""
  REQ_BGL = False
+ _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
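+  # Powering off or power-cycling the master would interrupt the daemon
+  # executing this very opcode, so the master node is never included
+  # implicitly for these commands (see CheckPrereq below)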
def CheckPrereq(self):
"""Check prerequisites.
"""
self.nodes = []
+ self.master_node = self.cfg.GetMasterNode()
+
+ assert self.op.power_delay >= 0.0
+
+ if self.op.node_names:
+ if self.op.command in self._SKIP_MASTER:
+ if self.master_node in self.op.node_names:
+ master_node_obj = self.cfg.GetNodeInfo(self.master_node)
+ master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
+
+ if master_oob_handler:
+ additional_text = ("Run '%s %s %s' if you want to operate on the"
+ " master regardless") % (master_oob_handler,
+ self.op.command,
+ self.master_node)
+ else:
+ additional_text = "The master node does not support out-of-band"
+
+ raise errors.OpPrereqError(("Operating on the master node %s is not"
+ " allowed for %s\n%s") %
+ (self.master_node, self.op.command,
+ additional_text), errors.ECODE_INVAL)
+ else:
+ self.op.node_names = self.cfg.GetNodeList()
+ if self.op.command in self._SKIP_MASTER:
+ self.op.node_names.remove(self.master_node)
+
+ if self.op.command in self._SKIP_MASTER:
+ assert self.master_node not in self.op.node_names
+
for node_name in self.op.node_names:
node = self.cfg.GetNodeInfo(node_name)
else:
self.nodes.append(node)
- if (self.op.command == constants.OOB_POWER_OFF and not node.offline):
+ if (not self.op.ignore_status and
+ (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
raise errors.OpPrereqError(("Cannot power off node %s because it is"
" not marked offline") % node_name,
errors.ECODE_STATE)
if self.op.node_names:
self.op.node_names = [_ExpandNodeName(self.cfg, name)
for name in self.op.node_names]
+ lock_names = self.op.node_names
else:
- self.op.node_names = self.cfg.GetNodeList()
+ lock_names = locking.ALL_SET
self.needed_locks = {
- locking.LEVEL_NODE: self.op.node_names,
+ locking.LEVEL_NODE: lock_names,
}
def Exec(self, feedback_fn):
"""Execute OOB and return result if we expect any.
"""
- master_node = self.cfg.GetMasterNode()
+ master_node = self.master_node
ret = []
- for node in self.nodes:
+ for idx, node in enumerate(self.nodes):
node_entry = [(constants.RS_NORMAL, node.name)]
ret.append(node_entry)
node_entry.append((constants.RS_NORMAL, result.payload))
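+      # When powering on, wait the configured delay between nodes rather
+      # than starting them all at once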
+ if (self.op.command == constants.OOB_POWER_ON and
+ idx < len(self.nodes) - 1):
+ time.sleep(self.op.power_delay)
+
return ret
def _CheckPayload(self, result):
raise errors.OpExecError("Check of out-of-band payload failed due to %s" %
utils.CommaJoin(errs))
+class _OsQuery(_QueryBase):
+ FIELDS = query.OS_FIELDS
-
-class LUOsDiagnose(NoHooksLU):
- """Logical unit for OS diagnose/query.
-
- """
- REQ_BGL = False
- _HID = "hidden"
- _BLK = "blacklisted"
- _VLD = "valid"
- _FIELDS_STATIC = utils.FieldSet()
- _FIELDS_DYNAMIC = utils.FieldSet("name", _VLD, "node_status", "variants",
- "parameters", "api_versions", _HID, _BLK)
-
- def CheckArguments(self):
- if self.op.names:
- raise errors.OpPrereqError("Selective OS query not supported",
- errors.ECODE_INVAL)
-
- _CheckOutputFields(static=self._FIELDS_STATIC,
- dynamic=self._FIELDS_DYNAMIC,
- selected=self.op.output_fields)
-
- def ExpandNames(self):
- # Lock all nodes, in shared mode
+ def ExpandNames(self, lu):
+ # Lock all nodes in shared mode
# Temporary removal of locks, should be reverted later
# TODO: reintroduce locks when they are lighter-weight
- self.needed_locks = {}
+ lu.needed_locks = {}
#self.share_locks[locking.LEVEL_NODE] = 1
#self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+ # The following variables interact with _QueryBase._GetNames
+ if self.names:
+ self.wanted = self.names
+ else:
+ self.wanted = locking.ALL_SET
+
+ self.do_locking = self.use_locking
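+    # (LUOsDiagnose below passes use_locking=False, so no locks are acquired
+    # and _GetNames will simply return the requested, or all, OS names.)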
+
+ def DeclareLocks(self, lu, level):
+ pass
+
@staticmethod
def _DiagnoseByOS(rlist):
"""Remaps a per-node return list into an a per-os per-node dictionary
variants, params, api_versions))
return all_os
- def Exec(self, feedback_fn):
- """Compute the list of OSes.
+ def _GetQueryData(self, lu):
+ """Computes the list of nodes and their attributes.
"""
+ # Locking is not used
+ assert not (lu.acquired_locks or self.do_locking or self.use_locking)
+
valid_nodes = [node.name
- for node in self.cfg.GetAllNodesInfo().values()
+ for node in lu.cfg.GetAllNodesInfo().values()
if not node.offline and node.vm_capable]
- node_data = self.rpc.call_os_diagnose(valid_nodes)
- pol = self._DiagnoseByOS(node_data)
- output = []
- cluster = self.cfg.GetClusterInfo()
+ pol = self._DiagnoseByOS(lu.rpc.call_os_diagnose(valid_nodes))
+ cluster = lu.cfg.GetClusterInfo()
+
+ data = {}
+
+ for (os_name, os_data) in pol.items():
+ info = query.OsInfo(name=os_name, valid=True, node_status=os_data,
+ hidden=(os_name in cluster.hidden_os),
+ blacklisted=(os_name in cluster.blacklisted_os))
+
+ variants = set()
+ parameters = set()
+ api_versions = set()
- for os_name in utils.NiceSort(pol.keys()):
- os_data = pol[os_name]
- row = []
- valid = True
- (variants, params, api_versions) = null_state = (set(), set(), set())
for idx, osl in enumerate(os_data.values()):
- valid = bool(valid and osl and osl[0][1])
- if not valid:
- (variants, params, api_versions) = null_state
+ info.valid = bool(info.valid and osl and osl[0][1])
+ if not info.valid:
break
- node_variants, node_params, node_api = osl[0][3:6]
- if idx == 0: # first entry
- variants = set(node_variants)
- params = set(node_params)
- api_versions = set(node_api)
- else: # keep consistency
+
+ (node_variants, node_params, node_api) = osl[0][3:6]
+ if idx == 0:
+ # First entry
+ variants.update(node_variants)
+ parameters.update(node_params)
+ api_versions.update(node_api)
+ else:
+ # Filter out inconsistent values
variants.intersection_update(node_variants)
- params.intersection_update(node_params)
+ parameters.intersection_update(node_params)
api_versions.intersection_update(node_api)
- is_hid = os_name in cluster.hidden_os
- is_blk = os_name in cluster.blacklisted_os
- if ((self._HID not in self.op.output_fields and is_hid) or
- (self._BLK not in self.op.output_fields and is_blk) or
- (self._VLD not in self.op.output_fields and not valid)):
- continue
+ info.variants = list(variants)
+ info.parameters = list(parameters)
+ info.api_versions = list(api_versions)
- for field in self.op.output_fields:
- if field == "name":
- val = os_name
- elif field == self._VLD:
- val = valid
- elif field == "node_status":
- # this is just a copy of the dict
- val = {}
- for node_name, nos_list in os_data.items():
- val[node_name] = nos_list
- elif field == "variants":
- val = utils.NiceSort(list(variants))
- elif field == "parameters":
- val = list(params)
- elif field == "api_versions":
- val = list(api_versions)
- elif field == self._HID:
- val = is_hid
- elif field == self._BLK:
- val = is_blk
- else:
- raise errors.ParameterError(field)
- row.append(val)
- output.append(row)
+ data[os_name] = info
- return output
+ # Prepare data in requested order
+ return [data[name] for name in self._GetNames(lu, pol.keys(), None)
+ if name in data]
+
+
+class LUOsDiagnose(NoHooksLU):
+ """Logical unit for OS diagnose/query.
+
+ """
+ REQ_BGL = False
+
+ @staticmethod
+ def _BuildFilter(fields, names):
+ """Builds a filter for querying OSes.
+
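+    For example (schematically), C{fields=["name"]} together with
+    C{names=["dummy"]} would produce a filter equivalent to::
+
+      [OP_AND, [OP_OR, [OP_EQUAL, "name", "dummy"]],
+       [OP_AND, [OP_NOT, [OP_TRUE, "hidden"]],
+        [OP_NOT, [OP_TRUE, "blacklisted"]],
+        [OP_TRUE, "valid"]]]
+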
+ """
+ name_filter = qlang.MakeSimpleFilter("name", names)
+
+ # Legacy behaviour: Hide hidden, blacklisted or invalid OSes if the
+ # respective field is not requested
+ status_filter = [[qlang.OP_NOT, [qlang.OP_TRUE, fname]]
+ for fname in ["hidden", "blacklisted"]
+ if fname not in fields]
+ if "valid" not in fields:
+ status_filter.append([qlang.OP_TRUE, "valid"])
+
+ if status_filter:
+ status_filter.insert(0, qlang.OP_AND)
+ else:
+ status_filter = None
+
+ if name_filter and status_filter:
+ return [qlang.OP_AND, name_filter, status_filter]
+ elif name_filter:
+ return name_filter
+ else:
+ return status_filter
+
+ def CheckArguments(self):
+ self.oq = _OsQuery(self._BuildFilter(self.op.output_fields, self.op.names),
+ self.op.output_fields, False)
+
+ def ExpandNames(self):
+ self.oq.ExpandNames(self)
+
+ def Exec(self, feedback_fn):
+ return self.oq.OldStyleQuery(self)
class LUNodeRemove(LogicalUnit):
node would then be impossible to remove.
"""
- env = {
+ return {
"OP_TARGET": self.op.node_name,
"NODE_NAME": self.op.node_name,
}
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
all_nodes = self.cfg.GetNodeList()
try:
all_nodes.remove(self.op.node_name)
except ValueError:
- logging.warning("Node %s which is about to be removed not found"
- " in the all nodes list", self.op.node_name)
- return env, all_nodes, all_nodes
+ logging.warning("Node '%s', which is about to be removed, was not found"
+ " in the list of all nodes", self.op.node_name)
+ return (all_nodes, all_nodes)
def CheckPrereq(self):
"""Check prerequisites.
self.context.RemoveNode(node.name)
# Run post hooks on the node before it's removed
- hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
- try:
- hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
- except:
- # pylint: disable-msg=W0702
- self.LogWarning("Errors occurred running hooks on %s" % node.name)
+ _RunPostHook(self, node.name)
result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
msg = result.fail_msg
# Gather data as requested
if query.NQ_LIVE in self.requested_data:
- node_data = lu.rpc.call_node_info(nodenames, lu.cfg.GetVGName(),
+ # filter out non-vm_capable nodes
+ toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]
+
+ node_data = lu.rpc.call_node_info(toquery_nodes, lu.cfg.GetVGName(),
lu.cfg.GetHypervisorType())
live_data = dict((name, nresult.payload)
for (name, nresult) in node_data.items()
REQ_BGL = False
def CheckArguments(self):
- self.nq = _NodeQuery(self.op.names, self.op.output_fields,
- self.op.use_locking)
+ self.nq = _NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
+ self.op.output_fields, self.op.use_locking)
def ExpandNames(self):
self.nq.ExpandNames(self)
bad_nodes.append(name)
elif result.payload:
for inst in result.payload:
- if all_info[inst].primary_node == name:
- live_data.update(result.payload)
+ if inst in all_info:
+ if all_info[inst].primary_node == name:
+ live_data.update(result.payload)
+ else:
+ wrongnode_inst.add(inst)
else:
- wrongnode_inst.add(inst)
+ # orphan instance; we don't list it here as we don't
+ # handle this case yet in the output of instance listing
+ logging.warning("Orphan instance '%s' found on node %s",
+ inst, name)
# else no instance is alive
else:
live_data = {}
if query.IQ_DISKUSAGE in self.requested_data:
disk_usage = dict((inst.name,
_ComputeDiskSize(inst.disk_template,
- [{"size": disk.size}
+ [{constants.IDISK_SIZE: disk.size}
for disk in inst.disks]))
for inst in instance_list)
else:
def CheckArguments(self):
qcls = _GetQueryImplementation(self.op.what)
- names = qlang.ReadSimpleFilter("name", self.op.filter)
- self.impl = qcls(names, self.op.fields, False)
+ self.impl = qcls(self.op.filter, self.op.fields, False)
def ExpandNames(self):
self.impl.ExpandNames(self)
self.needed_locks = {}
def Exec(self, feedback_fn):
- return self.qcls.FieldsQuery(self.op.fields)
+ return query.QueryFields(self.qcls.FIELDS, self.op.fields)
class LUNodeModifyStorage(NoHooksLU):
This will run on all nodes before, and on all nodes + the new node after.
"""
- env = {
+ return {
"OP_TARGET": self.op.node_name,
"NODE_NAME": self.op.node_name,
"NODE_PIP": self.op.primary_ip,
"MASTER_CAPABLE": str(self.op.master_capable),
"VM_CAPABLE": str(self.op.vm_capable),
}
- nodes_0 = self.cfg.GetNodeList()
- nodes_1 = nodes_0 + [self.op.node_name, ]
- return env, nodes_0, nodes_1
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ # Exclude added node
+ pre_nodes = list(set(self.cfg.GetNodeList()) - set([self.op.node_name]))
+ post_nodes = pre_nodes + [self.op.node_name, ]
+
+ return (pre_nodes, post_nodes)
def CheckPrereq(self):
"""Check prerequisites.
if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
instance = self.context.cfg.GetInstanceInfo(instance_name)
- i_mirrored = instance.disk_template in constants.DTS_NET_MIRROR
+ i_mirrored = instance.disk_template in constants.DTS_INT_MIRROR
if i_mirrored and self.op.node_name in instance.all_nodes:
instances_keep.append(instance_name)
self.affected_instances.append(instance)
This runs on the master node.
"""
- env = {
+ return {
"OP_TARGET": self.op.node_name,
"MASTER_CANDIDATE": str(self.op.master_candidate),
"OFFLINE": str(self.op.offline),
"MASTER_CAPABLE": str(self.op.master_capable),
"VM_CAPABLE": str(self.op.vm_capable),
}
- nl = [self.cfg.GetMasterNode(),
- self.op.node_name]
- return env, nl, nl
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ nl = [self.cfg.GetMasterNode(), self.op.node_name]
+ return (nl, nl)
def CheckPrereq(self):
"""Check prerequisites.
"volume_group_name": cluster.volume_group_name,
"drbd_usermode_helper": cluster.drbd_usermode_helper,
"file_storage_dir": cluster.file_storage_dir,
+ "shared_file_storage_dir": cluster.shared_file_storage_dir,
"maintain_node_health": cluster.maintain_node_health,
"ctime": cluster.ctime,
"mtime": cluster.mtime,
env = {
"FORCE": self.op.force,
}
+
env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+
+ return env
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
- return env, nl, nl
+ return (nl, nl)
def CheckPrereq(self):
"""Check prerequisites.
"REBOOT_TYPE": self.op.reboot_type,
"SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
}
+
env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+
+ return env
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
- return env, nl, nl
+ return (nl, nl)
def CheckPrereq(self):
"""Check prerequisites.
ignore_secondaries = self.op.ignore_secondaries
reboot_type = self.op.reboot_type
+ remote_info = self.rpc.call_instance_info(instance.primary_node,
+ instance.name,
+ instance.hypervisor)
+ remote_info.Raise("Error checking node %s" % instance.primary_node)
+ instance_running = bool(remote_info.payload)
+
node_current = instance.primary_node
- if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
- constants.INSTANCE_REBOOT_HARD]:
+ if instance_running and reboot_type in [constants.INSTANCE_REBOOT_SOFT,
+ constants.INSTANCE_REBOOT_HARD]:
for disk in instance.disks:
self.cfg.SetDiskID(disk, node_current)
result = self.rpc.call_instance_reboot(node_current, instance,
self.op.shutdown_timeout)
result.Raise("Could not reboot instance")
else:
- result = self.rpc.call_instance_shutdown(node_current, instance,
- self.op.shutdown_timeout)
- result.Raise("Could not shutdown instance for full reboot")
- _ShutdownInstanceDisks(self, instance)
+ if instance_running:
+ result = self.rpc.call_instance_shutdown(node_current, instance,
+ self.op.shutdown_timeout)
+ result.Raise("Could not shutdown instance for full reboot")
+ _ShutdownInstanceDisks(self, instance)
+ else:
+ self.LogInfo("Instance %s was already stopped, starting now",
+ instance.name)
_StartInstanceDisks(self, instance, ignore_secondaries)
result = self.rpc.call_instance_start(node_current, instance, None, None)
msg = result.fail_msg
"""
env = _BuildInstanceHookEnvByObject(self, self.instance)
env["TIMEOUT"] = self.op.timeout
+ return env
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
- return env, nl, nl
+ return (nl, nl)
def CheckPrereq(self):
"""Check prerequisites.
This runs on master, primary and secondary nodes of the instance.
"""
- env = _BuildInstanceHookEnvByObject(self, self.instance)
+ return _BuildInstanceHookEnvByObject(self, self.instance)
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
- return env, nl, nl
+ return (nl, nl)
def CheckPrereq(self):
"""Check prerequisites.
This runs on master, primary and secondary nodes of the instance.
"""
- env = _BuildInstanceHookEnvByObject(self, self.instance)
+ return _BuildInstanceHookEnvByObject(self, self.instance)
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
- return env, nl, nl
+ return (nl, nl)
def CheckPrereq(self):
"""Check prerequisites.
"""
env = _BuildInstanceHookEnvByObject(self, self.instance)
env["INSTANCE_NEW_NAME"] = self.op.new_name
+ return env
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
- return env, nl, nl
+ return (nl, nl)
def CheckPrereq(self):
"""Check prerequisites.
hostname = netutils.GetHostname(name=new_name)
self.LogInfo("Resolved given name '%s' to '%s'", new_name,
hostname.name)
+ if not utils.MatchNameComponent(self.op.new_name, [hostname.name]):
+ raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
+ " same as given hostname '%s'") %
+ (hostname.name, self.op.new_name),
+ errors.ECODE_INVAL)
new_name = self.op.new_name = hostname.name
if (self.op.ip_check and
netutils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT)):
old_name = inst.name
rename_file_storage = False
- if (inst.disk_template == constants.DT_FILE and
+ if (inst.disk_template in (constants.DT_FILE, constants.DT_SHARED_FILE) and
self.op.new_name != inst.name):
old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
rename_file_storage = True
"""
env = _BuildInstanceHookEnvByObject(self, self.instance)
env["SHUTDOWN_TIMEOUT"] = self.op.shutdown_timeout
+ return env
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
nl = [self.cfg.GetMasterNode()]
nl_post = list(self.instance.all_nodes) + nl
- return env, nl, nl_post
+ return (nl, nl_post)
def CheckPrereq(self):
"""Check prerequisites.
REQ_BGL = False
def CheckArguments(self):
- self.iq = _InstanceQuery(self.op.names, self.op.output_fields,
- self.op.use_locking)
+ self.iq = _InstanceQuery(qlang.MakeSimpleFilter("name", self.op.names),
+ self.op.output_fields, self.op.use_locking)
def ExpandNames(self):
self.iq.ExpandNames(self)
HTYPE = constants.HTYPE_INSTANCE
REQ_BGL = False
+ def CheckArguments(self):
+ """Check the arguments.
+
+ """
+ self.iallocator = getattr(self.op, "iallocator", None)
+ self.target_node = getattr(self.op, "target_node", None)
+
def ExpandNames(self):
self._ExpandAndLockInstance()
+
+ if self.op.target_node is not None:
+ self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
+
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+ ignore_consistency = self.op.ignore_consistency
+ shutdown_timeout = self.op.shutdown_timeout
+ self._migrater = TLMigrateInstance(self, self.op.instance_name,
+ cleanup=False,
+ iallocator=self.op.iallocator,
+ target_node=self.op.target_node,
+ failover=True,
+ ignore_consistency=ignore_consistency,
+ shutdown_timeout=shutdown_timeout)
+ self.tasklets = [self._migrater]
+
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE:
- self._LockInstancesNodes()
+ instance = self.context.cfg.GetInstanceInfo(self.op.instance_name)
+ if instance.disk_template in constants.DTS_EXT_MIRROR:
+ if self.op.target_node is None:
+ self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+ else:
+ self.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
+ self.op.target_node]
+ del self.recalculate_locks[locking.LEVEL_NODE]
+ else:
+ self._LockInstancesNodes()
def BuildHooksEnv(self):
"""Build hooks env.
This runs on master, primary and secondary nodes of the instance.
"""
- instance = self.instance
+ instance = self._migrater.instance
source_node = instance.primary_node
- target_node = instance.secondary_nodes[0]
+ target_node = self._migrater.target_node
env = {
"IGNORE_CONSISTENCY": self.op.ignore_consistency,
"SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
"OLD_PRIMARY": source_node,
- "OLD_SECONDARY": target_node,
"NEW_PRIMARY": target_node,
- "NEW_SECONDARY": source_node,
}
+
+ if instance.disk_template in constants.DTS_INT_MIRROR:
+ env["OLD_SECONDARY"] = instance.secondary_nodes[0]
+ env["NEW_SECONDARY"] = source_node
+ else:
+ env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""
+
env.update(_BuildInstanceHookEnvByObject(self, instance))
- nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
- nl_post = list(nl)
- nl_post.append(source_node)
- return env, nl, nl_post
- def CheckPrereq(self):
- """Check prerequisites.
+ return env
- This checks that the instance is in the cluster.
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
"""
- self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
- assert self.instance is not None, \
- "Cannot retrieve locked instance %s" % self.op.instance_name
-
- bep = self.cfg.GetClusterInfo().FillBE(instance)
- if instance.disk_template not in constants.DTS_NET_MIRROR:
- raise errors.OpPrereqError("Instance's disk layout is not"
- " network mirrored, cannot failover.",
- errors.ECODE_STATE)
-
- secondary_nodes = instance.secondary_nodes
- if not secondary_nodes:
- raise errors.ProgrammerError("no secondary node but using "
- "a mirrored disk template")
-
- target_node = secondary_nodes[0]
- _CheckNodeOnline(self, target_node)
- _CheckNodeNotDrained(self, target_node)
- if instance.admin_up:
- # check memory requirements on the secondary node
- _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
- instance.name, bep[constants.BE_MEMORY],
- instance.hypervisor)
- else:
- self.LogInfo("Not checking memory on the secondary node as"
- " instance will not be started")
-
- # check bridge existance
- _CheckInstanceBridgesExist(self, instance, node=target_node)
-
- def Exec(self, feedback_fn):
- """Failover an instance.
-
- The failover is done by shutting it down on its present node and
- starting it on the secondary.
-
- """
- instance = self.instance
- primary_node = self.cfg.GetNodeInfo(instance.primary_node)
-
- source_node = instance.primary_node
- target_node = instance.secondary_nodes[0]
-
- if instance.admin_up:
- feedback_fn("* checking disk consistency between source and target")
- for dev in instance.disks:
- # for drbd, these are drbd over lvm
- if not _CheckDiskConsistency(self, dev, target_node, False):
- if not self.op.ignore_consistency:
- raise errors.OpExecError("Disk %s is degraded on target node,"
- " aborting failover." % dev.iv_name)
- else:
- feedback_fn("* not checking disk consistency as instance is not running")
-
- feedback_fn("* shutting down instance on source node")
- logging.info("Shutting down instance %s on node %s",
- instance.name, source_node)
-
- result = self.rpc.call_instance_shutdown(source_node, instance,
- self.op.shutdown_timeout)
- msg = result.fail_msg
- if msg:
- if self.op.ignore_consistency or primary_node.offline:
- self.proc.LogWarning("Could not shutdown instance %s on node %s."
- " Proceeding anyway. Please make sure node"
- " %s is down. Error details: %s",
- instance.name, source_node, source_node, msg)
- else:
- raise errors.OpExecError("Could not shutdown instance %s on"
- " node %s: %s" %
- (instance.name, source_node, msg))
-
- feedback_fn("* deactivating the instance's disks on source node")
- if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
- raise errors.OpExecError("Can't shut down the instance's disks.")
-
- instance.primary_node = target_node
- # distribute new instance config to the other nodes
- self.cfg.Update(instance, feedback_fn)
-
- # Only start the instance if it's marked as up
- if instance.admin_up:
- feedback_fn("* activating the instance's disks on target node")
- logging.info("Starting instance %s on node %s",
- instance.name, target_node)
-
- disks_ok, _ = _AssembleInstanceDisks(self, instance,
- ignore_secondaries=True)
- if not disks_ok:
- _ShutdownInstanceDisks(self, instance)
- raise errors.OpExecError("Can't activate the instance's disks")
-
- feedback_fn("* starting the instance on the target node")
- result = self.rpc.call_instance_start(target_node, instance, None, None)
- msg = result.fail_msg
- if msg:
- _ShutdownInstanceDisks(self, instance)
- raise errors.OpExecError("Could not start instance %s on node %s: %s" %
- (instance.name, target_node, msg))
+ instance = self._migrater.instance
+ nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
+ return (nl, nl + [instance.primary_node])
class LUInstanceMigrate(LogicalUnit):
def ExpandNames(self):
self._ExpandAndLockInstance()
+ if self.op.target_node is not None:
+ self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
+
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
self._migrater = TLMigrateInstance(self, self.op.instance_name,
- self.op.cleanup)
+ cleanup=self.op.cleanup,
+ iallocator=self.op.iallocator,
+ target_node=self.op.target_node,
+ failover=False,
+ fallback=self.op.allow_failover)
self.tasklets = [self._migrater]
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE:
- self._LockInstancesNodes()
+ instance = self.context.cfg.GetInstanceInfo(self.op.instance_name)
+ if instance.disk_template in constants.DTS_EXT_MIRROR:
+ if self.op.target_node is None:
+ self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+ else:
+ self.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
+ self.op.target_node]
+ del self.recalculate_locks[locking.LEVEL_NODE]
+ else:
+ self._LockInstancesNodes()
def BuildHooksEnv(self):
"""Build hooks env.
"""
instance = self._migrater.instance
source_node = instance.primary_node
- target_node = instance.secondary_nodes[0]
+ target_node = self._migrater.target_node
env = _BuildInstanceHookEnvByObject(self, instance)
- env["MIGRATE_LIVE"] = self._migrater.live
- env["MIGRATE_CLEANUP"] = self.op.cleanup
env.update({
- "OLD_PRIMARY": source_node,
- "OLD_SECONDARY": target_node,
- "NEW_PRIMARY": target_node,
- "NEW_SECONDARY": source_node,
- })
+ "MIGRATE_LIVE": self._migrater.live,
+ "MIGRATE_CLEANUP": self.op.cleanup,
+ "OLD_PRIMARY": source_node,
+ "NEW_PRIMARY": target_node,
+ })
+
+ if instance.disk_template in constants.DTS_INT_MIRROR:
+ env["OLD_SECONDARY"] = target_node
+ env["NEW_SECONDARY"] = source_node
+ else:
+ env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None
+
+ return env
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ instance = self._migrater.instance
nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
- nl_post = list(nl)
- nl_post.append(source_node)
- return env, nl, nl_post
+ return (nl, nl + [instance.primary_node])
class LUInstanceMove(LogicalUnit):
"SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
}
env.update(_BuildInstanceHookEnvByObject(self, self.instance))
- nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
- self.op.target_node]
- return env, nl, nl
+ return env
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ nl = [
+ self.cfg.GetMasterNode(),
+ self.instance.primary_node,
+ self.op.target_node,
+ ]
+ return (nl, nl)
def CheckPrereq(self):
"""Check prerequisites.
HTYPE = constants.HTYPE_NODE
REQ_BGL = False
+ def CheckArguments(self):
+ _CheckIAllocatorOrNode(self, "iallocator", "remote_node")
+
def ExpandNames(self):
self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
- self.needed_locks = {
- locking.LEVEL_NODE: [self.op.node_name],
- }
-
- self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+ self.needed_locks = {}
# Create tasklets for migrating instances for all instances on this node
names = []
tasklets = []
+ self.lock_all_nodes = False
+
for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
logging.debug("Migrating instance %s", inst.name)
names.append(inst.name)
- tasklets.append(TLMigrateInstance(self, inst.name, False))
+      tasklets.append(TLMigrateInstance(self, inst.name, cleanup=False,
+                                        iallocator=self.op.iallocator,
+                                        target_node=None))
+
+ if inst.disk_template in constants.DTS_EXT_MIRROR:
+ # We need to lock all nodes, as the iallocator will choose the
+ # destination nodes afterwards
+ self.lock_all_nodes = True
self.tasklets = tasklets
+ # Declare node locks
+ if self.lock_all_nodes:
+ self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+ else:
+ self.needed_locks[locking.LEVEL_NODE] = [self.op.node_name]
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+
# Declare instance locks
self.needed_locks[locking.LEVEL_INSTANCE] = names
def DeclareLocks(self, level):
- if level == locking.LEVEL_NODE:
+ if level == locking.LEVEL_NODE and not self.lock_all_nodes:
self._LockInstancesNodes()
def BuildHooksEnv(self):
This runs on the master, the primary and all the secondaries.
"""
- env = {
+ return {
"NODE_NAME": self.op.node_name,
}
-    nl = [self.cfg.GetMasterNode()]
-    return (env, nl, nl)
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    nl = [self.cfg.GetMasterNode()]
+    return (nl, nl)
class TLMigrateInstance(Tasklet):
@type live: boolean
@ivar live: whether the migration will be done live or non-live;
    this variable is initialized only after CheckPrereq has run
+ @type cleanup: boolean
+  @ivar cleanup: Whether we are cleaning up from a failed migration
+ @type iallocator: string
+ @ivar iallocator: The iallocator used to determine target_node
+ @type target_node: string
+ @ivar target_node: If given, the target_node to reallocate the instance to
+ @type failover: boolean
+ @ivar failover: Whether operation results in failover or migration
+ @type fallback: boolean
+  @ivar fallback: Whether fallback to failover is allowed if migration is
+    not possible
+ @type ignore_consistency: boolean
+  @ivar ignore_consistency: Whether we should ignore consistency between source
+ and target node
+ @type shutdown_timeout: int
+  @ivar shutdown_timeout: In case of failover, the shutdown timeout to use
"""
- def __init__(self, lu, instance_name, cleanup):
+ def __init__(self, lu, instance_name, cleanup=False, iallocator=None,
+ target_node=None, failover=False, fallback=False,
+ ignore_consistency=False,
+ shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT):
"""Initializes this class.
"""
self.instance_name = instance_name
self.cleanup = cleanup
self.live = False # will be overridden later
+ self.iallocator = iallocator
+ self.target_node = target_node
+ self.failover = failover
+ self.fallback = fallback
+ self.ignore_consistency = ignore_consistency
+ self.shutdown_timeout = shutdown_timeout
def CheckPrereq(self):
"""Check prerequisites.
instance_name = _ExpandInstanceName(self.lu.cfg, self.instance_name)
instance = self.cfg.GetInstanceInfo(instance_name)
assert instance is not None
+ self.instance = instance
- if instance.disk_template != constants.DT_DRBD8:
- raise errors.OpPrereqError("Instance's disk layout is not"
- " drbd8, cannot migrate.", errors.ECODE_STATE)
+ if (not self.cleanup and not instance.admin_up and not self.failover and
+ self.fallback):
+ self.lu.LogInfo("Instance is marked down, fallback allowed, switching"
+ " to failover")
+ self.failover = True
+
+ if instance.disk_template not in constants.DTS_MIRRORED:
+ if self.failover:
+ text = "failovers"
+ else:
+ text = "migrations"
+ raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
+ " %s" % (instance.disk_template, text),
+ errors.ECODE_STATE)
+
+ if instance.disk_template in constants.DTS_EXT_MIRROR:
+ _CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
- secondary_nodes = instance.secondary_nodes
- if not secondary_nodes:
- raise errors.ConfigurationError("No secondary node but using"
- " drbd8 disk template")
+ if self.iallocator:
+ self._RunAllocator()
+
+ # self.target_node is already populated, either directly or by the
+ # iallocator run
+ target_node = self.target_node
+
+ if len(self.lu.tasklets) == 1:
+ # It is safe to remove locks only when we're the only tasklet in the LU
+ nodes_keep = [instance.primary_node, self.target_node]
+ nodes_rel = [node for node in self.lu.acquired_locks[locking.LEVEL_NODE]
+ if node not in nodes_keep]
+ self.lu.context.glm.release(locking.LEVEL_NODE, nodes_rel)
+ self.lu.acquired_locks[locking.LEVEL_NODE] = nodes_keep
+
+ else:
+ secondary_nodes = instance.secondary_nodes
+ if not secondary_nodes:
+ raise errors.ConfigurationError("No secondary node but using"
+ " %s disk template" %
+ instance.disk_template)
+ target_node = secondary_nodes[0]
+ if self.iallocator or (self.target_node and
+ self.target_node != target_node):
+ if self.failover:
+ text = "failed over"
+ else:
+ text = "migrated"
+ raise errors.OpPrereqError("Instances with disk template %s cannot"
+ " be %s over to arbitrary nodes"
+ " (neither an iallocator nor a target"
+ " node can be passed)" %
+ (text, instance.disk_template),
+ errors.ECODE_INVAL)
i_be = self.cfg.GetClusterInfo().FillBE(instance)
- target_node = secondary_nodes[0]
# check memory requirements on the secondary node
- _CheckNodeFreeMemory(self.lu, target_node, "migrating instance %s" %
- instance.name, i_be[constants.BE_MEMORY],
- instance.hypervisor)
+ if not self.failover or instance.admin_up:
+ _CheckNodeFreeMemory(self.lu, target_node, "migrating instance %s" %
+ instance.name, i_be[constants.BE_MEMORY],
+ instance.hypervisor)
+ else:
+ self.lu.LogInfo("Not checking memory on the secondary node as"
+ " instance will not be started")
# check bridge existance
_CheckInstanceBridgesExist(self.lu, instance, node=target_node)
if not self.cleanup:
_CheckNodeNotDrained(self.lu, target_node)
- result = self.rpc.call_instance_migratable(instance.primary_node,
- instance)
- result.Raise("Can't migrate, please use failover",
- prereq=True, ecode=errors.ECODE_STATE)
+ if not self.failover:
+ result = self.rpc.call_instance_migratable(instance.primary_node,
+ instance)
+ if result.fail_msg and self.fallback:
+ self.lu.LogInfo("Can't migrate, instance offline, fallback to"
+ " failover")
+ self.failover = True
+ else:
+ result.Raise("Can't migrate, please use failover",
+ prereq=True, ecode=errors.ECODE_STATE)
- self.instance = instance
+ assert not (self.failover and self.cleanup)
- if self.lu.op.live is not None and self.lu.op.mode is not None:
- raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
- " parameters are accepted",
- errors.ECODE_INVAL)
- if self.lu.op.live is not None:
- if self.lu.op.live:
- self.lu.op.mode = constants.HT_MIGRATION_LIVE
- else:
- self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
- # reset the 'live' parameter to None so that repeated
- # invocations of CheckPrereq do not raise an exception
- self.lu.op.live = None
- elif self.lu.op.mode is None:
- # read the default value from the hypervisor
- i_hv = self.cfg.GetClusterInfo().FillHV(instance, skip_globals=False)
- self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
+ def _RunAllocator(self):
+ """Run the allocator based on input opcode.
+
+ """
+ ial = IAllocator(self.cfg, self.rpc,
+ mode=constants.IALLOCATOR_MODE_RELOC,
+ name=self.instance_name,
+ # TODO: See why hail breaks with a single node below
+ relocate_from=[self.instance.primary_node,
+ self.instance.primary_node],
+ )
- self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
+ ial.Run(self.iallocator)
+
+ if not ial.success:
+ raise errors.OpPrereqError("Can't compute nodes using"
+ " iallocator '%s': %s" %
+ (self.iallocator, ial.info),
+ errors.ECODE_NORES)
+ if len(ial.result) != ial.required_nodes:
+ raise errors.OpPrereqError("iallocator '%s' returned invalid number"
+ " of nodes (%s), required %s" %
+ (self.iallocator, len(ial.result),
+ ial.required_nodes), errors.ECODE_FAULT)
+ self.target_node = ial.result[0]
+ self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
+ self.instance_name, self.iallocator,
+ utils.CommaJoin(ial.result))
+
+ if not self.failover:
+ if self.lu.op.live is not None and self.lu.op.mode is not None:
+ raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
+ " parameters are accepted",
+ errors.ECODE_INVAL)
+ if self.lu.op.live is not None:
+ if self.lu.op.live:
+ self.lu.op.mode = constants.HT_MIGRATION_LIVE
+ else:
+ self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
+ # reset the 'live' parameter to None so that repeated
+ # invocations of CheckPrereq do not raise an exception
+ self.lu.op.live = None
+ elif self.lu.op.mode is None:
+ # read the default value from the hypervisor
+ i_hv = self.cfg.GetClusterInfo().FillHV(self.instance,
+ skip_globals=False)
+ self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
+
+ self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
+ else:
+ # Failover is never live
+ self.live = False
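
For reference, the 'live'/'mode' reconciliation above reduces to a small pure function. This is a minimal sketch, assuming the HT_MIGRATION_* values match constants.py; it is not the tasklet code itself.

# Sketch of the live/mode precedence in CheckPrereq above; constant values
# are assumptions matching constants.py.
HT_MIGRATION_LIVE = "live"
HT_MIGRATION_NONLIVE = "non-live"

def resolve_migration_mode(live, mode, hv_default):
  # 'live' and 'mode' are mutually exclusive; 'live' is translated into a
  # mode, and the hypervisor default applies when neither is given.
  if live is not None and mode is not None:
    raise ValueError("Only one of 'live' and 'mode' is accepted")
  if live is not None:
    return HT_MIGRATION_LIVE if live else HT_MIGRATION_NONLIVE
  if mode is None:
    return hv_default
  return mode

assert resolve_migration_mode(True, None, None) == HT_MIGRATION_LIVE
assert resolve_migration_mode(None, None, HT_MIGRATION_NONLIVE) == \
  HT_MIGRATION_NONLIVE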
def _WaitUntilSync(self):
"""Poll with custom rpc for disk sync.
" primary node (%s)" % source_node)
demoted_node = target_node
- self._EnsureSecondary(demoted_node)
- try:
+ if instance.disk_template in constants.DTS_INT_MIRROR:
+ self._EnsureSecondary(demoted_node)
+ try:
+ self._WaitUntilSync()
+ except errors.OpExecError:
+ # we ignore here errors, since if the device is standalone, it
+ # won't be able to sync
+ pass
+ self._GoStandalone()
+ self._GoReconnect(False)
self._WaitUntilSync()
- except errors.OpExecError:
- # we ignore here errors, since if the device is standalone, it
- # won't be able to sync
- pass
- self._GoStandalone()
- self._GoReconnect(False)
- self._WaitUntilSync()
self.feedback_fn("* done")
"""
target_node = self.target_node
+ if self.instance.disk_template in constants.DTS_EXT_MIRROR:
+ return
+
try:
self._EnsureSecondary(target_node)
self._GoStandalone()
self.migration_info = migration_info = result.payload
- # Then switch the disks to master/master mode
- self._EnsureSecondary(target_node)
- self._GoStandalone()
- self._GoReconnect(True)
- self._WaitUntilSync()
+ if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
+ # Then switch the disks to master/master mode
+ self._EnsureSecondary(target_node)
+ self._GoStandalone()
+ self._GoReconnect(True)
+ self._WaitUntilSync()
self.feedback_fn("* preparing %s to accept the instance" % target_node)
result = self.rpc.call_accept_instance(target_node,
(instance.name, msg))
self.feedback_fn("* migrating instance to %s" % target_node)
- time.sleep(10)
result = self.rpc.call_instance_migrate(source_node, instance,
self.nodes_ip[target_node],
self.live)
self._RevertDiskStatus()
raise errors.OpExecError("Could not migrate instance %s: %s" %
(instance.name, msg))
- time.sleep(10)
instance.primary_node = target_node
# distribute new instance config to the other nodes
raise errors.OpExecError("Could not finalize instance migration: %s" %
msg)
- self._EnsureSecondary(source_node)
- self._WaitUntilSync()
- self._GoStandalone()
- self._GoReconnect(False)
- self._WaitUntilSync()
+ if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
+ self._EnsureSecondary(source_node)
+ self._WaitUntilSync()
+ self._GoStandalone()
+ self._GoReconnect(False)
+ self._WaitUntilSync()
self.feedback_fn("* done")
+ def _ExecFailover(self):
+ """Failover an instance.
+
+ The failover is done by shutting it down on its present node and
+ starting it on the secondary.
+
+ """
+ instance = self.instance
+ primary_node = self.cfg.GetNodeInfo(instance.primary_node)
+
+ source_node = instance.primary_node
+ target_node = self.target_node
+
+ if instance.admin_up:
+ self.feedback_fn("* checking disk consistency between source and target")
+ for dev in instance.disks:
+ # for drbd, these are drbd over lvm
+ if not _CheckDiskConsistency(self.lu, dev, target_node, False):
+ if not self.ignore_consistency:
+ raise errors.OpExecError("Disk %s is degraded on target node,"
+ " aborting failover." % dev.iv_name)
+ else:
+ self.feedback_fn("* not checking disk consistency as instance is not"
+ " running")
+
+ self.feedback_fn("* shutting down instance on source node")
+ logging.info("Shutting down instance %s on node %s",
+ instance.name, source_node)
+
+ result = self.rpc.call_instance_shutdown(source_node, instance,
+ self.shutdown_timeout)
+ msg = result.fail_msg
+ if msg:
+ if self.ignore_consistency or primary_node.offline:
+ self.lu.LogWarning("Could not shutdown instance %s on node %s."
+ " Proceeding anyway. Please make sure node"
+ " %s is down. Error details: %s",
+ instance.name, source_node, source_node, msg)
+ else:
+ raise errors.OpExecError("Could not shutdown instance %s on"
+ " node %s: %s" %
+ (instance.name, source_node, msg))
+
+ self.feedback_fn("* deactivating the instance's disks on source node")
+ if not _ShutdownInstanceDisks(self.lu, instance, ignore_primary=True):
+ raise errors.OpExecError("Can't shut down the instance's disks.")
+
+ instance.primary_node = target_node
+ # distribute new instance config to the other nodes
+ self.cfg.Update(instance, self.feedback_fn)
+
+ # Only start the instance if it's marked as up
+ if instance.admin_up:
+ self.feedback_fn("* activating the instance's disks on target node")
+ logging.info("Starting instance %s on node %s",
+ instance.name, target_node)
+
+ disks_ok, _ = _AssembleInstanceDisks(self.lu, instance,
+ ignore_secondaries=True)
+ if not disks_ok:
+ _ShutdownInstanceDisks(self.lu, instance)
+ raise errors.OpExecError("Can't activate the instance's disks")
+
+ self.feedback_fn("* starting the instance on the target node")
+ result = self.rpc.call_instance_start(target_node, instance, None, None)
+ msg = result.fail_msg
+ if msg:
+ _ShutdownInstanceDisks(self.lu, instance)
+ raise errors.OpExecError("Could not start instance %s on node %s: %s" %
+ (instance.name, target_node, msg))
+
def Exec(self, feedback_fn):
"""Perform the migration.
"""
- feedback_fn("Migrating instance %s" % self.instance.name)
-
self.feedback_fn = feedback_fn
-
self.source_node = self.instance.primary_node
- self.target_node = self.instance.secondary_nodes[0]
+
+ # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
+ if self.instance.disk_template in constants.DTS_INT_MIRROR:
+ self.target_node = self.instance.secondary_nodes[0]
+ # Otherwise self.target_node has been populated either
+ # directly, or through an iallocator.
+
self.all_nodes = [self.source_node, self.target_node]
self.nodes_ip = {
self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
}
- if self.cleanup:
- return self._ExecCleanup()
+ if self.failover:
+ feedback_fn("Failover instance %s" % self.instance.name)
+ self._ExecFailover()
else:
- return self._ExecMigration()
+ feedback_fn("Migrating instance %s" % self.instance.name)
+
+ if self.cleanup:
+ return self._ExecCleanup()
+ else:
+ return self._ExecMigration()
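
A decision sketch for the Exec dispatch above; purely illustrative, mirroring the CheckPrereq invariant that failover and cleanup are mutually exclusive:

def choose_action(failover, cleanup):
  # failover takes precedence; cleanup repairs a previously failed migration
  assert not (failover and cleanup)
  if failover:
    return "failover"
  elif cleanup:
    return "cleanup"
  else:
    return "migrate"

assert choose_action(failover=False, cleanup=True) == "cleanup"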
def _CreateBlockDev(lu, node, instance, device, force_create,
for i in range(disk_count)])
for idx, disk in enumerate(disk_info):
disk_index = idx + base_index
- vg = disk.get("vg", vgname)
+ vg = disk.get(constants.IDISK_VG, vgname)
feedback_fn("* disk %i, vg %s, name %s" % (idx, vg, names[idx]))
- disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
+ disk_dev = objects.Disk(dev_type=constants.LD_LV,
+ size=disk[constants.IDISK_SIZE],
logical_id=(vg, names[idx]),
iv_name="disk/%d" % disk_index,
- mode=disk["mode"])
+ mode=disk[constants.IDISK_MODE])
disks.append(disk_dev)
elif template_name == constants.DT_DRBD8:
if len(secondary_nodes) != 1:
names.append(lv_prefix + "_meta")
for idx, disk in enumerate(disk_info):
disk_index = idx + base_index
- vg = disk.get("vg", vgname)
+ vg = disk.get(constants.IDISK_VG, vgname)
disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
- disk["size"], vg, names[idx*2:idx*2+2],
+ disk[constants.IDISK_SIZE], vg,
+ names[idx * 2:idx * 2 + 2],
"disk/%d" % disk_index,
- minors[idx*2], minors[idx*2+1])
- disk_dev.mode = disk["mode"]
+ minors[idx * 2], minors[idx * 2 + 1])
+ disk_dev.mode = disk[constants.IDISK_MODE]
disks.append(disk_dev)
elif template_name == constants.DT_FILE:
if len(secondary_nodes) != 0:
for idx, disk in enumerate(disk_info):
disk_index = idx + base_index
- disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
+ disk_dev = objects.Disk(dev_type=constants.LD_FILE,
+ size=disk[constants.IDISK_SIZE],
+ iv_name="disk/%d" % disk_index,
+ logical_id=(file_driver,
+ "%s/disk%d" % (file_storage_dir,
+ disk_index)),
+ mode=disk[constants.IDISK_MODE])
+ disks.append(disk_dev)
+ elif template_name == constants.DT_SHARED_FILE:
+ if len(secondary_nodes) != 0:
+ raise errors.ProgrammerError("Wrong template configuration")
+
+ opcodes.RequireSharedFileStorage()
+
+ for idx, disk in enumerate(disk_info):
+ disk_index = idx + base_index
+ disk_dev = objects.Disk(dev_type=constants.LD_FILE,
+ size=disk[constants.IDISK_SIZE],
iv_name="disk/%d" % disk_index,
logical_id=(file_driver,
"%s/disk%d" % (file_storage_dir,
disk_index)),
- mode=disk["mode"])
+ mode=disk[constants.IDISK_MODE])
disks.append(disk_dev)
+ elif template_name == constants.DT_BLOCK:
+ if len(secondary_nodes) != 0:
+ raise errors.ProgrammerError("Wrong template configuration")
+
+ for idx, disk in enumerate(disk_info):
+ disk_index = idx + base_index
+ disk_dev = objects.Disk(dev_type=constants.LD_BLOCKDEV,
+ size=disk[constants.IDISK_SIZE],
+ logical_id=(constants.BLOCKDEV_DRIVER_MANUAL,
+ disk[constants.IDISK_ADOPT]),
+ iv_name="disk/%d" % disk_index,
+ mode=disk[constants.IDISK_MODE])
+ disks.append(disk_dev)
+
else:
raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
return disks
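
_GenerateDiskTemplate consumes a list of disk dicts keyed by the constants.IDISK_* names that this patch switches to. A sketch of the expected input shape; the key values below are assumptions matching constants.py ("size", "mode", "vg", "adopt"):

IDISK_SIZE = "size"
IDISK_MODE = "mode"
IDISK_VG = "vg"
IDISK_ADOPT = "adopt"

disk_info = [
  # a 10 GiB read-write LV in the default VG
  {IDISK_SIZE: 10240, IDISK_MODE: "rw"},
  # a read-only LV placed in an explicit VG
  {IDISK_SIZE: 2048, IDISK_MODE: "ro", IDISK_VG: "xenvg"},
  # a pre-existing device adopted for DT_BLOCK; the size is recomputed
  # from the node during adoption
  {IDISK_SIZE: 1024, IDISK_MODE: "rw",
   IDISK_ADOPT: "/dev/disk/by-id/example-disk"},
]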
"""
node = instance.primary_node
+
+ for device in instance.disks:
+ lu.cfg.SetDiskID(device, node)
+
logging.info("Pause sync of instance %s disks", instance.name)
result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, True)
try:
for idx, device in enumerate(instance.disks):
lu.LogInfo("* Wiping disk %d", idx)
- logging.info("Wiping disk %d for instance %s", idx, instance.name)
+ logging.info("Wiping disk %d for instance %s, node %s",
+ idx, instance.name, node)
# The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
# MAX_WIPE_CHUNK at max
pnode = target_node
all_nodes = [pnode]
- if instance.disk_template == constants.DT_FILE:
+ if instance.disk_template in (constants.DT_FILE, constants.DT_SHARED_FILE):
file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
"""
def _compute(disks, payload):
- """Universal algorithm
+ """Universal algorithm.
"""
vgs = {}
for disk in disks:
- vgs[disk["vg"]] = vgs.get("vg", 0) + disk["size"] + payload
+ vgs[disk[constants.IDISK_VG]] = \
+ vgs.get(disk[constants.IDISK_VG], 0) + disk[constants.IDISK_SIZE] + payload
return vgs
# 128 MB are added for drbd metadata for each disk
constants.DT_DRBD8: _compute(disks, 128),
constants.DT_FILE: {},
+ constants.DT_SHARED_FILE: {},
}
if disk_template not in req_size_dict:
# Required free disk space as a function of disk and swap space
req_size_dict = {
constants.DT_DISKLESS: None,
- constants.DT_PLAIN: sum(d["size"] for d in disks),
+ constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks),
# 128 MB are added for drbd metadata for each disk
- constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
+ constants.DT_DRBD8: sum(d[constants.IDISK_SIZE] + 128 for d in disks),
constants.DT_FILE: None,
+ constants.DT_SHARED_FILE: 0,
+ constants.DT_BLOCK: 0,
}
if disk_template not in req_size_dict:
return req_size_dict[disk_template]
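
A worked example of the flat DT_DRBD8 requirement above: each disk pays a fixed 128 MB DRBD metadata overhead. The "size" key is assumed to be the value of constants.IDISK_SIZE:

disks = [{"size": 10240}, {"size": 2048}]
required = sum(d["size"] + 128 for d in disks)
assert required == 10240 + 128 + 2048 + 128  # 12544 MB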
+def _FilterVmNodes(lu, nodenames):
+ """Filters out non-vm_capable nodes from a list.
+
+ @type lu: L{LogicalUnit}
+ @param lu: the logical unit for which we check
+ @type nodenames: list
+ @param nodenames: the list of nodes on which we should check
+ @rtype: list
+ @return: the list of vm-capable nodes
+
+ """
+ non_vm_nodes = frozenset(lu.cfg.GetNonVmCapableNodeList())
+ return [name for name in nodenames if name not in non_vm_nodes]
+
+
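A usage illustration of _FilterVmNodes with stubbed objects (the node names are hypothetical); only vm_capable nodes survive the filter and receive the validation RPCs below:

class _StubCfg(object):
  def GetNonVmCapableNodeList(self):
    return ["node3.example.com"]

class _StubLU(object):
  cfg = _StubCfg()

assert _FilterVmNodes(_StubLU(), ["node1.example.com", "node3.example.com"]) \
  == ["node1.example.com"]
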
def _CheckHVParams(lu, nodenames, hvname, hvparams):
"""Hypervisor parameter validation.
@raise errors.OpPrereqError: if the parameters are not valid
"""
+ nodenames = _FilterVmNodes(lu, nodenames)
hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
hvname,
hvparams)
@raise errors.OpPrereqError: if the parameters are not valid
"""
+ nodenames = _FilterVmNodes(lu, nodenames)
result = lu.rpc.call_os_validate(required, nodenames, osname,
[constants.OS_VALIDATE_PARAMETERS],
osparams)
has_adopt = has_no_adopt = False
for disk in self.op.disks:
utils.ForceDictType(disk, constants.IDISK_PARAMS_TYPES)
- if "adopt" in disk:
+ if constants.IDISK_ADOPT in disk:
has_adopt = True
else:
has_no_adopt = True
if self.op.mode == constants.INSTANCE_IMPORT:
raise errors.OpPrereqError("Disk adoption not allowed for"
" instance import", errors.ECODE_INVAL)
+ else:
+ if self.op.disk_template in constants.DTS_MUST_ADOPT:
+ raise errors.OpPrereqError("Disk template %s requires disk adoption,"
+ " but no 'adopt' parameter given" %
+ self.op.disk_template,
+ errors.ECODE_INVAL)
self.adopt_disks = has_adopt
_CheckIAllocatorOrNode(self, "iallocator", "pnode")
if self.op.pnode is not None:
- if self.op.disk_template in constants.DTS_NET_MIRROR:
+ if self.op.disk_template in constants.DTS_INT_MIRROR:
if self.op.snode is None:
raise errors.OpPrereqError("The networked disk templates need"
" a mirror node", errors.ECODE_INVAL)
vcpus=self.be_full[constants.BE_VCPUS],
nics=_NICListToTuple(self, self.nics),
disk_template=self.op.disk_template,
- disks=[(d["size"], d["mode"]) for d in self.disks],
+ disks=[(d[constants.IDISK_SIZE], d[constants.IDISK_MODE])
+ for d in self.disks],
bep=self.be_full,
hvp=self.hv_full,
hypervisor_name=self.op.hypervisor,
))
- nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
- self.secondaries)
- return env, nl, nl
+ return env
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ nl = [self.cfg.GetMasterNode(), self.op.pnode] + self.secondaries
+ return nl, nl
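
The patch splits the old three-tuple BuildHooksEnv into BuildHooksEnv (environment only) and BuildHooksNodes (pre/post node lists). A minimal sketch of an LU following the new convention; the class and values are hypothetical:

class LUExampleNoop(object):
  def BuildHooksEnv(self):
    return {
      "EXAMPLE_PARAM": "value",  # the hooks runner adds the GANETI_ prefix
    }

  def BuildHooksNodes(self):
    master = "master.example.com"
    return ([master], [master])  # (run-before, run-after) node lists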
def _ReadExportInfo(self):
"""Reads the export information from disk.
# TODO: import the disk iv_name too
for idx in range(einfo.getint(constants.INISECT_INS, "disk_count")):
disk_sz = einfo.getint(constants.INISECT_INS, "disk%d_size" % idx)
- disks.append({"size": disk_sz})
+ disks.append({constants.IDISK_SIZE: disk_sz})
self.op.disks = disks
else:
raise errors.OpPrereqError("No disk info specified and the export"
# NIC buildup
self.nics = []
for idx, nic in enumerate(self.op.nics):
- nic_mode_req = nic.get("mode", None)
+ nic_mode_req = nic.get(constants.INIC_MODE, None)
nic_mode = nic_mode_req
if nic_mode is None:
nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
default_ip_mode = constants.VALUE_NONE
# ip validity checks
- ip = nic.get("ip", default_ip_mode)
+ ip = nic.get(constants.INIC_IP, default_ip_mode)
if ip is None or ip.lower() == constants.VALUE_NONE:
nic_ip = None
elif ip.lower() == constants.VALUE_AUTO:
errors.ECODE_INVAL)
# MAC address verification
- mac = nic.get("mac", constants.VALUE_AUTO)
+ mac = nic.get(constants.INIC_MAC, constants.VALUE_AUTO)
if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
mac = utils.NormalizeAndValidateMac(mac)
" in cluster" % mac,
errors.ECODE_NOTUNIQUE)
- # bridge verification
- bridge = nic.get("bridge", None)
- link = nic.get("link", None)
- if bridge and link:
- raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
- " at the same time", errors.ECODE_INVAL)
- elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
- raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
- errors.ECODE_INVAL)
- elif bridge:
- link = bridge
-
+ # Build nic parameters
+ link = nic.get(constants.INIC_LINK, None)
nicparams = {}
if nic_mode_req:
nicparams[constants.NIC_MODE] = nic_mode_req
self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
# disk checks/pre-build
+ default_vg = self.cfg.GetVGName()
self.disks = []
for disk in self.op.disks:
- mode = disk.get("mode", constants.DISK_RDWR)
+ mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
if mode not in constants.DISK_ACCESS_SET:
raise errors.OpPrereqError("Invalid disk access mode '%s'" %
mode, errors.ECODE_INVAL)
- size = disk.get("size", None)
+ size = disk.get(constants.IDISK_SIZE, None)
if size is None:
raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
try:
except (TypeError, ValueError):
raise errors.OpPrereqError("Invalid disk size '%s'" % size,
errors.ECODE_INVAL)
- vg = disk.get("vg", self.cfg.GetVGName())
- new_disk = {"size": size, "mode": mode, "vg": vg}
- if "adopt" in disk:
- new_disk["adopt"] = disk["adopt"]
+ new_disk = {
+ constants.IDISK_SIZE: size,
+ constants.IDISK_MODE: mode,
+ constants.IDISK_VG: disk.get(constants.IDISK_VG, default_vg),
+ }
+ if constants.IDISK_ADOPT in disk:
+ new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
self.disks.append(new_disk)
if self.op.mode == constants.INSTANCE_IMPORT:
self.secondaries = []
# mirror node verification
- if self.op.disk_template in constants.DTS_NET_MIRROR:
+ if self.op.disk_template in constants.DTS_INT_MIRROR:
if self.op.snode == pnode.name:
raise errors.OpPrereqError("The secondary node cannot be the"
" primary node.", errors.ECODE_INVAL)
req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks)
_CheckNodesFreeDiskPerVG(self, nodenames, req_sizes)
- else: # instead, we must check the adoption data
- all_lvs = set([i["vg"] + "/" + i["adopt"] for i in self.disks])
+ elif self.op.disk_template == constants.DT_PLAIN: # Check the adoption data
+ all_lvs = set(["%s/%s" % (disk[constants.IDISK_VG],
+ disk[constants.IDISK_ADOPT])
+ for disk in self.disks])
if len(all_lvs) != len(self.disks):
raise errors.OpPrereqError("Duplicate volume names given for adoption",
errors.ECODE_INVAL)
errors.ECODE_STATE)
# update the size of disk based on what is found
for dsk in self.disks:
- dsk["size"] = int(float(node_lvs[dsk["vg"] + "/" + dsk["adopt"]][0]))
+ dsk[constants.IDISK_SIZE] = \
+ int(float(node_lvs["%s/%s" % (dsk[constants.IDISK_VG],
+ dsk[constants.IDISK_ADOPT])][0]))
+
+ elif self.op.disk_template == constants.DT_BLOCK:
+ # Normalize and de-duplicate device paths
+ all_disks = set([os.path.abspath(disk[constants.IDISK_ADOPT])
+ for disk in self.disks])
+ if len(all_disks) != len(self.disks):
+ raise errors.OpPrereqError("Duplicate disk names given for adoption",
+ errors.ECODE_INVAL)
+ baddisks = [d for d in all_disks
+ if not d.startswith(constants.ADOPTABLE_BLOCKDEV_ROOT)]
+ if baddisks:
+ raise errors.OpPrereqError("Device node(s) %s lie outside %s and"
+ " cannot be adopted" %
+ (", ".join(baddisks),
+ constants.ADOPTABLE_BLOCKDEV_ROOT),
+ errors.ECODE_INVAL)
+
+ node_disks = self.rpc.call_bdev_sizes([pnode.name],
+ list(all_disks))[pnode.name]
+ node_disks.Raise("Cannot get block device information from node %s" %
+ pnode.name)
+ node_disks = node_disks.payload
+ delta = all_disks.difference(node_disks.keys())
+ if delta:
+ raise errors.OpPrereqError("Missing block device(s): %s" %
+ utils.CommaJoin(delta),
+ errors.ECODE_INVAL)
+ for dsk in self.disks:
+ dsk[constants.IDISK_SIZE] = \
+ int(float(node_disks[dsk[constants.IDISK_ADOPT]]))
_CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
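
A sketch of the DT_BLOCK adoption path check above: normalize and de-duplicate paths, then require that every device lives under the adoptable root. The root value is assumed to be "/dev/disk/" as in constants.ADOPTABLE_BLOCKDEV_ROOT:

import os.path

ADOPTABLE_BLOCKDEV_ROOT = "/dev/disk/"  # assumption

def check_adoptable(paths):
  normalized = set(os.path.abspath(p) for p in paths)
  if len(normalized) != len(paths):
    raise ValueError("Duplicate disk names given for adoption")
  bad = [p for p in normalized
         if not p.startswith(ADOPTABLE_BLOCKDEV_ROOT)]
  if bad:
    raise ValueError("Device node(s) %s cannot be adopted" % ", ".join(bad))
  return normalized

check_adoptable(["/dev/disk/by-id/example-disk"])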
else:
network_port = None
- if constants.ENABLE_FILE_STORAGE:
+ if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
# this is needed because os.path.join does not accept None arguments
if self.op.file_storage_dir is None:
string_file_storage_dir = ""
string_file_storage_dir = self.op.file_storage_dir
# build the full file storage dir path
- file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
+ if self.op.disk_template == constants.DT_SHARED_FILE:
+ get_fsd_fn = self.cfg.GetSharedFileStorageDir
+ else:
+ get_fsd_fn = self.cfg.GetFileStorageDir
+
+ file_storage_dir = utils.PathJoin(get_fsd_fn(),
string_file_storage_dir, instance)
else:
file_storage_dir = ""
)
if self.adopt_disks:
- # rename LVs to the newly-generated names; we need to construct
- # 'fake' LV disks with the old data, plus the new unique_id
- tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks]
- rename_to = []
- for t_dsk, a_dsk in zip (tmp_disks, self.disks):
- rename_to.append(t_dsk.logical_id)
- t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk["adopt"])
- self.cfg.SetDiskID(t_dsk, pnode_name)
- result = self.rpc.call_blockdev_rename(pnode_name,
- zip(tmp_disks, rename_to))
- result.Raise("Failed to rename adoped LVs")
+ if self.op.disk_template == constants.DT_PLAIN:
+ # rename LVs to the newly-generated names; we need to construct
+ # 'fake' LV disks with the old data, plus the new unique_id
+ tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks]
+ rename_to = []
+ for t_dsk, a_dsk in zip(tmp_disks, self.disks):
+ rename_to.append(t_dsk.logical_id)
+ t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk[constants.IDISK_ADOPT])
+ self.cfg.SetDiskID(t_dsk, pnode_name)
+ result = self.rpc.call_blockdev_rename(pnode_name,
+ zip(tmp_disks, rename_to))
+ result.Raise("Failed to rename adoped LVs")
else:
feedback_fn("* creating instance disks...")
try:
if self.op.wait_for_sync:
disk_abort = not _WaitForSync(self, iobj)
- elif iobj.disk_template in constants.DTS_NET_MIRROR:
+ elif iobj.disk_template in constants.DTS_INT_MIRROR:
# make sure the disks are not degraded (still sync-ing is ok)
time.sleep(15)
feedback_fn("* checking mirrors status")
if instance.name not in node_insts.payload:
if instance.admin_up:
- state = "ERROR_down"
+ state = constants.INSTST_ERRORDOWN
else:
- state = "ADMIN_down"
+ state = constants.INSTST_ADMINDOWN
raise errors.OpExecError("Instance %s is not running (state %s)" %
(instance.name, state))
"OLD_SECONDARY": instance.secondary_nodes[0],
}
env.update(_BuildInstanceHookEnvByObject(self, instance))
+ return env
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ instance = self.replacer.instance
nl = [
self.cfg.GetMasterNode(),
instance.primary_node,
]
if self.op.remote_node is not None:
nl.append(self.op.remote_node)
- return env, nl, nl
+ return nl, nl
class TLReplaceDisks(Tasklet):
return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
node_name, True)
+ def _CheckDisksActivated(self, instance):
+ """Checks if the instance disks are activated.
+
+ @param instance: The instance to check disks
+ @return: True if they are activated, False otherwise
+
+ """
+ nodes = instance.all_nodes
+
+ for idx, dev in enumerate(instance.disks):
+ for node in nodes:
+ self.lu.LogInfo("Checking disk/%d on %s", idx, node)
+ self.cfg.SetDiskID(dev, node)
+
+ result = self.rpc.call_blockdev_find(node, dev)
+
+ if result.offline:
+ continue
+ elif result.fail_msg or not result.payload:
+ return False
+
+ return True
+
def CheckPrereq(self):
"""Check prerequisites.
errors.ECODE_INVAL)
if self.mode == constants.REPLACE_DISK_AUTO:
+ if not self._CheckDisksActivated(instance):
+ raise errors.OpPrereqError("Please run activate-disks on instance %s"
+ " first" % self.instance_name,
+ errors.ECODE_STATE)
faulty_primary = self._FindFaultyDisks(instance.primary_node)
faulty_secondary = self._FindFaultyDisks(secondary_node)
"AMOUNT": self.op.amount,
}
env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+ return env
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
- return env, nl, nl
+ return (nl, nl)
def CheckPrereq(self):
"""Check prerequisites.
self.disk = instance.FindDisk(self.op.disk)
- if instance.disk_template != constants.DT_FILE:
- # TODO: check the free disk space for file, when that feature
- # will be supported
+ if instance.disk_template not in (constants.DT_FILE,
+ constants.DT_SHARED_FILE):
+ # TODO: check the free disk space for file, when that feature will be
+ # supported
_CheckNodesFreeDiskPerVG(self, nodenames,
self.disk.ComputeGrowth(self.op.amount))
def ExpandNames(self):
self.needed_locks = {}
- self.share_locks = dict.fromkeys(locking.LEVELS, 1)
- if self.op.instances:
- self.wanted_names = []
- for name in self.op.instances:
- full_name = _ExpandInstanceName(self.cfg, name)
- self.wanted_names.append(full_name)
- self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
+ # Use locking if requested or when non-static information is wanted
+ if not (self.op.static or self.op.use_locking):
+ self.LogWarning("Non-static data requested, locks need to be acquired")
+ self.op.use_locking = True
+
+ if self.op.instances or not self.op.use_locking:
+ # Expand instance names right here
+ self.wanted_names = _GetWantedInstances(self, self.op.instances)
else:
+ # Will use acquired locks
self.wanted_names = None
- self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
- self.needed_locks[locking.LEVEL_NODE] = []
- self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+ if self.op.use_locking:
+ self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+
+ if self.wanted_names is None:
+ self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
+ else:
+ self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
+
+ self.needed_locks[locking.LEVEL_NODE] = []
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
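
A decision sketch for the locking logic above: static queries can run without locks, anything serving live data must acquire them. Illustrative only:

def needs_locking(static, use_locking):
  # Non-static information cannot be served consistently without locks,
  # hence the forced upgrade of use_locking in ExpandNames above.
  return use_locking or not static

assert needs_locking(static=True, use_locking=False) is False
assert needs_locking(static=False, use_locking=False) is True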
def DeclareLocks(self, level):
- if level == locking.LEVEL_NODE:
+ if self.op.use_locking and level == locking.LEVEL_NODE:
self._LockInstancesNodes()
def CheckPrereq(self):
"""
if self.wanted_names is None:
+ assert self.op.use_locking, "Locking was not used"
self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
- self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
- in self.wanted_names]
+ self.wanted_instances = [self.cfg.GetInstanceInfo(name)
+ for name in self.wanted_names]
def _ComputeBlockdevStatus(self, node, instance_name, dev):
"""Returns the status of a block device
else:
dev_children = []
- data = {
+ return {
"iv_name": dev.iv_name,
"dev_type": dev.dev_type,
"logical_id": dev.logical_id,
"size": dev.size,
}
- return data
-
def Exec(self, feedback_fn):
"""Gather and return data"""
result = {}
disks = [self._ComputeDiskStatus(instance, None, device)
for device in instance.disks]
- idict = {
+ result[instance.name] = {
"name": instance.name,
"config_state": config_state,
"run_state": remote_state,
"uuid": instance.uuid,
}
- result[instance.name] = idict
-
return result
raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
if disk_op == constants.DDM_ADD:
- mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
+ mode = disk_dict.setdefault(constants.IDISK_MODE, constants.DISK_RDWR)
if mode not in constants.DISK_ACCESS_SET:
raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode,
errors.ECODE_INVAL)
- size = disk_dict.get('size', None)
+ size = disk_dict.get(constants.IDISK_SIZE, None)
if size is None:
raise errors.OpPrereqError("Required disk parameter size missing",
errors.ECODE_INVAL)
except (TypeError, ValueError), err:
raise errors.OpPrereqError("Invalid disk size parameter: %s" %
str(err), errors.ECODE_INVAL)
- disk_dict['size'] = size
+ disk_dict[constants.IDISK_SIZE] = size
else:
# modification of disk
- if 'size' in disk_dict:
+ if constants.IDISK_SIZE in disk_dict:
raise errors.OpPrereqError("Disk size change not possible, use"
" grow-disk", errors.ECODE_INVAL)
errors.ECODE_INVAL)
if (self.op.disk_template and
- self.op.disk_template in constants.DTS_NET_MIRROR and
+ self.op.disk_template in constants.DTS_INT_MIRROR and
self.op.remote_node is None):
raise errors.OpPrereqError("Changing the disk template to a mirrored"
" one requires specifying a secondary node",
raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
# nic_dict should be a dict
- nic_ip = nic_dict.get('ip', None)
+ nic_ip = nic_dict.get(constants.INIC_IP, None)
if nic_ip is not None:
if nic_ip.lower() == constants.VALUE_NONE:
- nic_dict['ip'] = None
+ nic_dict[constants.INIC_IP] = None
else:
if not netutils.IPAddress.IsValid(nic_ip):
raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
errors.ECODE_INVAL)
nic_bridge = nic_dict.get('bridge', None)
- nic_link = nic_dict.get('link', None)
+ nic_link = nic_dict.get(constants.INIC_LINK, None)
if nic_bridge and nic_link:
raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
" at the same time", errors.ECODE_INVAL)
elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
nic_dict['bridge'] = None
elif nic_link and nic_link.lower() == constants.VALUE_NONE:
- nic_dict['link'] = None
+ nic_dict[constants.INIC_LINK] = None
if nic_op == constants.DDM_ADD:
- nic_mac = nic_dict.get('mac', None)
+ nic_mac = nic_dict.get(constants.INIC_MAC, None)
if nic_mac is None:
- nic_dict['mac'] = constants.VALUE_AUTO
+ nic_dict[constants.INIC_MAC] = constants.VALUE_AUTO
- if 'mac' in nic_dict:
- nic_mac = nic_dict['mac']
+ if constants.INIC_MAC in nic_dict:
+ nic_mac = nic_dict[constants.INIC_MAC]
if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
nic_mac = utils.NormalizeAndValidateMac(nic_mac)
this_nic_override = nic_override[idx]
else:
this_nic_override = {}
- if 'ip' in this_nic_override:
- ip = this_nic_override['ip']
+ if constants.INIC_IP in this_nic_override:
+ ip = this_nic_override[constants.INIC_IP]
else:
ip = nic.ip
- if 'mac' in this_nic_override:
- mac = this_nic_override['mac']
+ if constants.INIC_MAC in this_nic_override:
+ mac = this_nic_override[constants.INIC_MAC]
else:
mac = nic.mac
if idx in self.nic_pnew:
link = nicparams[constants.NIC_LINK]
args['nics'].append((ip, mac, mode, link))
if constants.DDM_ADD in nic_override:
- ip = nic_override[constants.DDM_ADD].get('ip', None)
- mac = nic_override[constants.DDM_ADD]['mac']
+ ip = nic_override[constants.DDM_ADD].get(constants.INIC_IP, None)
+ mac = nic_override[constants.DDM_ADD][constants.INIC_MAC]
nicparams = self.nic_pnew[constants.DDM_ADD]
mode = nicparams[constants.NIC_MODE]
link = nicparams[constants.NIC_LINK]
env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
if self.op.disk_template:
env["NEW_DISK_TEMPLATE"] = self.op.disk_template
+
+ return env
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
- return env, nl, nl
+ return (nl, nl)
def CheckPrereq(self):
"""Check prerequisites.
self.op.disk_template),
errors.ECODE_INVAL)
_CheckInstanceDown(self, instance, "cannot change disk template")
- if self.op.disk_template in constants.DTS_NET_MIRROR:
+ if self.op.disk_template in constants.DTS_INT_MIRROR:
if self.op.remote_node == pnode:
raise errors.OpPrereqError("Given new secondary node %s is the same"
" as the primary node of the instance" %
_CheckNodeNotDrained(self, self.op.remote_node)
# FIXME: here we assume that the old instance type is DT_PLAIN
assert instance.disk_template == constants.DT_PLAIN
- disks = [{"size": d.size, "vg": d.logical_id[0]}
+ disks = [{constants.IDISK_SIZE: d.size,
+ constants.IDISK_VG: d.logical_id[0]}
for d in instance.disks]
required = _ComputeDiskSizePerVG(self.op.disk_template, disks)
_CheckNodesFreeDiskPerVG(self, [self.op.remote_node], required)
else:
raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
if new_nic_mode == constants.NIC_MODE_ROUTED:
- if 'ip' in nic_dict:
- nic_ip = nic_dict['ip']
+ if constants.INIC_IP in nic_dict:
+ nic_ip = nic_dict[constants.INIC_IP]
else:
nic_ip = old_nic_ip
if nic_ip is None:
raise errors.OpPrereqError('Cannot set the nic ip to None'
' on a routed nic', errors.ECODE_INVAL)
- if 'mac' in nic_dict:
- nic_mac = nic_dict['mac']
+ if constants.INIC_MAC in nic_dict:
+ nic_mac = nic_dict[constants.INIC_MAC]
if nic_mac is None:
raise errors.OpPrereqError('Cannot set the nic mac to None',
errors.ECODE_INVAL)
elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
# otherwise generate the mac
- nic_dict['mac'] = self.cfg.GenerateMAC(self.proc.GetECId())
+ nic_dict[constants.INIC_MAC] = \
+ self.cfg.GenerateMAC(self.proc.GetECId())
else:
# or validate/reserve the current one
try:
snode = self.op.remote_node
# create a fake disk info for _GenerateDiskTemplate
- disk_info = [{"size": d.size, "mode": d.mode} for d in instance.disks]
+ disk_info = [{constants.IDISK_SIZE: d.size, constants.IDISK_MODE: d.mode}
+ for d in instance.disks]
new_disks = _GenerateDiskTemplate(self, self.op.disk_template,
instance.name, pnode, [snode],
disk_info, None, None, 0, feedback_fn)
result.append(("disk/%d" % device_idx, "remove"))
elif disk_op == constants.DDM_ADD:
# add a new disk
- if instance.disk_template == constants.DT_FILE:
+ if instance.disk_template in (constants.DT_FILE,
+ constants.DT_SHARED_FILE):
file_driver, file_path = instance.disks[0].logical_id
file_path = os.path.dirname(file_path)
else:
(new_disk.size, new_disk.mode)))
else:
# change a given disk
- instance.disks[disk_op].mode = disk_dict['mode']
- result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
+ instance.disks[disk_op].mode = disk_dict[constants.IDISK_MODE]
+ result.append(("disk.mode/%d" % disk_op,
+ disk_dict[constants.IDISK_MODE]))
if self.op.disk_template:
r_shut = _ShutdownInstanceDisks(self, instance)
result.append(("nic.%d" % len(instance.nics), "remove"))
elif nic_op == constants.DDM_ADD:
# mac and bridge should be set, by now
- mac = nic_dict['mac']
- ip = nic_dict.get('ip', None)
+ mac = nic_dict[constants.INIC_MAC]
+ ip = nic_dict.get(constants.INIC_IP, None)
nicparams = self.nic_pinst[constants.DDM_ADD]
new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
instance.nics.append(new_nic)
self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
)))
else:
- for key in 'mac', 'ip':
+ for key in (constants.INIC_MAC, constants.INIC_IP):
if key in nic_dict:
setattr(instance.nics[nic_op], key, nic_dict[key])
if nic_op in self.nic_pinst:
env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+ return env
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
if self.op.mode == constants.EXPORT_MODE_LOCAL:
nl.append(self.op.target_node)
- return env, nl, nl
+ return (nl, nl)
def CheckPrereq(self):
"""Check prerequisites.
"""Build hooks env.
"""
- env = {
+ return {
"GROUP_NAME": self.op.group_name,
}
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
mn = self.cfg.GetMasterNode()
- return env, [mn], [mn]
+ return ([mn], [mn])
def Exec(self, feedback_fn):
"""Add the node group to the cluster.
In particular, it returns information about newly split instances, and
instances that were already split, and remain so after the change.
- Only instances whose disk template is listed in constants.DTS_NET_MIRROR are
+ Only instances whose disk template is listed in constants.DTS_INT_MIRROR are
considered.
@type changes: list of (node_name, new_group_uuid) pairs.
return [instance.primary_node] + list(instance.secondary_nodes)
for inst in instance_data.values():
- if inst.disk_template not in constants.DTS_NET_MIRROR:
+ if inst.disk_template not in constants.DTS_INT_MIRROR:
continue
instance_nodes = InstanceNodes(inst)
class _GroupQuery(_QueryBase):
-
FIELDS = query.GROUP_FIELDS
def ExpandNames(self, lu):
REQ_BGL = False
def CheckArguments(self):
- self.gq = _GroupQuery(self.op.names, self.op.output_fields, False)
+ self.gq = _GroupQuery(qlang.MakeSimpleFilter("name", self.op.names),
+ self.op.output_fields, False)
def ExpandNames(self):
self.gq.ExpandNames(self)
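
_GroupQuery now takes a filter instead of a plain name list. A sketch of what qlang.MakeSimpleFilter("name", names) is expected to build, an OR of per-name equality tests, or no filter for an empty list; the operator spelling ("|", "=") is assumed from the qlang filter language:

def make_simple_filter(namefield, values):
  if not values:
    return None
  return ["|"] + [["=", namefield, value] for value in values]

assert make_simple_filter("name", ["grp1", "grp2"]) == \
  ["|", ["=", "name", "grp1"], ["=", "name", "grp2"]]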
"""Build hooks env.
"""
- env = {
+ return {
"GROUP_NAME": self.op.group_name,
"NEW_ALLOC_POLICY": self.op.alloc_policy,
}
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
mn = self.cfg.GetMasterNode()
- return env, [mn], [mn]
+ return ([mn], [mn])
def Exec(self, feedback_fn):
"""Modifies the node group.
"""Build hooks env.
"""
- env = {
+ return {
"GROUP_NAME": self.op.group_name,
}
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
mn = self.cfg.GetMasterNode()
- return env, [mn], [mn]
+ return ([mn], [mn])
def Exec(self, feedback_fn):
"""Remove the node group.
def ExpandNames(self):
# This raises errors.OpPrereqError on its own:
- self.group_uuid = self.cfg.LookupNodeGroup(self.op.old_name)
+ self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
self.needed_locks = {
locking.LEVEL_NODEGROUP: [self.group_uuid],
def CheckPrereq(self):
"""Check prerequisites.
- This checks that the given old_name exists as a node group, and that
- new_name doesn't.
+ Ensures the requested new name is not yet used.
"""
try:
"""Build hooks env.
"""
- env = {
- "OLD_NAME": self.op.old_name,
+ return {
+ "OLD_NAME": self.op.group_name,
"NEW_NAME": self.op.new_name,
}
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
mn = self.cfg.GetMasterNode()
+
all_nodes = self.cfg.GetAllNodesInfo()
- run_nodes = [mn]
all_nodes.pop(mn, None)
- for node in all_nodes.values():
- if node.group == self.group_uuid:
- run_nodes.append(node.name)
+ run_nodes = [mn]
+ run_nodes.extend(node.name for node in all_nodes.values()
+ if node.group == self.group_uuid)
- return env, run_nodes, run_nodes
+ return (run_nodes, run_nodes)
def Exec(self, feedback_fn):
"""Rename the node group.
if group is None:
raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
- (self.op.old_name, self.group_uuid))
+ (self.op.group_name, self.group_uuid))
group.name = self.op.new_name
self.cfg.Update(group, feedback_fn)
This is an abstract class which is the parent of all the other tags LUs.
"""
-
def ExpandNames(self):
+ self.group_uuid = None
self.needed_locks = {}
if self.op.kind == constants.TAG_NODE:
self.op.name = _ExpandNodeName(self.cfg, self.op.name)
elif self.op.kind == constants.TAG_INSTANCE:
self.op.name = _ExpandInstanceName(self.cfg, self.op.name)
self.needed_locks[locking.LEVEL_INSTANCE] = self.op.name
+ elif self.op.kind == constants.TAG_NODEGROUP:
+ self.group_uuid = self.cfg.LookupNodeGroup(self.op.name)
# FIXME: Acquire BGL for cluster tag operations (as of this writing it's
# not possible to acquire the BGL based on opcode parameters)
self.target = self.cfg.GetNodeInfo(self.op.name)
elif self.op.kind == constants.TAG_INSTANCE:
self.target = self.cfg.GetInstanceInfo(self.op.name)
+ elif self.op.kind == constants.TAG_NODEGROUP:
+ self.target = self.cfg.GetNodeGroup(self.group_uuid)
else:
raise errors.OpPrereqError("Wrong tag type requested (%s)" %
str(self.op.kind), errors.ECODE_INVAL)
tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
nlist = cfg.GetAllNodesInfo().values()
tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
+ tgts.extend(("/nodegroup/%s" % n.name, n)
+ for n in cfg.GetAllNodeGroupsInfo().values())
results = []
for path, target in tgts:
for tag in target.GetTags():
"i_pri_up_memory": i_p_up_mem,
}
pnr_dyn.update(node_results[nname])
-
- node_results[nname] = pnr_dyn
+ node_results[nname] = pnr_dyn
return node_results
"os": iinfo.os,
"nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
"nics": nic_data,
- "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
+ "disks": [{constants.IDISK_SIZE: dsk.size,
+ constants.IDISK_MODE: dsk.mode}
+ for dsk in iinfo.disks],
"disk_template": iinfo.disk_template,
"hypervisor": iinfo.hypervisor,
}
"""
disk_space = _ComputeDiskSize(self.disk_template, self.disks)
- if self.disk_template in constants.DTS_NET_MIRROR:
+ if self.disk_template in constants.DTS_INT_MIRROR:
self.required_nodes = 2
else:
self.required_nodes = 1
raise errors.ProgrammerError("Unknown instance '%s' passed to"
" IAllocator" % self.name)
- if instance.disk_template not in constants.DTS_NET_MIRROR:
+ if instance.disk_template not in constants.DTS_MIRRORED:
raise errors.OpPrereqError("Can't relocate non-mirrored instances",
errors.ECODE_INVAL)
- if len(instance.secondary_nodes) != 1:
+ if instance.disk_template in constants.DTS_INT_MIRROR and \
+ len(instance.secondary_nodes) != 1:
raise errors.OpPrereqError("Instance has not exactly one secondary node",
errors.ECODE_STATE)
self.required_nodes = 1
- disk_sizes = [{'size': disk.size} for disk in instance.disks]
+ disk_sizes = [{constants.IDISK_SIZE: disk.size} for disk in instance.disks]
disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
request = {
if not isinstance(rdict["result"], list):
raise errors.OpExecError("Can't parse iallocator results: 'result' key"
" is not a list")
+
+ if self.mode == constants.IALLOCATOR_MODE_RELOC:
+ assert self.relocate_from is not None
+ assert self.required_nodes == 1
+
+ node2group = dict((name, ndata["group"])
+ for (name, ndata) in self.in_data["nodes"].items())
+
+ fn = compat.partial(self._NodesToGroups, node2group,
+ self.in_data["nodegroups"])
+
+ request_groups = fn(self.relocate_from)
+ result_groups = fn(rdict["result"])
+
+ if result_groups != request_groups:
+ raise errors.OpExecError("Groups of nodes returned by iallocator (%s)"
+ " differ from original groups (%s)" %
+ (utils.CommaJoin(result_groups),
+ utils.CommaJoin(request_groups)))
+
self.out_data = rdict
+ @staticmethod
+ def _NodesToGroups(node2group, groups, nodes):
+ """Returns a list of unique group names for a list of nodes.
+
+ @type node2group: dict
+ @param node2group: Map from node name to group UUID
+ @type groups: dict
+ @param groups: Group information
+ @type nodes: list
+ @param nodes: Node names
+
+ """
+ result = set()
+
+ for node in nodes:
+ try:
+ group_uuid = node2group[node]
+ except KeyError:
+ # Ignore unknown node
+ pass
+ else:
+ try:
+ group = groups[group_uuid]
+ except KeyError:
+ # Can't find group, let's use UUID
+ group_name = group_uuid
+ else:
+ group_name = group["name"]
+
+ result.add(group_name)
+
+ return sorted(result)
+
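A usage illustration for the static helper above with toy data (calling the private method directly, for illustration only): unknown nodes are skipped and a missing group falls back to its raw UUID.

node2group = {"node1": "uuid-a", "node2": "uuid-b"}
groups = {"uuid-a": {"name": "default"}}

assert IAllocator._NodesToGroups(node2group, groups,
                                 ["node1", "node2", "ghost"]) == \
  ["default", "uuid-b"]
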
class LUTestAllocator(NoHooksLU):
"""Run allocator tests.
constants.QR_INSTANCE: _InstanceQuery,
constants.QR_NODE: _NodeQuery,
constants.QR_GROUP: _GroupQuery,
+ constants.QR_OS: _OsQuery,
}
+assert set(_QUERY_IMPL.keys()) == constants.QR_VIA_OP
+
def _GetQueryImplementation(name):
"""Returns the implemtnation for a query type.
- @param name: Query type, must be one of L{constants.QR_OP_QUERY}
+ @param name: Query type, must be one of L{constants.QR_VIA_OP}
"""
try: