X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/d1b83918e985b53a8e9764cc44808d07dbb2a0be..cb7c019826d8acb1942f6845d0c65c5dfb2aa841:/lib/cmdlib.py

diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 74318c8..85555da 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -95,6 +95,10 @@ class LogicalUnit(object):
     self.LogStep = processor.LogStep # pylint: disable-msg=C0103
     # support for dry-run
     self.dry_run_result = None
+    # support for generic debug attribute
+    if (not hasattr(self.op, "debug_level") or
+        not isinstance(self.op.debug_level, int)):
+      self.op.debug_level = 0

     # Tasklets
     self.tasklets = None
@@ -280,6 +284,9 @@ class LogicalUnit(object):
         and hook results

     """
+    # API must be kept, thus we ignore the unused argument and could
+    # be a function warnings
+    # pylint: disable-msg=W0613,R0201
     return lu_result

   def _ExpandAndLockInstance(self):
@@ -297,12 +304,9 @@ class LogicalUnit(object):
     else:
       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
         "_ExpandAndLockInstance called with instance-level locks set"
-    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
-    if expanded_name is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name, errors.ECODE_NOENT)
-    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
-    self.op.instance_name = expanded_name
+    self.op.instance_name = _ExpandInstanceName(self.cfg,
+                                                self.op.instance_name)
+    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name

   def _LockInstancesNodes(self, primary_only=False):
     """Helper function to declare instances' nodes for locking.
@@ -424,7 +428,7 @@ def _GetWantedNodes(lu, nodes):
   @param nodes: list of node names or None for all nodes
   @rtype: list
   @return: the list of nodes, sorted
-  @raise errors.OpProgrammerError: if the nodes parameter is wrong type
+  @raise errors.ProgrammerError: if the nodes parameter is wrong type

   """
   if not isinstance(nodes, list):
@@ -435,14 +439,7 @@ def _GetWantedNodes(lu, nodes):
     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
       " non-empty list of nodes whose name is to be expanded.")

-  wanted = []
-  for name in nodes:
-    node = lu.cfg.ExpandNodeName(name)
-    if node is None:
-      raise errors.OpPrereqError("No such node name '%s'" % name,
-                                 errors.ECODE_NOENT)
-    wanted.append(node)
-
+  wanted = [_ExpandNodeName(lu.cfg, name) for name in nodes]
   return utils.NiceSort(wanted)


@@ -464,15 +461,7 @@ def _GetWantedInstances(lu, instances):
                                errors.ECODE_INVAL)

   if instances:
-    wanted = []
-
-    for name in instances:
-      instance = lu.cfg.ExpandInstanceName(name)
-      if instance is None:
-        raise errors.OpPrereqError("No such instance name '%s'" % name,
-                                   errors.ECODE_NOENT)
-      wanted.append(instance)
-
+    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
   else:
     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
   return wanted
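The first hunk backfills a default `debug_level` on opcodes submitted by older clients. A minimal standalone sketch of that defaulting pattern (the `Opcode` stand-in and the `_EnsureDebugLevel` name are illustrative, not Ganeti's):

class Opcode(object):
  """Illustrative stand-in for a deserialized opcode."""

def _EnsureDebugLevel(op, default=0):
  # Older clients may omit the attribute entirely, or send a
  # non-integer value; both cases fall back to the default.
  if (not hasattr(op, "debug_level") or
      not isinstance(op.debug_level, int)):
    op.debug_level = default

op = Opcode()
_EnsureDebugLevel(op)
assert op.debug_level == 0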
@@ -552,6 +541,33 @@ def _CheckNodeNotDrained(lu, node):
                                errors.ECODE_INVAL)


+def _ExpandItemName(fn, name, kind):
+  """Expand an item name.
+
+  @param fn: the function to use for expansion
+  @param name: requested item name
+  @param kind: text description ('Node' or 'Instance')
+  @return: the resolved (full) name
+  @raise errors.OpPrereqError: if the item is not found
+
+  """
+  full_name = fn(name)
+  if full_name is None:
+    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
+                               errors.ECODE_NOENT)
+  return full_name
+
+
+def _ExpandNodeName(cfg, name):
+  """Wrapper over L{_ExpandItemName} for nodes."""
+  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
+
+
+def _ExpandInstanceName(cfg, name):
+  """Wrapper over L{_ExpandItemName} for instance."""
+  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
+
+
 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                           memory, vcpus, nics, disk_template, disks,
                           bep, hvp, hypervisor_name):
@@ -699,7 +715,7 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None):
   }
   if override:
     args.update(override)
-  return _BuildInstanceHookEnv(**args)
+  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142


 def _AdjustCandidatePool(lu, exceptions):
@@ -909,6 +925,7 @@ class LUDestroyCluster(LogicalUnit):
     try:
       hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
     except:
+      # pylint: disable-msg=W0702
       self.LogWarning("Errors occurred running hooks on %s" % master)

     result = self.rpc.call_node_stop_master(master, False)
@@ -1029,7 +1046,7 @@ class LUVerifyCluster(LogicalUnit):

     """
     node = nodeinfo.name
-    _ErrorIf = self._ErrorIf
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

     # main result, node_result should be a non-empty dict
     test = not node_result or not isinstance(node_result, dict)
@@ -1176,7 +1193,7 @@ class LUVerifyCluster(LogicalUnit):
     available on the instance's node.

     """
-    _ErrorIf = self._ErrorIf
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
     node_current = instanceconfig.primary_node

     node_vol_should = {}
@@ -1291,7 +1308,7 @@ class LUVerifyCluster(LogicalUnit):

     """
     self.bad = False
-    _ErrorIf = self._ErrorIf
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
     verbose = self.op.verbose
     self._feedback_fn = feedback_fn
     feedback_fn("* Verifying global settings")
@@ -1440,7 +1457,7 @@ class LUVerifyCluster(LogicalUnit):
       try:
         ntime_merged = utils.MergeTime(ntime)
       except (ValueError, TypeError):
-        _ErrorIf(test, self.ENODETIME, node, "Node returned invalid time")
+        _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")

       if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
         ntime_diff = abs(nvinfo_starttime - ntime_merged)
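The `_ExpandItemName` helper introduced above folds many copies of "expand, check for None, raise" into one function, with thin per-kind wrappers. A self-contained sketch of the same pattern, assuming a plain dict as the resolver and a local `OpPrereqError` stand-in:

class OpPrereqError(Exception):
  """Stand-in for errors.OpPrereqError."""

def _ExpandItemName(fn, name, kind):
  """Resolve a short name via fn, failing loudly if it is unknown."""
  full_name = fn(name)
  if full_name is None:
    raise OpPrereqError("%s '%s' not known" % (kind, name))
  return full_name

_NODES = {"node1": "node1.example.com"}

assert _ExpandItemName(_NODES.get, "node1", "Node") == "node1.example.com"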
@@ -1599,7 +1616,8 @@ class LUVerifyCluster(LogicalUnit):
           test = msg and not res.offline
           self._ErrorIf(test, self.ENODEHOOKS, node_name,
                         "Communication failure in hooks execution: %s", msg)
-          if test:
+          if res.offline or msg:
+            # No need to investigate payload if node is offline or gave an error.
             # override manually lu_result here as _ErrorIf only
             # overrides self.bad
             lu_result = 1
@@ -1714,10 +1732,7 @@ class LURepairDiskSizes(NoHooksLU):
     if self.op.instances:
       self.wanted_names = []
       for name in self.op.instances:
-        full_name = self.cfg.ExpandInstanceName(name)
-        if full_name is None:
-          raise errors.OpPrereqError("Instance '%s' not known" % name,
-                                     errors.ECODE_NOENT)
+        full_name = _ExpandInstanceName(self.cfg, name)
         self.wanted_names.append(full_name)
       self.needed_locks = {
         locking.LEVEL_NODE: [],
@@ -1839,7 +1854,8 @@ class LURenameCluster(LogicalUnit):
       "NEW_NAME": self.op.name,
       }
     mn = self.cfg.GetMasterNode()
-    return env, [mn], [mn]
+    all_nodes = self.cfg.GetNodeList()
+    return env, [mn], all_nodes

   def CheckPrereq(self):
     """Verify that the passed name is a valid one.
@@ -2317,10 +2333,9 @@ class LUDiagnoseOS(NoHooksLU):

   """
   @staticmethod
-  def _DiagnoseByOS(node_list, rlist):
+  def _DiagnoseByOS(rlist):
     """Remaps a per-node return list into a per-os per-node dictionary

-    @param node_list: a list with the names of all nodes
     @param rlist: a map with node names as keys and OS objects as values

     @rtype: dict
@@ -2358,7 +2373,7 @@ class LUDiagnoseOS(NoHooksLU):
     """
     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
     node_data = self.rpc.call_os_diagnose(valid_nodes)
-    pol = self._DiagnoseByOS(valid_nodes, node_data)
+    pol = self._DiagnoseByOS(node_data)
     output = []
     calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
     calc_variants = "variants" in self.op.output_fields
@@ -2420,8 +2435,11 @@ class LURemoveNode(LogicalUnit):
       "NODE_NAME": self.op.node_name,
       }
     all_nodes = self.cfg.GetNodeList()
-    if self.op.node_name in all_nodes:
+    try:
       all_nodes.remove(self.op.node_name)
+    except ValueError:
+      logging.warning("Node %s which is about to be removed not found"
+                      " in the all nodes list", self.op.node_name)
     return env, all_nodes, all_nodes

   def CheckPrereq(self):
@@ -2435,10 +2453,9 @@ class LURemoveNode(LogicalUnit):
     Any errors are signaled by raising errors.OpPrereqError.

     """
-    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
-    if node is None:
-      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name,
-                                 errors.ECODE_NOENT)
+    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+    node = self.cfg.GetNodeInfo(self.op.node_name)
+    assert node is not None

     instance_list = self.cfg.GetInstanceList()

@@ -2476,6 +2493,7 @@ class LURemoveNode(LogicalUnit):
     try:
       hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
     except:
+      # pylint: disable-msg=W0702
      self.LogWarning("Errors occurred running hooks on %s" % node.name)

     result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
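The LURemoveNode hunk swaps a separate membership test for EAFP: call `list.remove` and handle the `ValueError` it raises for missing elements, so the list is scanned once instead of twice. A minimal sketch (the function name is illustrative):

import logging

def _DropNode(all_nodes, node_name):
  # list.remove raises ValueError when the element is absent;
  # catching it replaces a separate "in" test (one scan, not two).
  try:
    all_nodes.remove(node_name)
  except ValueError:
    logging.warning("Node %s not found in the node list", node_name)
  return all_nodes

assert _DropNode(["a", "b"], "b") == ["a"]
assert _DropNode(["a"], "x") == ["a"]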
""" + # pylint: disable-msg=W0142 _OP_REQP = ["output_fields", "names", "use_locking"] REQ_BGL = False @@ -2836,12 +2855,7 @@ class LUModifyNodeStorage(NoHooksLU): REQ_BGL = False def CheckArguments(self): - node_name = self.cfg.ExpandNodeName(self.op.node_name) - if node_name is None: - raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name, - errors.ECODE_NOENT) - - self.op.node_name = node_name + self.opnode_name = _ExpandNodeName(self.cfg, self.op.node_name) storage_type = self.op.storage_type if storage_type not in constants.VALID_STORAGE_TYPES: @@ -3019,7 +3033,7 @@ class LUAddNode(LogicalUnit): # later in the procedure; this also means that if the re-add # fails, we are left with a non-offlined, broken node if self.op.readd: - new_node.drained = new_node.offline = False + new_node.drained = new_node.offline = False # pylint: disable-msg=W0201 self.LogInfo("Readding a node, the offline/drained flags were reset") # if we demote the node, we do cleanup later in the procedure new_node.master_candidate = self.master_candidate @@ -3115,11 +3129,7 @@ class LUSetNodeParams(LogicalUnit): REQ_BGL = False def CheckArguments(self): - node_name = self.cfg.ExpandNodeName(self.op.node_name) - if node_name is None: - raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name, - errors.ECODE_INVAL) - self.op.node_name = node_name + self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) _CheckBooleanOpField(self.op, 'master_candidate') _CheckBooleanOpField(self.op, 'offline') _CheckBooleanOpField(self.op, 'drained') @@ -3197,7 +3207,7 @@ class LUSetNodeParams(LogicalUnit): # If we're being deofflined/drained, we'll MC ourself if needed if (deoffline_or_drain and not offline_or_drain and not - self.op.master_candidate == True): + self.op.master_candidate == True and not node.master_candidate): self.op.master_candidate = _DecideSelfPromotion(self) if self.op.master_candidate: self.LogInfo("Autopromoting node to master candidate") @@ -3268,12 +3278,8 @@ class LUPowercycleNode(NoHooksLU): REQ_BGL = False def CheckArguments(self): - node_name = self.cfg.ExpandNodeName(self.op.node_name) - if node_name is None: - raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name, - errors.ECODE_NOENT) - self.op.node_name = node_name - if node_name == self.cfg.GetMasterNode() and not self.op.force: + self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) + if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force: raise errors.OpPrereqError("The node is the master and the force" " parameter was not set", errors.ECODE_INVAL) @@ -3390,7 +3396,7 @@ class LUQueryConfigValues(NoHooksLU): elif field == "drain_flag": entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE) elif field == "watcher_pause": - return utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE) + entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE) else: raise errors.ParameterError(field) values.append(entry) @@ -3955,14 +3961,10 @@ class LUReinstallInstance(LogicalUnit): self.op.force_variant = getattr(self.op, "force_variant", False) if self.op.os_type is not None: # OS verification - pnode = self.cfg.GetNodeInfo( - self.cfg.ExpandNodeName(instance.primary_node)) - if pnode is None: - raise errors.OpPrereqError("Primary node '%s' is unknown" % - self.op.pnode, errors.ECODE_NOENT) - result = self.rpc.call_os_get(pnode.name, self.op.os_type) + pnode = _ExpandNodeName(self.cfg, instance.primary_node) + result = self.rpc.call_os_get(pnode, self.op.os_type) 
@@ -3955,14 +3961,10 @@ class LUReinstallInstance(LogicalUnit):
     self.op.force_variant = getattr(self.op, "force_variant", False)
     if self.op.os_type is not None:
       # OS verification
-      pnode = self.cfg.GetNodeInfo(
-        self.cfg.ExpandNodeName(instance.primary_node))
-      if pnode is None:
-        raise errors.OpPrereqError("Primary node '%s' is unknown" %
-                                   self.op.pnode, errors.ECODE_NOENT)
-      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
+      pnode = _ExpandNodeName(self.cfg, instance.primary_node)
+      result = self.rpc.call_os_get(pnode, self.op.os_type)
       result.Raise("OS '%s' not in supported OS list for primary node %s" %
-                   (self.op.os_type, pnode.name),
+                   (self.op.os_type, pnode),
                    prereq=True, ecode=errors.ECODE_INVAL)
       if not self.op.force_variant:
         _CheckOSVariant(result.payload, self.op.os_type)
@@ -3983,7 +3985,9 @@ class LUReinstallInstance(LogicalUnit):
     _StartInstanceDisks(self, inst, None)
     try:
       feedback_fn("Running the instance OS create scripts...")
-      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
+      # FIXME: pass debug option from opcode to backend
+      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True,
+                                             self.op.debug_level)
       result.Raise("Could not install OS for instance %s on node %s" %
                    (inst.name, inst.primary_node))
     finally:
@@ -4099,11 +4103,10 @@ class LURenameInstance(LogicalUnit):
     This checks that the instance is in the cluster and is not running.

     """
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name, errors.ECODE_NOENT)
+    self.op.instance_name = _ExpandInstanceName(self.cfg,
+                                                self.op.instance_name)
+    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert instance is not None
     _CheckNodeOnline(self, instance.primary_node)

     if instance.admin_up:
@@ -4167,7 +4170,7 @@ class LURenameInstance(LogicalUnit):
     _StartInstanceDisks(self, inst, None)
     try:
       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
-                                                 old_name)
+                                                 old_name, self.op.debug_level)
       msg = result.fail_msg
       if msg:
         msg = ("Could not run OS rename script for instance %s on node %s"
@@ -4212,7 +4215,8 @@ class LURemoveInstance(LogicalUnit):
     env = _BuildInstanceHookEnvByObject(self, self.instance)
     env["SHUTDOWN_TIMEOUT"] = self.shutdown_timeout
     nl = [self.cfg.GetMasterNode()]
-    return env, nl, nl
+    nl_post = list(self.instance.all_nodes) + nl
+    return env, nl, nl_post

   def CheckPrereq(self):
     """Check prerequisites.
@@ -4261,6 +4265,7 @@ class LUQueryInstances(NoHooksLU):
   """Logical unit for querying instances.

   """
+  # pylint: disable-msg=W0142
   _OP_REQP = ["output_fields", "names", "use_locking"]
   REQ_BGL = False
   _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor",
@@ -4322,6 +4327,8 @@ class LUQueryInstances(NoHooksLU):
     """Computes the list of nodes and their attributes.

     """
+    # pylint: disable-msg=R0912
+    # way too many branches here
     all_info = self.cfg.GetAllInstancesInfo()
     if self.wanted == locking.ALL_SET:
       # caller didn't specify instance names, so ordering is not important
@@ -4568,13 +4575,22 @@ class LUFailoverInstance(LogicalUnit):
     This runs on master, primary and secondary nodes of the instance.

     """
+    instance = self.instance
+    source_node = instance.primary_node
+    target_node = instance.secondary_nodes[0]
     env = {
       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
       "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
+      "OLD_PRIMARY": source_node,
+      "OLD_SECONDARY": target_node,
+      "NEW_PRIMARY": target_node,
+      "NEW_SECONDARY": source_node,
       }
-    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
-    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
-    return env, nl, nl
+    env.update(_BuildInstanceHookEnvByObject(self, instance))
+    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
+    nl_post = list(nl)
+    nl_post.append(source_node)
+    return env, nl, nl_post

   def CheckPrereq(self):
     """Check prerequisites.
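The LUFailoverInstance hunk exports old and new disk roles to the hook environment; for a two-node DRBD instance a failover simply swaps primary and secondary. A sketch of the resulting mapping (node names are examples):

def _FailoverHookRoles(primary, secondary):
  # On failover the secondary becomes the new primary and vice versa.
  return {
    "OLD_PRIMARY": primary,
    "OLD_SECONDARY": secondary,
    "NEW_PRIMARY": secondary,
    "NEW_SECONDARY": primary,
  }

env = _FailoverHookRoles("node1.example.com", "node2.example.com")
assert env["NEW_PRIMARY"] == "node2.example.com"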
@@ -4716,11 +4732,21 @@ class LUMigrateInstance(LogicalUnit):

     """
     instance = self._migrater.instance
+    source_node = instance.primary_node
+    target_node = instance.secondary_nodes[0]
     env = _BuildInstanceHookEnvByObject(self, instance)
     env["MIGRATE_LIVE"] = self.op.live
     env["MIGRATE_CLEANUP"] = self.op.cleanup
+    env.update({
+      "OLD_PRIMARY": source_node,
+      "OLD_SECONDARY": target_node,
+      "NEW_PRIMARY": target_node,
+      "NEW_SECONDARY": source_node,
+      })
     nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
-    return env, nl, nl
+    nl_post = list(nl)
+    nl_post.append(source_node)
+    return env, nl, nl_post


 class LUMoveInstance(LogicalUnit):
@@ -4741,10 +4767,7 @@ class LUMoveInstance(LogicalUnit):

   def ExpandNames(self):
     self._ExpandAndLockInstance()
-    target_node = self.cfg.ExpandNodeName(self.op.target_node)
-    if target_node is None:
-      raise errors.OpPrereqError("Node '%s' not known" %
-                                 self.op.target_node, errors.ECODE_NOENT)
+    target_node = _ExpandNodeName(self.cfg, self.op.target_node)
     self.op.target_node = target_node
     self.needed_locks[locking.LEVEL_NODE] = [target_node]
     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
@@ -4918,10 +4941,7 @@ class LUMigrateNode(LogicalUnit):
   REQ_BGL = False

   def ExpandNames(self):
-    self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
-    if self.op.node_name is None:
-      raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name,
-                                 errors.ECODE_NOENT)
+    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)

     self.needed_locks = {
       locking.LEVEL_NODE: [self.op.node_name],
@@ -4981,11 +5001,9 @@ class TLMigrateInstance(Tasklet):
     This checks that the instance is in the cluster.

     """
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.instance_name, errors.ECODE_NOENT)
+    instance_name = _ExpandInstanceName(self.lu.cfg, self.instance_name)
+    instance = self.cfg.GetInstanceInfo(instance_name)
+    assert instance is not None

     if instance.disk_template != constants.DT_DRBD8:
       raise errors.OpPrereqError("Instance's disk layout is not"
@@ -5640,15 +5658,10 @@ class LUCreateInstance(LogicalUnit):
       # TODO: make the ip check more flexible and not depend on the name check
       raise errors.OpPrereqError("Cannot do ip checks without a name check",
                                  errors.ECODE_INVAL)
-
-  def _ExpandNode(self, node):
-    """Expands and checks one node name.
-
-    """
-    node_full = self.cfg.ExpandNodeName(node)
-    if node_full is None:
-      raise errors.OpPrereqError("Unknown node %s" % node, errors.ECODE_NOENT)
-    return node_full
+    if (self.op.disk_template == constants.DT_FILE and
+        not constants.ENABLE_FILE_STORAGE):
+      raise errors.OpPrereqError("File storage disabled at configure time",
+                                 errors.ECODE_INVAL)

   def ExpandNames(self):
     """ExpandNames for CreateInstance.
@@ -5761,16 +5774,14 @@ class LUCreateInstance(LogicalUnit):
       # MAC address verification
       mac = nic.get("mac", constants.VALUE_AUTO)
       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
-        if not utils.IsValidMac(mac.lower()):
-          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
-                                     mac, errors.ECODE_INVAL)
-        else:
-          try:
-            self.cfg.ReserveMAC(mac, self.proc.GetECId())
-          except errors.ReservationError:
-            raise errors.OpPrereqError("MAC address %s already in use"
-                                       " in cluster" % mac,
-                                       errors.ECODE_NOTUNIQUE)
+        mac = utils.NormalizeAndValidateMac(mac)
+
+        try:
+          self.cfg.ReserveMAC(mac, self.proc.GetECId())
+        except errors.ReservationError:
+          raise errors.OpPrereqError("MAC address %s already in use"
+                                     " in cluster" % mac,
+                                     errors.ECODE_NOTUNIQUE)

       # bridge verification
       bridge = nic.get("bridge", None)
@@ -5807,7 +5818,7 @@ class LUCreateInstance(LogicalUnit):
         raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
       try:
         size = int(size)
-      except ValueError:
+      except (TypeError, ValueError):
         raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                    errors.ECODE_INVAL)
       self.disks.append({"size": size, "mode": mode})
@@ -5831,10 +5842,10 @@ class LUCreateInstance(LogicalUnit):
     if self.op.iallocator:
       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
     else:
-      self.op.pnode = self._ExpandNode(self.op.pnode)
+      self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode)
       nodelist = [self.op.pnode]
       if self.op.snode is not None:
-        self.op.snode = self._ExpandNode(self.op.snode)
+        self.op.snode = _ExpandNodeName(self.cfg, self.op.snode)
         nodelist.append(self.op.snode)
       self.needed_locks[locking.LEVEL_NODE] = nodelist
@@ -5854,12 +5865,12 @@ class LUCreateInstance(LogicalUnit):
                                    " path requires a source node option.",
                                    errors.ECODE_INVAL)
       else:
-        self.op.src_node = src_node = self._ExpandNode(src_node)
+        self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node)
         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
           self.needed_locks[locking.LEVEL_NODE].append(src_node)
         if not os.path.isabs(src_path):
           self.op.src_path = src_path = \
-            os.path.join(constants.EXPORT_DIR, src_path)
+            utils.PathJoin(constants.EXPORT_DIR, src_path)

       # On import force_variant must be True, because if we forced it at
       # initial install, our only chance when importing it back is that it
@@ -5897,17 +5908,17 @@ class LUCreateInstance(LogicalUnit):
                                  " iallocator '%s': %s" %
                                  (self.op.iallocator, ial.info),
                                  errors.ECODE_NORES)
-    if len(ial.nodes) != ial.required_nodes:
+    if len(ial.result) != ial.required_nodes:
       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                  " of nodes (%s), required %s" %
-                                 (self.op.iallocator, len(ial.nodes),
+                                 (self.op.iallocator, len(ial.result),
                                   ial.required_nodes), errors.ECODE_FAULT)
-    self.op.pnode = ial.nodes[0]
+    self.op.pnode = ial.result[0]
     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                  self.op.instance_name, self.op.iallocator,
-                 utils.CommaJoin(ial.nodes))
+                 utils.CommaJoin(ial.result))
     if ial.required_nodes == 2:
-      self.op.snode = ial.nodes[1]
+      self.op.snode = ial.result[1]

   def BuildHooksEnv(self):
     """Build hooks env.
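The MAC hunks replace validate-only checking (`utils.IsValidMac` on a lower-cased copy) with a normalize-and-validate helper whose canonical result is what gets reserved. An approximation of such a helper — the real `utils.NormalizeAndValidateMac` raises `errors.OpPrereqError` and may differ in detail:

import re

_MAC_RE = re.compile(r"^([0-9a-f]{2}:){5}[0-9a-f]{2}$")

def NormalizeAndValidateMac(mac):
  # Lower-case first so mixed-case input validates, and return the
  # canonical spelling so one form of the address is stored/reserved.
  mac = mac.lower()
  if not _MAC_RE.match(mac):
    raise ValueError("Invalid MAC address specified: %s" % mac)
  return mac

assert NormalizeAndValidateMac("AA:00:bb:01:CC:02") == "aa:00:bb:01:cc:02"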
@@ -5967,8 +5978,8 @@ class LUCreateInstance(LogicalUnit):
           if src_path in exp_list[node].payload:
             found = True
             self.op.src_node = src_node = node
-            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
-                                                       src_path)
+            self.op.src_path = src_path = utils.PathJoin(constants.EXPORT_DIR,
+                                                         src_path)
             break
       if not found:
         raise errors.OpPrereqError("No export found for relative path %s" %
@@ -6005,7 +6016,7 @@ class LUCreateInstance(LogicalUnit):
       if export_info.has_option(constants.INISECT_INS, option):
         # FIXME: are the old os-es, disk sizes, etc. useful?
         export_name = export_info.get(constants.INISECT_INS, option)
-        image = os.path.join(src_path, export_name)
+        image = utils.PathJoin(src_path, export_name)
         disk_images.append(image)
       else:
         disk_images.append(False)
@@ -6141,9 +6152,8 @@ class LUCreateInstance(LogicalUnit):
       string_file_storage_dir = self.op.file_storage_dir

       # build the full file storage dir path
-      file_storage_dir = os.path.normpath(os.path.join(
-                                          self.cfg.GetFileStorageDir(),
-                                          string_file_storage_dir, instance))
+      file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
+                                        string_file_storage_dir, instance)


     disks = _GenerateDiskTemplate(self,
@@ -6219,7 +6229,9 @@ class LUCreateInstance(LogicalUnit):
     if iobj.disk_template != constants.DT_DISKLESS:
       if self.op.mode == constants.INSTANCE_CREATE:
         feedback_fn("* running the instance OS create scripts...")
-        result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
+        # FIXME: pass debug option from opcode to backend
+        result = self.rpc.call_instance_os_add(pnode_name, iobj, False,
+                                               self.op.debug_level)
         result.Raise("Could not add os for instance %s"
                      " on node %s" % (instance, pnode_name))

@@ -6228,9 +6240,11 @@ class LUCreateInstance(LogicalUnit):
         src_node = self.op.src_node
         src_images = self.src_images
         cluster_name = self.cfg.GetClusterName()
+        # FIXME: pass debug option from opcode to backend
         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                          src_node, src_images,
-                                                         cluster_name)
+                                                         cluster_name,
+                                                         self.op.debug_level)
         msg = import_result.fail_msg
         if msg:
           self.LogWarning("Error while importing the disk images for instance"
@@ -6318,6 +6332,8 @@ class LUReplaceDisks(LogicalUnit):
       self.op.remote_node = None
     if not hasattr(self.op, "iallocator"):
       self.op.iallocator = None
+    if not hasattr(self.op, "early_release"):
+      self.op.early_release = False

     TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
                                   self.op.iallocator)
@@ -6329,11 +6345,7 @@ class LUReplaceDisks(LogicalUnit):
       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

     elif self.op.remote_node is not None:
-      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
-      if remote_node is None:
-        raise errors.OpPrereqError("Node '%s' not known" %
-                                   self.op.remote_node, errors.ECODE_NOENT)
-
+      remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
       self.op.remote_node = remote_node

       # Warning: do not remove the locking of the new secondary here
@@ -6349,7 +6361,7 @@ class LUReplaceDisks(LogicalUnit):

     self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
                                    self.op.iallocator, self.op.remote_node,
-                                   self.op.disks)
+                                   self.op.disks, False, self.op.early_release)

     self.tasklets = [self.replacer]

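Several hunks above replace `os.path.join` with `utils.PathJoin`. The value of such a wrapper is that a user-supplied component like `../../etc` must not escape the base directory, which plain `os.path.join` plus `normpath` would silently allow. A sketch of the idea (the real `utils.PathJoin` may differ in details):

import os.path

def PathJoin(base, *args):
  # Join, normalize, then verify the result is still under base.
  result = os.path.normpath(os.path.join(base, *args))
  if not (result == base or result.startswith(base + os.sep)):
    raise ValueError("Path '%s' escapes base directory '%s'" %
                     (result, base))
  return result

assert PathJoin("/srv/export", "inst1.img") == "/srv/export/inst1.img"
try:
  PathJoin("/srv/export", "../../etc/passwd")
except ValueError:
  pass  # rejected, as intended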
@@ -6396,16 +6408,15 @@ class LUEvacuateNode(LogicalUnit):
       self.op.remote_node = None
     if not hasattr(self.op, "iallocator"):
       self.op.iallocator = None
+    if not hasattr(self.op, "early_release"):
+      self.op.early_release = False

     TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
                                   self.op.remote_node,
                                   self.op.iallocator)

   def ExpandNames(self):
-    self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
-    if self.op.node_name is None:
-      raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name,
-                                 errors.ECODE_NOENT)
+    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)

     self.needed_locks = {}

@@ -6414,18 +6425,13 @@ class LUEvacuateNode(LogicalUnit):
       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

     elif self.op.remote_node is not None:
-      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
-      if remote_node is None:
-        raise errors.OpPrereqError("Node '%s' not known" %
-                                   self.op.remote_node, errors.ECODE_NOENT)
-
-      self.op.remote_node = remote_node
+      self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)

       # Warning: do not remove the locking of the new secondary here
       # unless DRBD8.AddChildren is changed to work in parallel;
       # currently it doesn't since parallel invocations of
       # FindUnusedMinor will conflict
-      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
+      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

     else:
@@ -6441,7 +6447,8 @@ class LUEvacuateNode(LogicalUnit):
       names.append(inst.name)
       replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
-                                self.op.iallocator, self.op.remote_node, [])
+                                self.op.iallocator, self.op.remote_node, [],
+                                True, self.op.early_release)
       tasklets.append(replacer)

     self.tasklets = tasklets
@@ -6483,7 +6490,7 @@ class TLReplaceDisks(Tasklet):

   """
   def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
-               disks):
+               disks, delay_iallocator, early_release):
     """Initializes this class.

     """
@@ -6495,6 +6502,8 @@ class TLReplaceDisks(Tasklet):
     self.iallocator_name = iallocator_name
     self.remote_node = remote_node
     self.disks = disks
+    self.delay_iallocator = delay_iallocator
+    self.early_release = early_release

     # Runtime data
     self.instance = None
@@ -6543,14 +6552,14 @@ class TLReplaceDisks(Tasklet):
                                  " %s" % (iallocator_name, ial.info),
                                  errors.ECODE_NORES)

-    if len(ial.nodes) != ial.required_nodes:
+    if len(ial.result) != ial.required_nodes:
       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                  " of nodes (%s), required %s" %
                                  (iallocator_name,
-                                  len(ial.nodes), ial.required_nodes),
+                                  len(ial.result), ial.required_nodes),
                                  errors.ECODE_FAULT)

-    remote_node_name = ial.nodes[0]
+    remote_node_name = ial.result[0]

     lu.LogInfo("Selected new secondary for instance '%s': %s",
                instance_name, remote_node_name)
@@ -6581,6 +6590,19 @@ class TLReplaceDisks(Tasklet):
                                  len(instance.secondary_nodes),
                                  errors.ECODE_FAULT)

+    if not self.delay_iallocator:
+      self._CheckPrereq2()
+
+  def _CheckPrereq2(self):
+    """Check prerequisites, second part.
+
+    This function should always be part of CheckPrereq. It was separated and is
+    now called from Exec because during node evacuation iallocator was only
+    called with an unmodified cluster model, not taking planned changes into
+    account.
+
+    """
+    instance = self.instance
     secondary_node = instance.secondary_nodes[0]

     if self.iallocator_name is None:
@@ -6654,6 +6676,14 @@ class TLReplaceDisks(Tasklet):

       _CheckNodeNotDrained(self.lu, remote_node)

+      old_node_info = self.cfg.GetNodeInfo(secondary_node)
+      assert old_node_info is not None
+      if old_node_info.offline and not self.early_release:
+        # doesn't make sense to delay the release
+        self.early_release = True
+        self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
+                        " early-release mode", secondary_node)
+
     else:
       raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                    self.mode)
@@ -6684,6 +6714,9 @@ class TLReplaceDisks(Tasklet):
     This dispatches the disk replacement to the appropriate handler.

     """
+    if self.delay_iallocator:
+      self._CheckPrereq2()
+
     if not self.disks:
       feedback_fn("No disks need replacement")
       return
@@ -6821,6 +6854,10 @@ class TLReplaceDisks(Tasklet):
         self.lu.LogWarning("Can't remove old LV: %s" % msg,
                            hint="remove unused LVs manually")

+  def _ReleaseNodeLock(self, node_name):
+    """Releases the lock for a given node."""
+    self.lu.context.glm.release(locking.LEVEL_NODE, node_name)
+
   def _ExecDrbd8DiskOnly(self, feedback_fn):
     """Replace a disk on the primary or secondary for DRBD 8.

@@ -6931,18 +6968,30 @@ class TLReplaceDisks(Tasklet):

     self.cfg.Update(self.instance, feedback_fn)

+    cstep = 5
+    if self.early_release:
+      self.lu.LogStep(cstep, steps_total, "Removing old storage")
+      cstep += 1
+      self._RemoveOldStorage(self.target_node, iv_names)
+      # WARNING: we release both node locks here, do not do other RPCs
+      # than WaitForSync to the primary node
+      self._ReleaseNodeLock([self.target_node, self.other_node])
+
     # Wait for sync
     # This can fail as the old devices are degraded and _WaitForSync
     # does a combined result over all disks, so we don't check its return value
-    self.lu.LogStep(5, steps_total, "Sync devices")
+    self.lu.LogStep(cstep, steps_total, "Sync devices")
+    cstep += 1
     _WaitForSync(self.lu, self.instance)

     # Check all devices manually
     self._CheckDevices(self.instance.primary_node, iv_names)

     # Step: remove old storage
-    self.lu.LogStep(6, steps_total, "Removing old storage")
-    self._RemoveOldStorage(self.target_node, iv_names)
+    if not self.early_release:
+      self.lu.LogStep(cstep, steps_total, "Removing old storage")
+      cstep += 1
+      self._RemoveOldStorage(self.target_node, iv_names)

   def _ExecDrbd8Secondary(self, feedback_fn):
     """Replace the secondary node for DRBD 8.
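With early release, "Removing old storage" becomes either step 5 (before the resync, freeing the old LVs and node locks sooner) or the final step, so the step numbers can no longer be hard-coded. A reduced sketch of that control flow, with stand-ins for the logging/storage/sync calls:

def _FinishReplace(early_release, log_step, remove_old, wait_sync,
                   steps_total=6):
  cstep = 5
  if early_release:
    # Free the old storage (and the node locks) before the resync,
    # at the price of having no rollback target if the sync fails.
    log_step(cstep, steps_total, "Removing old storage")
    cstep += 1
    remove_old()
  log_step(cstep, steps_total, "Sync devices")
  cstep += 1
  wait_sync()
  if not early_release:
    log_step(cstep, steps_total, "Removing old storage")
    remove_old()

steps = []
_FinishReplace(True, lambda c, t, txt: steps.append((c, txt)),
               lambda: None, lambda: None)
assert [c for c, _ in steps] == [5, 6]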
@@ -7076,19 +7125,31 @@ class TLReplaceDisks(Tasklet):
                            to_node, msg,
                            hint=("please do a gnt-instance info to see the"
                                  " status of disks"))

+    cstep = 5
+    if self.early_release:
+      self.lu.LogStep(cstep, steps_total, "Removing old storage")
+      cstep += 1
+      self._RemoveOldStorage(self.target_node, iv_names)
+      # WARNING: we release all node locks here, do not do other RPCs
+      # than WaitForSync to the primary node
+      self._ReleaseNodeLock([self.instance.primary_node,
+                             self.target_node,
+                             self.new_node])
+
     # Wait for sync
     # This can fail as the old devices are degraded and _WaitForSync
     # does a combined result over all disks, so we don't check its return value
-    self.lu.LogStep(5, steps_total, "Sync devices")
+    self.lu.LogStep(cstep, steps_total, "Sync devices")
+    cstep += 1
     _WaitForSync(self.lu, self.instance)

     # Check all devices manually
     self._CheckDevices(self.instance.primary_node, iv_names)

     # Step: remove old storage
-    self.lu.LogStep(6, steps_total, "Removing old storage")
-    self._RemoveOldStorage(self.target_node, iv_names)
+    if not self.early_release:
+      self.lu.LogStep(cstep, steps_total, "Removing old storage")
+      self._RemoveOldStorage(self.target_node, iv_names)


 class LURepairNodeStorage(NoHooksLU):
@@ -7099,12 +7160,7 @@ class LURepairNodeStorage(NoHooksLU):
   REQ_BGL = False

   def CheckArguments(self):
-    node_name = self.cfg.ExpandNodeName(self.op.node_name)
-    if node_name is None:
-      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
-                                 errors.ECODE_NOENT)
-
-    self.op.node_name = node_name
+    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)

   def ExpandNames(self):
     self.needed_locks = {
@@ -7159,6 +7215,60 @@ class LURepairNodeStorage(NoHooksLU):
                  (self.op.name, self.op.node_name))


+class LUNodeEvacuationStrategy(NoHooksLU):
+  """Computes the node evacuation strategy.
+
+  """
+  _OP_REQP = ["nodes"]
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    if not hasattr(self.op, "remote_node"):
+      self.op.remote_node = None
+    if not hasattr(self.op, "iallocator"):
+      self.op.iallocator = None
+    if self.op.remote_node is not None and self.op.iallocator is not None:
+      raise errors.OpPrereqError("Give either the iallocator or the new"
+                                 " secondary, not both", errors.ECODE_INVAL)
+
+  def ExpandNames(self):
+    self.op.nodes = _GetWantedNodes(self, self.op.nodes)
+    self.needed_locks = locks = {}
+    if self.op.remote_node is None:
+      locks[locking.LEVEL_NODE] = locking.ALL_SET
+    else:
+      self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
+      locks[locking.LEVEL_NODE] = self.op.nodes + [self.op.remote_node]
+
+  def CheckPrereq(self):
+    pass
+
+  def Exec(self, feedback_fn):
+    if self.op.remote_node is not None:
+      instances = []
+      for node in self.op.nodes:
+        instances.extend(_GetNodeSecondaryInstances(self.cfg, node))
+      result = []
+      for i in instances:
+        if i.primary_node == self.op.remote_node:
+          raise errors.OpPrereqError("Node %s is the primary node of"
+                                     " instance %s, cannot use it as"
+                                     " secondary" %
+                                     (self.op.remote_node, i.name),
+                                     errors.ECODE_INVAL)
+        result.append([i.name, self.op.remote_node])
+    else:
+      ial = IAllocator(self.cfg, self.rpc,
+                       mode=constants.IALLOCATOR_MODE_MEVAC,
+                       evac_nodes=self.op.nodes)
+      ial.Run(self.op.iallocator, validate=True)
+      if not ial.success:
+        raise errors.OpExecError("No valid evacuation solution: %s" % ial.info,
+                                 errors.ECODE_NORES)
+      result = ial.result
+    return result
+
+
 class LUGrowDisk(LogicalUnit):
   """Grow a disk of an instance.
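LUNodeEvacuationStrategy returns `[instance, new_secondary]` pairs. With an explicit target node the computation degenerates to pairing every affected instance with that node, rejecting instances whose primary it already is. A sketch over illustrative data:

def _EvacPairs(secondary_instances, remote_node):
  result = []
  for name, primary in secondary_instances:
    # The evacuation target cannot also be the instance's primary.
    if primary == remote_node:
      raise ValueError("Node %s is the primary node of instance %s"
                       % (remote_node, name))
    result.append([name, remote_node])
  return result

pairs = _EvacPairs([("inst1", "node1"), ("inst2", "node3")], "node2")
assert pairs == [["inst1", "node2"], ["inst2", "node2"]]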
@@ -7188,10 +7298,7 @@ class LUGrowDisk(LogicalUnit):
       "AMOUNT": self.op.amount,
       }
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
-    nl = [
-      self.cfg.GetMasterNode(),
-      self.instance.primary_node,
-      ]
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl

   def CheckPrereq(self):
@@ -7276,10 +7383,7 @@ class LUQueryInstanceData(NoHooksLU):
     if self.op.instances:
       self.wanted_names = []
       for name in self.op.instances:
-        full_name = self.cfg.ExpandInstanceName(name)
-        if full_name is None:
-          raise errors.OpPrereqError("Instance '%s' not known" % name,
-                                     errors.ECODE_NOENT)
+        full_name = _ExpandInstanceName(self.cfg, name)
         self.wanted_names.append(full_name)
       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
     else:
@@ -7470,7 +7574,7 @@ class LUSetInstanceParams(LogicalUnit):
                                      errors.ECODE_INVAL)
         try:
           size = int(size)
-        except ValueError, err:
+        except (TypeError, ValueError), err:
           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                      str(err), errors.ECODE_INVAL)
         disk_dict['size'] = size
@@ -7527,9 +7631,8 @@ class LUSetInstanceParams(LogicalUnit):
       if 'mac' in nic_dict:
         nic_mac = nic_dict['mac']
         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
-          if not utils.IsValidMac(nic_mac):
-            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac,
-                                       errors.ECODE_INVAL)
+          nic_mac = utils.NormalizeAndValidateMac(nic_mac)
+
         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
                                      " modifying an existing nic",
@@ -7599,7 +7702,8 @@ class LUSetInstanceParams(LogicalUnit):
     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl

-  def _GetUpdatedParams(self, old_params, update_dict,
+  @staticmethod
+  def _GetUpdatedParams(old_params, update_dict,
                         default_values, parameter_types):
     """Return the new params dict for the given params.
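`_GetUpdatedParams` (now a `@staticmethod`, as it touches no instance state) merges user-supplied updates into an existing parameter dict, where a sentinel value means "revert to the cluster default". A simplified sketch of that merge — the sentinel mirrors `constants.VALUE_DEFAULT`, and parameter-type checking is omitted:

VALUE_DEFAULT = "default"

def _GetUpdatedParams(old_params, update_dict, default_values):
  params_copy = dict(old_params)
  for key, val in update_dict.items():
    if val == VALUE_DEFAULT:
      # Fall back to the cluster-level default for this parameter.
      params_copy[key] = default_values.get(key)
    else:
      params_copy[key] = val
  return params_copy

old = {"memory": 512, "vcpus": 2}
new = _GetUpdatedParams(old, {"memory": VALUE_DEFAULT, "vcpus": 4},
                        {"memory": 128})
assert new == {"memory": 128, "vcpus": 4}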
@@ -8051,13 +8155,10 @@ class LUExportInstance(LogicalUnit):
           "Cannot retrieve locked instance %s" % self.op.instance_name
     _CheckNodeOnline(self, self.instance.primary_node)

-    self.dst_node = self.cfg.GetNodeInfo(
-      self.cfg.ExpandNodeName(self.op.target_node))
+    self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
+    self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
+    assert self.dst_node is not None

-    if self.dst_node is None:
-      # This is wrong node name, not a non-locked node
-      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node,
-                                 errors.ECODE_NOENT)
     _CheckNodeOnline(self, self.dst_node.name)
     _CheckNodeNotDrained(self, self.dst_node.name)

@@ -8138,8 +8239,10 @@ class LUExportInstance(LogicalUnit):
         feedback_fn("Exporting snapshot %s from %s to %s" %
                     (idx, src_node, dst_node.name))
         if dev:
+          # FIXME: pass debug from opcode to backend
           result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
-                                                 instance, cluster_name, idx)
+                                                 instance, cluster_name,
+                                                 idx, self.op.debug_level)
           msg = result.fail_msg
           if msg:
             self.LogWarning("Could not export disk/%s from node %s to"
@@ -8253,19 +8356,11 @@ class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
   def ExpandNames(self):
     self.needed_locks = {}
     if self.op.kind == constants.TAG_NODE:
-      name = self.cfg.ExpandNodeName(self.op.name)
-      if name is None:
-        raise errors.OpPrereqError("Invalid node name (%s)" %
-                                   (self.op.name,), errors.ECODE_NOENT)
-      self.op.name = name
-      self.needed_locks[locking.LEVEL_NODE] = name
+      self.op.name = _ExpandNodeName(self.cfg, self.op.name)
+      self.needed_locks[locking.LEVEL_NODE] = self.op.name
     elif self.op.kind == constants.TAG_INSTANCE:
-      name = self.cfg.ExpandInstanceName(self.op.name)
-      if name is None:
-        raise errors.OpPrereqError("Invalid instance name (%s)" %
-                                   (self.op.name,), errors.ECODE_NOENT)
-      self.op.name = name
-      self.needed_locks[locking.LEVEL_INSTANCE] = name
+      self.op.name = _ExpandInstanceName(self.cfg, self.op.name)
+      self.needed_locks[locking.LEVEL_INSTANCE] = self.op.name

   def CheckPrereq(self):
     """Check prerequisites.
@@ -8454,34 +8549,45 @@ class IAllocator(object):
       easy usage

   """
+  # pylint: disable-msg=R0902
+  # lots of instance attributes
   _ALLO_KEYS = [
-    "mem_size", "disks", "disk_template",
+    "name", "mem_size", "disks", "disk_template",
     "os", "tags", "nics", "vcpus", "hypervisor",
     ]
   _RELO_KEYS = [
-    "relocate_from",
+    "name", "relocate_from",
+    ]
+  _EVAC_KEYS = [
+    "evac_nodes",
     ]

-  def __init__(self, cfg, rpc, mode, name, **kwargs):
+  def __init__(self, cfg, rpc, mode, **kwargs):
     self.cfg = cfg
     self.rpc = rpc
     # init buffer variables
     self.in_text = self.out_text = self.in_data = self.out_data = None
     # init all input fields so that pylint is happy
     self.mode = mode
-    self.name = name
     self.mem_size = self.disks = self.disk_template = None
     self.os = self.tags = self.nics = self.vcpus = None
     self.hypervisor = None
     self.relocate_from = None
+    self.name = None
+    self.evac_nodes = None
     # computed fields
     self.required_nodes = None
     # init result fields
-    self.success = self.info = self.nodes = None
+    self.success = self.info = self.result = None
     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
       keyset = self._ALLO_KEYS
+      fn = self._AddNewInstance
     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
       keyset = self._RELO_KEYS
+      fn = self._AddRelocateInstance
+    elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
+      keyset = self._EVAC_KEYS
+      fn = self._AddEvacuateNodes
     else:
       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                    " IAllocator" % self.mode)
@@ -8490,11 +8596,12 @@ class IAllocator(object):
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
+
     for key in keyset:
       if key not in kwargs:
         raise errors.ProgrammerError("Missing input parameter '%s' to"
                                      " IAllocator" % key)
-    self._BuildInputData()
+    self._BuildInputData(fn)

   def _ComputeClusterData(self):
     """Compute the generic allocator input data.
@@ -8523,6 +8630,8 @@ class IAllocator(object):
       hypervisor_name = self.hypervisor
     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
+    elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
+      hypervisor_name = cluster_info.enabled_hypervisors[0]

     node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
                                         hypervisor_name)
@@ -8633,8 +8742,6 @@ class IAllocator(object):
     done.

     """
-    data = self.in_data
-
     disk_space = _ComputeDiskSize(self.disk_template, self.disks)

     if self.disk_template in constants.DTS_NET_MIRROR:
@@ -8642,7 +8749,6 @@ class IAllocator(object):
     else:
       self.required_nodes = 1
     request = {
-      "type": "allocate",
       "name": self.name,
       "disk_template": self.disk_template,
       "tags": self.tags,
@@ -8654,7 +8760,7 @@ class IAllocator(object):
       "nics": self.nics,
       "required_nodes": self.required_nodes,
       }
-    data["request"] = request
+    return request

   def _AddRelocateInstance(self):
     """Add relocate instance data to allocator structure.
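The constructor now derives both the required key set and the request-builder function from the mode, and `_BuildInputData` just calls the builder and stamps the mode in as the request type. A compact sketch of that dispatch (modes, keys and builders are stand-ins):

MODE_ALLOC, MODE_RELOC, MODE_MEVAC = "allocate", "relocate", "multi-evacuate"

_DISPATCH = {
  # mode -> (required keys, request builder)
  MODE_ALLOC: (["name", "mem_size"], lambda kw: {"name": kw["name"]}),
  MODE_RELOC: (["name", "relocate_from"],
               lambda kw: {"name": kw["name"],
                           "relocate_from": kw["relocate_from"]}),
  MODE_MEVAC: (["evac_nodes"], lambda kw: {"evac_nodes": kw["evac_nodes"]}),
}

def BuildRequest(mode, **kwargs):
  keyset, fn = _DISPATCH[mode]
  missing = [k for k in keyset if k not in kwargs]
  if missing:
    raise TypeError("Missing input parameters: %s" % missing)
  request = fn(kwargs)
  request["type"] = mode  # the mode doubles as the request type
  return request

req = BuildRequest(MODE_MEVAC, evac_nodes=["node1", "node2"])
assert req == {"type": "multi-evacuate", "evac_nodes": ["node1", "node2"]}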
""" self._ComputeClusterData() - if self.mode == constants.IALLOCATOR_MODE_ALLOC: - self._AddNewInstance() - else: - self._AddRelocateInstance() + request = fn() + request["type"] = self.mode + self.in_data["request"] = request self.in_text = serializer.Dump(self.in_data) @@ -8734,14 +8847,19 @@ class IAllocator(object): if not isinstance(rdict, dict): raise errors.OpExecError("Can't parse iallocator results: not a dict") - for key in "success", "info", "nodes": + # TODO: remove backwards compatiblity in later versions + if "nodes" in rdict and "result" not in rdict: + rdict["result"] = rdict["nodes"] + del rdict["nodes"] + + for key in "success", "info", "result": if key not in rdict: raise errors.OpExecError("Can't parse iallocator results:" " missing key '%s'" % key) setattr(self, key, rdict[key]) - if not isinstance(rdict["nodes"], list): - raise errors.OpExecError("Can't parse iallocator results: 'nodes' key" + if not isinstance(rdict["result"], list): + raise errors.OpExecError("Can't parse iallocator results: 'result' key" " is not a list") self.out_data = rdict @@ -8797,12 +8915,13 @@ class LUTestAllocator(NoHooksLU): if not hasattr(self.op, "name"): raise errors.OpPrereqError("Missing attribute 'name' on opcode input", errors.ECODE_INVAL) - fname = self.cfg.ExpandInstanceName(self.op.name) - if fname is None: - raise errors.OpPrereqError("Instance '%s' not found for relocation" % - self.op.name, errors.ECODE_NOENT) + fname = _ExpandInstanceName(self.cfg, self.op.name) self.op.name = fname self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes + elif self.op.mode == constants.IALLOCATOR_MODE_MEVAC: + if not hasattr(self.op, "evac_nodes"): + raise errors.OpPrereqError("Missing attribute 'evac_nodes' on" + " opcode input", errors.ECODE_INVAL) else: raise errors.OpPrereqError("Invalid test allocator mode '%s'" % self.op.mode, errors.ECODE_INVAL) @@ -8832,12 +8951,19 @@ class LUTestAllocator(NoHooksLU): vcpus=self.op.vcpus, hypervisor=self.op.hypervisor, ) - else: + elif self.op.mode == constants.IALLOCATOR_MODE_RELOC: ial = IAllocator(self.cfg, self.rpc, mode=self.op.mode, name=self.op.name, relocate_from=list(self.relocate_from), ) + elif self.op.mode == constants.IALLOCATOR_MODE_MEVAC: + ial = IAllocator(self.cfg, self.rpc, + mode=self.op.mode, + evac_nodes=self.op.evac_nodes) + else: + raise errors.ProgrammerError("Uncatched mode %s in" + " LUTestAllocator.Exec", self.op.mode) if self.op.direction == constants.IALLOCATOR_DIR_IN: result = ial.in_text