X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/229853143d90fdf6178a3039a640a4508493f07a..d4f4b3e763cf53d4718521d22c3651cbb3f349a1:/lib/cmdlib.py diff --git a/lib/cmdlib.py b/lib/cmdlib.py index 810cf0c..46944d9 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -37,11 +37,10 @@ from ganeti import logger from ganeti import utils from ganeti import errors from ganeti import hypervisor -from ganeti import config +from ganeti import locking from ganeti import constants from ganeti import objects from ganeti import opcodes -from ganeti import ssconf from ganeti import serializer @@ -49,21 +48,27 @@ class LogicalUnit(object): """Logical Unit base class. Subclasses must follow these rules: - - implement CheckPrereq which also fills in the opcode instance - with all the fields (even if as None) + - implement ExpandNames + - implement CheckPrereq - implement Exec - implement BuildHooksEnv - redefine HPATH and HTYPE - - optionally redefine their run requirements (REQ_MASTER); note that all - commands require root permissions + - optionally redefine their run requirements: + REQ_MASTER: the LU needs to run on the master node + REQ_WSSTORE: the LU needs a writable SimpleStore + REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively + + Note that all commands require root permissions. """ HPATH = None HTYPE = None _OP_REQP = [] REQ_MASTER = True + REQ_WSSTORE = False + REQ_BGL = True - def __init__(self, processor, op, cfg, sstore): + def __init__(self, processor, op, context, sstore): """Constructor for LogicalUnit. This needs to be overriden in derived classes in order to check op @@ -72,8 +77,14 @@ class LogicalUnit(object): """ self.proc = processor self.op = op - self.cfg = cfg + self.cfg = context.cfg self.sstore = sstore + self.context = context + self.needed_locks = None + self.acquired_locks = {} + self.share_locks = dict(((i, 0) for i in locking.LEVELS)) + # Used to force good behavior when calling helper functions + self.recalculate_locks = {} self.__ssh = None for attr_name in self._OP_REQP: @@ -82,7 +93,7 @@ class LogicalUnit(object): raise errors.OpPrereqError("Required parameter '%s' missing" % attr_name) - if not cfg.IsCluster(): + if not self.cfg.IsCluster(): raise errors.OpPrereqError("Cluster not initialized yet," " use 'gnt-cluster init' first.") if self.REQ_MASTER: @@ -101,6 +112,66 @@ class LogicalUnit(object): ssh = property(fget=__GetSSH) + def ExpandNames(self): + """Expand names for this LU. + + This method is called before starting to execute the opcode, and it should + update all the parameters of the opcode to their canonical form (e.g. a + short node name must be fully expanded after this method has successfully + completed). This way locking, hooks, logging, ecc. can work correctly. + + LUs which implement this method must also populate the self.needed_locks + member, as a dict with lock levels as keys, and a list of needed lock names + as values. Rules: + - Use an empty dict if you don't need any lock + - If you don't need any lock at a particular level omit that level + - Don't put anything for the BGL level + - If you want all locks at a level use locking.ALL_SET as a value + + If you need to share locks (rather than acquire them exclusively) at one + level you can modify self.share_locks, setting a true value (usually 1) for + that level. By default locks are not shared. 
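+ For example (a sketch for a read-only LU, like the query LUs further down),
+ requesting all node locks in shared mode would look like:
+   self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
+   self.share_locks[locking.LEVEL_NODE] = 1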
+ + Examples: + # Acquire all nodes and one instance + self.needed_locks = { + locking.LEVEL_NODE: locking.ALL_SET, + locking.LEVEL_INSTANCE: ['instance1.example.tld'], + } + # Acquire just two nodes + self.needed_locks = { + locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'], + } + # Acquire no locks + self.needed_locks = {} # No, you can't leave it to the default value None + + """ + # The implementation of this method is mandatory only if the new LU is + # concurrent, so that old LUs don't need to be changed all at the same + # time. + if self.REQ_BGL: + self.needed_locks = {} # Exclusive LUs don't need locks. + else: + raise NotImplementedError + + def DeclareLocks(self, level): + """Declare LU locking needs for a level + + While most LUs can just declare their locking needs at ExpandNames time, + sometimes there's the need to calculate some locks after having acquired + the ones before. This function is called just before acquiring locks at a + particular level, but after acquiring the ones at lower levels, and permits + such calculations. It can be used to modify self.needed_locks, and by + default it does nothing. + + This function is only called if you have something already set in + self.needed_locks for the level. + + @param level: Locking level which is going to be locked + @type level: member of ganeti.locking.LEVELS + + """ + def CheckPrereq(self): """Check prerequisites for this LU. @@ -113,9 +184,7 @@ class LogicalUnit(object): not fulfilled. Its return value is ignored. This method should also update all the parameters of the opcode to - their canonical form; e.g. a short node name must be fully - expanded after this method has successfully completed (so that - hooks, logging, etc. work correctly). + their canonical form if it hasn't been done by ExpandNames before. """ raise NotImplementedError @@ -170,6 +239,73 @@ class LogicalUnit(object): """ return lu_result + def _ExpandAndLockInstance(self): + """Helper function to expand and lock an instance. + + Many LUs that work on an instance take its name in self.op.instance_name + and need to expand it and then declare the expanded name for locking. This + function does it, and then updates self.op.instance_name to the expanded + name. It also initializes needed_locks as a dict, if this hasn't been done + before. + + """ + if self.needed_locks is None: + self.needed_locks = {} + else: + assert locking.LEVEL_INSTANCE not in self.needed_locks, \ + "_ExpandAndLockInstance called with instance-level locks set" + expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name) + if expanded_name is None: + raise errors.OpPrereqError("Instance '%s' not known" % + self.op.instance_name) + self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name + self.op.instance_name = expanded_name + + def _LockInstancesNodes(self, primary_only=False): + """Helper function to declare instances' nodes for locking. + + This function should be called after locking one or more instances to lock + their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE] + with all primary or secondary nodes for instances already locked and + present in self.needed_locks[locking.LEVEL_INSTANCE]. + + It should be called from DeclareLocks, and for safety only works if + self.recalculate_locks[locking.LEVEL_NODE] is set. + + In the future it may grow parameters to just lock some instance's nodes, or + to just lock primaries or secondary nodes, if needed. 
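+ The ExpandNames side of this pattern, as used by the instance LUs below,
+ typically is:
+   self._ExpandAndLockInstance()
+   self.needed_locks[locking.LEVEL_NODE] = []
+   self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE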
+ + If should be called in DeclareLocks in a way similar to: + + if level == locking.LEVEL_NODE: + self._LockInstancesNodes() + + @type primary_only: boolean + @param primary_only: only lock primary nodes of locked instances + + """ + assert locking.LEVEL_NODE in self.recalculate_locks, \ + "_LockInstancesNodes helper function called with no nodes to recalculate" + + # TODO: check if we're really been called with the instance locks held + + # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the + # future we might want to have different behaviors depending on the value + # of self.recalculate_locks[locking.LEVEL_NODE] + wanted_nodes = [] + for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]: + instance = self.context.cfg.GetInstanceInfo(instance_name) + wanted_nodes.append(instance.primary_node) + if not primary_only: + wanted_nodes.extend(instance.secondary_nodes) + + if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE: + self.needed_locks[locking.LEVEL_NODE] = wanted_nodes + elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND: + self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes) + + del self.recalculate_locks[locking.LEVEL_NODE] + class NoHooksLU(LogicalUnit): """Simple LU which runs no hooks. @@ -192,17 +328,17 @@ def _GetWantedNodes(lu, nodes): if not isinstance(nodes, list): raise errors.OpPrereqError("Invalid argument type 'nodes'") - if nodes: - wanted = [] + if not nodes: + raise errors.ProgrammerError("_GetWantedNodes should only be called with a" + " non-empty list of nodes whose name is to be expanded.") - for name in nodes: - node = lu.cfg.ExpandNodeName(name) - if node is None: - raise errors.OpPrereqError("No such node name '%s'" % name) - wanted.append(node) + wanted = [] + for name in nodes: + node = lu.cfg.ExpandNodeName(name) + if node is None: + raise errors.OpPrereqError("No such node name '%s'" % name) + wanted.append(node) - else: - wanted = lu.cfg.GetNodeList() return utils.NiceSort(wanted) @@ -347,12 +483,12 @@ class LUDestroyCluster(NoHooksLU): """ master = self.sstore.GetMasterNode() - if not rpc.call_node_stop_master(master): + if not rpc.call_node_stop_master(master, False): raise errors.OpExecError("Could not disable the master role") priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS) utils.CreateBackup(priv_key) utils.CreateBackup(pub_key) - rpc.call_node_leave_cluster(master) + return master class LUVerifyCluster(LogicalUnit): @@ -362,6 +498,14 @@ class LUVerifyCluster(LogicalUnit): HPATH = "cluster-verify" HTYPE = constants.HTYPE_CLUSTER _OP_REQP = ["skip_checks"] + REQ_BGL = False + + def ExpandNames(self): + self.needed_locks = { + locking.LEVEL_NODE: locking.ALL_SET, + locking.LEVEL_INSTANCE: locking.ALL_SET, + } + self.share_locks = dict(((i, 1) for i in locking.LEVELS)) def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result, remote_version, feedback_fn): @@ -726,7 +870,7 @@ class LUVerifyCluster(LogicalUnit): feedback_fn(" - NOTICE: %d non-redundant instance(s) found." 
% len(i_non_redundant)) - return int(bad) + return not bad def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result): """Analize the post-hooks' result, handle it, and send some @@ -739,7 +883,8 @@ class LUVerifyCluster(LogicalUnit): lu_result: previous Exec result """ - # We only really run POST phase hooks, and are only interested in their results + # We only really run POST phase hooks, and are only interested in + # their results if phase == constants.HOOKS_PHASE_POST: # Used to change hooks' output to proper indentation indent_re = re.compile('^', re.M) @@ -775,6 +920,14 @@ class LUVerifyDisks(NoHooksLU): """ _OP_REQP = [] + REQ_BGL = False + + def ExpandNames(self): + self.needed_locks = { + locking.LEVEL_NODE: locking.ALL_SET, + locking.LEVEL_INSTANCE: locking.ALL_SET, + } + self.share_locks = dict(((i, 1) for i in locking.LEVELS)) def CheckPrereq(self): """Check prerequisites. @@ -849,6 +1002,7 @@ class LURenameCluster(LogicalUnit): HPATH = "cluster-rename" HTYPE = constants.HTYPE_CLUSTER _OP_REQP = ["name"] + REQ_WSSTORE = True def BuildHooksEnv(self): """Build hooks env. @@ -875,8 +1029,7 @@ class LURenameCluster(LogicalUnit): raise errors.OpPrereqError("Neither the name nor the IP address of the" " cluster has changed") if new_ip != old_ip: - result = utils.RunCmd(["fping", "-q", new_ip]) - if not result.failed: + if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT): raise errors.OpPrereqError("The given cluster IP address (%s) is" " reachable on the network. Aborting." % new_ip) @@ -893,7 +1046,7 @@ class LURenameCluster(LogicalUnit): # shutdown the master IP master = ss.GetMasterNode() - if not rpc.call_node_stop_master(master): + if not rpc.call_node_stop_master(master, False): raise errors.OpExecError("Could not disable the master role") try: @@ -916,7 +1069,7 @@ class LURenameCluster(LogicalUnit): logger.Error("copy of file %s to node %s failed" % (fname, to_node)) finally: - if not rpc.call_node_start_master(master): + if not rpc.call_node_start_master(master, False): logger.Error("Could not re-enable the master role on the master," " please restart manually.") @@ -1046,15 +1199,7 @@ def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False): if done or oneshot: break - if unlock: - #utils.Unlock('cmd') - pass - try: - time.sleep(min(60, max_time)) - finally: - if unlock: - #utils.Lock('cmd') - pass + time.sleep(min(60, max_time)) if done: proc.LogInfo("Instance %s's disks are in sync." % instance.name) @@ -1095,13 +1240,9 @@ class LUDiagnoseOS(NoHooksLU): """ _OP_REQP = ["output_fields", "names"] + REQ_BGL = False - def CheckPrereq(self): - """Check prerequisites. - - This always succeeds, since this is a pure query LU. - - """ + def ExpandNames(self): if self.op.names: raise errors.OpPrereqError("Selective OS query not supported") @@ -1110,6 +1251,16 @@ class LUDiagnoseOS(NoHooksLU): dynamic=self.dynamic_fields, selected=self.op.output_fields) + # Lock all nodes, in shared mode + self.needed_locks = {} + self.share_locks[locking.LEVEL_NODE] = 1 + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + + def CheckPrereq(self): + """Check prerequisites. + + """ + @staticmethod def _DiagnoseByOS(node_list, rlist): """Remaps a per-node return list into an a per-os per-node dictionary @@ -1145,7 +1296,7 @@ class LUDiagnoseOS(NoHooksLU): """Compute the list of OSes. 
""" - node_list = self.cfg.GetNodeList() + node_list = self.acquired_locks[locking.LEVEL_NODE] node_data = rpc.call_os_diagnose(node_list) if node_data == False: raise errors.OpExecError("Can't gather the list of OSes") @@ -1182,7 +1333,7 @@ class LURemoveNode(LogicalUnit): """Build hooks env. This doesn't run on the target node in the pre phase as a failed - node would not allows itself to run. + node would then be impossible to remove. """ env = { @@ -1234,15 +1385,9 @@ class LURemoveNode(LogicalUnit): logger.Info("stopping the node daemon and removing configs from node %s" % node.name) - rpc.call_node_leave_cluster(node.name) - - self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT) + self.context.RemoveNode(node.name) - logger.Info("Removing node %s from config" % node.name) - - self.cfg.RemoveNode(node.name) - - utils.RemoveHostFromEtcHosts(node.name) + rpc.call_node_leave_cluster(node.name) class LUQueryNodes(NoHooksLU): @@ -1250,13 +1395,9 @@ class LUQueryNodes(NoHooksLU): """ _OP_REQP = ["output_fields", "names"] + REQ_BGL = False - def CheckPrereq(self): - """Check prerequisites. - - This checks that the fields required are valid output fields. - - """ + def ExpandNames(self): self.dynamic_fields = frozenset([ "dtotal", "dfree", "mtotal", "mnode", "mfree", @@ -1264,20 +1405,48 @@ class LUQueryNodes(NoHooksLU): "ctotal", ]) - _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt", - "pinst_list", "sinst_list", - "pip", "sip"], + self.static_fields = frozenset([ + "name", "pinst_cnt", "sinst_cnt", + "pinst_list", "sinst_list", + "pip", "sip", "tags", + ]) + + _CheckOutputFields(static=self.static_fields, dynamic=self.dynamic_fields, selected=self.op.output_fields) - self.wanted = _GetWantedNodes(self, self.op.names) + self.needed_locks = {} + self.share_locks[locking.LEVEL_NODE] = 1 + + if self.op.names: + self.wanted = _GetWantedNodes(self, self.op.names) + else: + self.wanted = locking.ALL_SET + + self.do_locking = not self.static_fields.issuperset(self.op.output_fields) + if self.do_locking: + # if we don't request only static fields, we need to lock the nodes + self.needed_locks[locking.LEVEL_NODE] = self.wanted + + + def CheckPrereq(self): + """Check prerequisites. + + """ + # The validation of the node list is done in the _GetWantedNodes, + # if non empty, and if empty, there's no validation to do + pass def Exec(self, feedback_fn): """Computes the list of nodes and their attributes. 
""" - nodenames = self.wanted - nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames] + all_info = self.cfg.GetAllNodesInfo() + if self.do_locking: + nodenames = self.acquired_locks[locking.LEVEL_NODE] + else: + nodenames = all_info.keys() + nodelist = [all_info[name] for name in nodenames] # begin data gathering @@ -1337,6 +1506,8 @@ class LUQueryNodes(NoHooksLU): val = node.primary_ip elif field == "sip": val = node.secondary_ip + elif field == "tags": + val = list(node.GetTags()) elif field in self.dynamic_fields: val = live_data[node.name].get(field, None) else: @@ -1352,6 +1523,20 @@ class LUQueryNodeVolumes(NoHooksLU): """ _OP_REQP = ["nodes", "output_fields"] + REQ_BGL = False + + def ExpandNames(self): + _CheckOutputFields(static=["node"], + dynamic=["phys", "vg", "name", "size", "instance"], + selected=self.op.output_fields) + + self.needed_locks = {} + self.share_locks[locking.LEVEL_NODE] = 1 + if not self.op.nodes: + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + else: + self.needed_locks[locking.LEVEL_NODE] = \ + _GetWantedNodes(self, self.op.nodes) def CheckPrereq(self): """Check prerequisites. @@ -1359,12 +1544,7 @@ class LUQueryNodeVolumes(NoHooksLU): This checks that the fields required are valid output fields. """ - self.nodes = _GetWantedNodes(self, self.op.nodes) - - _CheckOutputFields(static=["node"], - dynamic=["phys", "vg", "name", "size", "instance"], - selected=self.op.output_fields) - + self.nodes = self.acquired_locks[locking.LEVEL_NODE] def Exec(self, feedback_fn): """Computes the list of nodes and their attributes. @@ -1518,11 +1698,6 @@ class LUAddNode(LogicalUnit): primary_ip=primary_ip, secondary_ip=secondary_ip) - if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31: - if not os.path.exists(constants.VNC_PASSWORD_FILE): - raise errors.OpPrereqError("Cluster VNC password file %s missing" % - constants.VNC_PASSWORD_FILE) - def Exec(self, feedback_fn): """Adds the new node to the cluster. @@ -1530,46 +1705,7 @@ class LUAddNode(LogicalUnit): new_node = self.new_node node = new_node.name - # set up inter-node password and certificate and restarts the node daemon - gntpass = self.sstore.GetNodeDaemonPassword() - if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass): - raise errors.OpExecError("ganeti password corruption detected") - f = open(constants.SSL_CERT_FILE) - try: - gntpem = f.read(8192) - finally: - f.close() - # in the base64 pem encoding, neither '!' nor '.' are valid chars, - # so we use this to detect an invalid certificate; as long as the - # cert doesn't contain this, the here-document will be correctly - # parsed by the shell sequence below - if re.search('^!EOF\.', gntpem, re.MULTILINE): - raise errors.OpExecError("invalid PEM encoding in the SSL certificate") - if not gntpem.endswith("\n"): - raise errors.OpExecError("PEM must end with newline") - logger.Info("copy cluster pass to %s and starting the node daemon" % node) - - # and then connect with ssh to set password and start ganeti-noded - # note that all the below variables are sanitized at this point, - # either by being constants or by the checks above - ss = self.sstore - mycommand = ("umask 077 && " - "echo '%s' > '%s' && " - "cat > '%s' << '!EOF.' 
&& \n" - "%s!EOF.\n%s restart" % - (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS), - constants.SSL_CERT_FILE, gntpem, - constants.NODE_INITD_SCRIPT)) - - result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True) - if result.failed: - raise errors.OpExecError("Remote command on node %s, error: %s," - " output: %s" % - (node, result.fail_reason, result.output)) - # check connectivity - time.sleep(4) - result = rpc.call_version([node])[node] if result: if constants.PROTOCOL_VERSION == result: @@ -1616,12 +1752,22 @@ class LUAddNode(LogicalUnit): " you gave (%s). Please fix and re-run this" " command." % new_node.secondary_ip) - success, msg = self.ssh.VerifyNodeHostname(node) - if not success: - raise errors.OpExecError("Node '%s' claims it has a different hostname" - " than the one the resolver gives: %s." - " Please fix and re-run this command." % - (node, msg)) + node_verify_list = [self.sstore.GetMasterNode()] + node_verify_param = { + 'nodelist': [node], + # TODO: do a node-net-test as well? + } + + result = rpc.call_node_verify(node_verify_list, node_verify_param) + for verifier in node_verify_list: + if not result[verifier]: + raise errors.OpExecError("Cannot communicate with %s's node daemon" + " for remote verification" % verifier) + if result[verifier]['nodelist']: + for failed in result[verifier]['nodelist']: + feedback_fn("ssh/hostname verification failed %s -> %s" % + (verifier, result[verifier]['nodelist'][failed])) + raise errors.OpExecError("ssh/hostname verification failed.") # Distribute updated /etc/hosts and known_hosts to all nodes, # including the node just added @@ -1640,87 +1786,18 @@ class LUAddNode(LogicalUnit): logger.Error("copy of file %s to node %s failed" % (fname, to_node)) - to_copy = ss.GetFileList() + to_copy = self.sstore.GetFileList() if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31: to_copy.append(constants.VNC_PASSWORD_FILE) for fname in to_copy: - if not self.ssh.CopyFileToNode(node, fname): + result = rpc.call_upload_file([node], fname) + if not result[node]: logger.Error("could not copy file %s to node %s" % (fname, node)) - if not self.op.readd: - logger.Info("adding node %s to cluster.conf" % node) - self.cfg.AddNode(new_node) - - -class LUMasterFailover(LogicalUnit): - """Failover the master node to the current node. - - This is a special LU in that it must run on a non-master node. - - """ - HPATH = "master-failover" - HTYPE = constants.HTYPE_CLUSTER - REQ_MASTER = False - _OP_REQP = [] - - def BuildHooksEnv(self): - """Build hooks env. - - This will run on the new master only in the pre phase, and on all - the nodes in the post phase. - - """ - env = { - "OP_TARGET": self.new_master, - "NEW_MASTER": self.new_master, - "OLD_MASTER": self.old_master, - } - return env, [self.new_master], self.cfg.GetNodeList() - - def CheckPrereq(self): - """Check prerequisites. - - This checks that we are not already the master. - - """ - self.new_master = utils.HostInfo().name - self.old_master = self.sstore.GetMasterNode() - - if self.old_master == self.new_master: - raise errors.OpPrereqError("This commands must be run on the node" - " where you want the new master to be." - " %s is already the master" % - self.old_master) - - def Exec(self, feedback_fn): - """Failover the master node. - - This command, when run on a non-master node, will cause the current - master to cease being master, and the non-master to become new - master. 
- - """ - #TODO: do not rely on gethostname returning the FQDN - logger.Info("setting master to %s, old master: %s" % - (self.new_master, self.old_master)) - - if not rpc.call_node_stop_master(self.old_master): - logger.Error("could disable the master role on the old master" - " %s, please disable manually" % self.old_master) - - ss = self.sstore - ss.SetKey(ss.SS_MASTER_NODE, self.new_master) - if not rpc.call_upload_file(self.cfg.GetNodeList(), - ss.KeyToFilename(ss.SS_MASTER_NODE)): - logger.Error("could not distribute the new simple store master file" - " to the other nodes, please check.") - - if not rpc.call_node_start_master(self.new_master): - logger.Error("could not start the master role on the new master" - " %s, please check" % self.new_master) - feedback_fn("Error in activating the master IP on the new master," - " please fix manually.") - + if self.op.readd: + self.context.ReaddNode(new_node) + else: + self.context.AddNode(new_node) class LUQueryClusterInfo(NoHooksLU): @@ -1729,6 +1806,10 @@ class LUQueryClusterInfo(NoHooksLU): """ _OP_REQP = [] REQ_MASTER = False + REQ_BGL = False + + def ExpandNames(self): + self.needed_locks = {} def CheckPrereq(self): """No prerequsites needed for this LU. @@ -1755,50 +1836,15 @@ class LUQueryClusterInfo(NoHooksLU): return result -class LUClusterCopyFile(NoHooksLU): - """Copy file to cluster. - - """ - _OP_REQP = ["nodes", "filename"] - - def CheckPrereq(self): - """Check prerequisites. - - It should check that the named file exists and that the given list - of nodes is valid. - - """ - if not os.path.exists(self.op.filename): - raise errors.OpPrereqError("No such filename '%s'" % self.op.filename) - - self.nodes = _GetWantedNodes(self, self.op.nodes) - - def Exec(self, feedback_fn): - """Copy a file from master to some nodes. - - Args: - opts - class with options as members - args - list containing a single element, the file name - Opts used: - nodes - list containing the name of target nodes; if empty, all nodes - - """ - filename = self.op.filename - - myname = utils.HostInfo().name - - for node in self.nodes: - if node == myname: - continue - if not self.ssh.CopyFileToNode(node, filename): - logger.Error("Copy of file %s to node %s failed" % (filename, node)) - - class LUDumpClusterConfig(NoHooksLU): """Return a text-representation of the cluster-config. """ _OP_REQP = [] + REQ_BGL = False + + def ExpandNames(self): + self.needed_locks = {} def CheckPrereq(self): """No prerequisites. @@ -1813,43 +1859,21 @@ class LUDumpClusterConfig(NoHooksLU): return self.cfg.DumpConfig() -class LURunClusterCommand(NoHooksLU): - """Run a command on some nodes. - - """ - _OP_REQP = ["command", "nodes"] - - def CheckPrereq(self): - """Check prerequisites. - - It checks that the given list of nodes is valid. - - """ - self.nodes = _GetWantedNodes(self, self.op.nodes) - - def Exec(self, feedback_fn): - """Run a command on some nodes. - - """ - # put the master at the end of the nodes list - master_node = self.sstore.GetMasterNode() - if master_node in self.nodes: - self.nodes.remove(master_node) - self.nodes.append(master_node) - - data = [] - for node in self.nodes: - result = self.ssh.Run(node, "root", self.op.command) - data.append((node, result.output, result.exit_code)) - - return data - - class LUActivateInstanceDisks(NoHooksLU): """Bring up an instance's disks. 
""" _OP_REQP = ["instance_name"] + REQ_BGL = False + + def ExpandNames(self): + self._ExpandAndLockInstance() + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes() def CheckPrereq(self): """Check prerequisites. @@ -1857,13 +1881,9 @@ class LUActivateInstanceDisks(NoHooksLU): This checks that the instance is in the cluster. """ - instance = self.cfg.GetInstanceInfo( - self.cfg.ExpandInstanceName(self.op.instance_name)) - if instance is None: - raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) - self.instance = instance - + self.instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert self.instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name def Exec(self, feedback_fn): """Activate the disks. @@ -1957,6 +1977,16 @@ class LUDeactivateInstanceDisks(NoHooksLU): """ _OP_REQP = ["instance_name"] + REQ_BGL = False + + def ExpandNames(self): + self._ExpandAndLockInstance() + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes() def CheckPrereq(self): """Check prerequisites. @@ -1964,29 +1994,36 @@ class LUDeactivateInstanceDisks(NoHooksLU): This checks that the instance is in the cluster. """ - instance = self.cfg.GetInstanceInfo( - self.cfg.ExpandInstanceName(self.op.instance_name)) - if instance is None: - raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) - self.instance = instance + self.instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert self.instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name def Exec(self, feedback_fn): """Deactivate the disks """ instance = self.instance - ins_l = rpc.call_instance_list([instance.primary_node]) - ins_l = ins_l[instance.primary_node] - if not type(ins_l) is list: - raise errors.OpExecError("Can't contact node '%s'" % - instance.primary_node) + _SafeShutdownInstanceDisks(instance, self.cfg) - if self.instance.name in ins_l: - raise errors.OpExecError("Instance is running, can't shutdown" - " block devices.") - _ShutdownInstanceDisks(instance, self.cfg) +def _SafeShutdownInstanceDisks(instance, cfg): + """Shutdown block devices of an instance. + + This function checks if an instance is running, before calling + _ShutdownInstanceDisks. + + """ + ins_l = rpc.call_instance_list([instance.primary_node]) + ins_l = ins_l[instance.primary_node] + if not type(ins_l) is list: + raise errors.OpExecError("Can't contact node '%s'" % + instance.primary_node) + + if instance.name in ins_l: + raise errors.OpExecError("Instance is running, can't shutdown" + " block devices.") + + _ShutdownInstanceDisks(instance, cfg) def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False): @@ -2047,6 +2084,16 @@ class LUStartupInstance(LogicalUnit): HPATH = "instance-start" HTYPE = constants.HTYPE_INSTANCE _OP_REQP = ["instance_name", "force"] + REQ_BGL = False + + def ExpandNames(self): + self._ExpandAndLockInstance() + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes() def BuildHooksEnv(self): """Build hooks env. 
@@ -2068,11 +2115,9 @@ class LUStartupInstance(LogicalUnit): This checks that the instance is in the cluster. """ - instance = self.cfg.GetInstanceInfo( - self.cfg.ExpandInstanceName(self.op.instance_name)) - if instance is None: - raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) + self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert self.instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name # check bridges existance _CheckInstanceBridgesExist(instance) @@ -2081,9 +2126,6 @@ class LUStartupInstance(LogicalUnit): "starting instance %s" % instance.name, instance.memory) - self.instance = instance - self.op.instance_name = instance.name - def Exec(self, feedback_fn): """Start the instance. @@ -2110,6 +2152,24 @@ class LURebootInstance(LogicalUnit): HPATH = "instance-reboot" HTYPE = constants.HTYPE_INSTANCE _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"] + REQ_BGL = False + + def ExpandNames(self): + if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT, + constants.INSTANCE_REBOOT_HARD, + constants.INSTANCE_REBOOT_FULL]: + raise errors.ParameterError("reboot type not in [%s, %s, %s]" % + (constants.INSTANCE_REBOOT_SOFT, + constants.INSTANCE_REBOOT_HARD, + constants.INSTANCE_REBOOT_FULL)) + self._ExpandAndLockInstance() + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + primary_only = not constants.INSTANCE_REBOOT_FULL + self._LockInstancesNodes(primary_only=primary_only) def BuildHooksEnv(self): """Build hooks env. @@ -2131,18 +2191,13 @@ class LURebootInstance(LogicalUnit): This checks that the instance is in the cluster. """ - instance = self.cfg.GetInstanceInfo( - self.cfg.ExpandInstanceName(self.op.instance_name)) - if instance is None: - raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) + self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert self.instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name # check bridges existance _CheckInstanceBridgesExist(instance) - self.instance = instance - self.op.instance_name = instance.name - def Exec(self, feedback_fn): """Reboot the instance. @@ -2154,14 +2209,6 @@ class LURebootInstance(LogicalUnit): node_current = instance.primary_node - if reboot_type not in [constants.INSTANCE_REBOOT_SOFT, - constants.INSTANCE_REBOOT_HARD, - constants.INSTANCE_REBOOT_FULL]: - raise errors.ParameterError("reboot type not in [%s, %s, %s]" % - (constants.INSTANCE_REBOOT_SOFT, - constants.INSTANCE_REBOOT_HARD, - constants.INSTANCE_REBOOT_FULL)) - if reboot_type in [constants.INSTANCE_REBOOT_SOFT, constants.INSTANCE_REBOOT_HARD]: if not rpc.call_instance_reboot(node_current, instance, @@ -2186,6 +2233,16 @@ class LUShutdownInstance(LogicalUnit): HPATH = "instance-stop" HTYPE = constants.HTYPE_INSTANCE _OP_REQP = ["instance_name"] + REQ_BGL = False + + def ExpandNames(self): + self._ExpandAndLockInstance() + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes() def BuildHooksEnv(self): """Build hooks env. @@ -2204,12 +2261,9 @@ class LUShutdownInstance(LogicalUnit): This checks that the instance is in the cluster. 
""" - instance = self.cfg.GetInstanceInfo( - self.cfg.ExpandInstanceName(self.op.instance_name)) - if instance is None: - raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) - self.instance = instance + self.instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert self.instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name def Exec(self, feedback_fn): """Shutdown the instance. @@ -2231,6 +2285,16 @@ class LUReinstallInstance(LogicalUnit): HPATH = "instance-reinstall" HTYPE = constants.HTYPE_INSTANCE _OP_REQP = ["instance_name"] + REQ_BGL = False + + def ExpandNames(self): + self._ExpandAndLockInstance() + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes() def BuildHooksEnv(self): """Build hooks env. @@ -2249,11 +2313,10 @@ class LUReinstallInstance(LogicalUnit): This checks that the instance is in the cluster and is not running. """ - instance = self.cfg.GetInstanceInfo( - self.cfg.ExpandInstanceName(self.op.instance_name)) - if instance is None: - raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) + instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name + if instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Instance '%s' has no disks" % self.op.instance_name) @@ -2354,9 +2417,7 @@ class LURenameInstance(LogicalUnit): new_name) if not getattr(self.op, "ignore_ip", False): - command = ["fping", "-q", name_info.ip] - result = utils.RunCmd(command) - if not result.failed: + if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT): raise errors.OpPrereqError("IP %s of instance %s already in use" % (name_info.ip, new_name)) @@ -2372,6 +2433,9 @@ class LURenameInstance(LogicalUnit): old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1]) self.cfg.RenameInstance(inst.name, self.op.new_name) + # Change the instance lock. This is definitely safe while we hold the BGL + self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name) + self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name) # re-read the instance from the configuration after rename inst = self.cfg.GetInstanceInfo(self.op.new_name) @@ -2399,8 +2463,8 @@ class LURenameInstance(LogicalUnit): try: if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name, "sda", "sdb"): - msg = ("Could run OS rename script for instance %s on node %s (but the" - " instance has been renamed in Ganeti)" % + msg = ("Could not run OS rename script for instance %s on node %s" + " (but the instance has been renamed in Ganeti)" % (inst.name, inst.primary_node)) logger.Error(msg) finally: @@ -2464,6 +2528,8 @@ class LURemoveInstance(LogicalUnit): logger.Info("removing instance %s out of cluster config" % instance.name) self.cfg.RemoveInstance(instance.name) + # Remove the new instance from the Ganeti Lock Manager + self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name) class LUQueryInstances(NoHooksLU): @@ -2471,30 +2537,60 @@ class LUQueryInstances(NoHooksLU): """ _OP_REQP = ["output_fields", "names"] + REQ_BGL = False - def CheckPrereq(self): - """Check prerequisites. - - This checks that the fields required are valid output fields. 
- - """ + def ExpandNames(self): self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"]) - _CheckOutputFields(static=["name", "os", "pnode", "snodes", - "admin_state", "admin_ram", - "disk_template", "ip", "mac", "bridge", - "sda_size", "sdb_size", "vcpus"], + self.static_fields = frozenset([ + "name", "os", "pnode", "snodes", + "admin_state", "admin_ram", + "disk_template", "ip", "mac", "bridge", + "sda_size", "sdb_size", "vcpus", "tags", + "auto_balance", + "network_port", "kernel_path", "initrd_path", + "hvm_boot_order", "hvm_acpi", "hvm_pae", + "hvm_cdrom_image_path", "hvm_nic_type", + "hvm_disk_type", "vnc_bind_address", + ]) + _CheckOutputFields(static=self.static_fields, dynamic=self.dynamic_fields, selected=self.op.output_fields) - self.wanted = _GetWantedInstances(self, self.op.names) + self.needed_locks = {} + self.share_locks[locking.LEVEL_INSTANCE] = 1 + self.share_locks[locking.LEVEL_NODE] = 1 + + if self.op.names: + self.wanted = _GetWantedInstances(self, self.op.names) + else: + self.wanted = locking.ALL_SET + + self.do_locking = not self.static_fields.issuperset(self.op.output_fields) + if self.do_locking: + self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE and self.do_locking: + self._LockInstancesNodes() + + def CheckPrereq(self): + """Check prerequisites. + + """ + pass def Exec(self, feedback_fn): """Computes the list of nodes and their attributes. """ - instance_names = self.wanted - instance_list = [self.cfg.GetInstanceInfo(iname) for iname - in instance_names] + all_info = self.cfg.GetAllInstancesInfo() + if self.do_locking: + instance_names = self.acquired_locks[locking.LEVEL_INSTANCE] + else: + instance_names = all_info.keys() + instance_list = [all_info[iname] for iname in instance_names] # begin data gathering @@ -2575,6 +2671,20 @@ class LUQueryInstances(NoHooksLU): val = disk.size elif field == "vcpus": val = instance.vcpus + elif field == "tags": + val = list(instance.GetTags()) + elif field in ("network_port", "kernel_path", "initrd_path", + "hvm_boot_order", "hvm_acpi", "hvm_pae", + "hvm_cdrom_image_path", "hvm_nic_type", + "hvm_disk_type", "vnc_bind_address"): + val = getattr(instance, field, None) + if val is not None: + pass + elif field in ("hvm_nic_type", "hvm_disk_type", + "kernel_path", "initrd_path"): + val = "default" + else: + val = "-" else: raise errors.ParameterError(field) iout.append(val) @@ -2590,6 +2700,16 @@ class LUFailoverInstance(LogicalUnit): HPATH = "instance-failover" HTYPE = constants.HTYPE_INSTANCE _OP_REQP = ["instance_name", "ignore_consistency"] + REQ_BGL = False + + def ExpandNames(self): + self._ExpandAndLockInstance() + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes() def BuildHooksEnv(self): """Build hooks env. @@ -2610,11 +2730,9 @@ class LUFailoverInstance(LogicalUnit): This checks that the instance is in the cluster. 
""" - instance = self.cfg.GetInstanceInfo( - self.cfg.ExpandInstanceName(self.op.instance_name)) - if instance is None: - raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) + self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert self.instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name if instance.disk_template not in constants.DTS_NET_MIRROR: raise errors.OpPrereqError("Instance's disk layout is not" @@ -2637,8 +2755,6 @@ class LUFailoverInstance(LogicalUnit): " exist on destination node '%s'" % (brlist, target_node)) - self.instance = instance - def Exec(self, feedback_fn): """Failover an instance. @@ -2762,22 +2878,6 @@ def _GenerateUniqueNames(cfg, exts): return results -def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names): - """Generate a drbd device complete with its children. - - """ - port = cfg.AllocatePort() - vgname = cfg.GetVGName() - dev_data = objects.Disk(dev_type=constants.LD_LV, size=size, - logical_id=(vgname, names[0])) - dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128, - logical_id=(vgname, names[1])) - drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size, - logical_id = (primary, secondary, port), - children = [dev_data, dev_meta]) - return drbd_dev - - def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name): """Generate a drbd8 device complete with its children. @@ -3046,7 +3146,7 @@ class LUCreateInstance(LogicalUnit): # set optional parameters to none if they don't exist for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode", "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path", - "vnc_bind_address"]: + "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]: if not hasattr(self.op, attr): setattr(self.op, attr, None) @@ -3261,6 +3361,15 @@ class LUCreateInstance(LogicalUnit): " like a valid IP address" % self.op.vnc_bind_address) + # Xen HVM device type checks + if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31: + if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES: + raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM" + " hypervisor" % self.op.hvm_nic_type) + if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES: + raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM" + " hypervisor" % self.op.hvm_disk_type) + if self.op.start: self.instance_status = 'up' else: @@ -3326,6 +3435,8 @@ class LUCreateInstance(LogicalUnit): hvm_pae=self.op.hvm_pae, hvm_cdrom_image_path=self.op.hvm_cdrom_image_path, vnc_bind_address=self.op.vnc_bind_address, + hvm_nic_type=self.op.hvm_nic_type, + hvm_disk_type=self.op.hvm_disk_type, ) feedback_fn("* creating instance disks...") @@ -3336,6 +3447,8 @@ class LUCreateInstance(LogicalUnit): feedback_fn("adding instance %s to cluster config" % instance) self.cfg.AddInstance(iobj) + # Add the new instance to the Ganeti Lock Manager + self.context.glm.add(locking.LEVEL_INSTANCE, instance) if self.op.wait_for_sync: disk_abort = not _WaitForSync(self.cfg, iobj, self.proc) @@ -3350,6 +3463,8 @@ class LUCreateInstance(LogicalUnit): if disk_abort: _RemoveDisks(iobj, self.cfg) self.cfg.RemoveInstance(iobj.name) + # Remove the new instance from the Ganeti Lock Manager + self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name) raise errors.OpExecError("There are some degraded disks for" " this instance") @@ -3394,6 +3509,10 @@ class LUConnectConsole(NoHooksLU): """ _OP_REQP = ["instance_name"] + REQ_BGL = False + + def 
ExpandNames(self): + self._ExpandAndLockInstance() def CheckPrereq(self): """Check prerequisites. @@ -3401,12 +3520,9 @@ class LUConnectConsole(NoHooksLU): This checks that the instance is in the cluster. """ - instance = self.cfg.GetInstanceInfo( - self.cfg.ExpandInstanceName(self.op.instance_name)) - if instance is None: - raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) - self.instance = instance + self.instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert self.instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name def Exec(self, feedback_fn): """Connect to the console of an instance @@ -3438,6 +3554,38 @@ class LUReplaceDisks(LogicalUnit): HPATH = "mirrors-replace" HTYPE = constants.HTYPE_INSTANCE _OP_REQP = ["instance_name", "mode", "disks"] + REQ_BGL = False + + def ExpandNames(self): + self._ExpandAndLockInstance() + + if not hasattr(self.op, "remote_node"): + self.op.remote_node = None + + ia_name = getattr(self.op, "iallocator", None) + if ia_name is not None: + if self.op.remote_node is not None: + raise errors.OpPrereqError("Give either the iallocator or the new" + " secondary, not both") + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + elif self.op.remote_node is not None: + remote_node = self.cfg.ExpandNodeName(self.op.remote_node) + if remote_node is None: + raise errors.OpPrereqError("Node '%s' not known" % + self.op.remote_node) + self.op.remote_node = remote_node + self.needed_locks[locking.LEVEL_NODE] = [remote_node] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + else: + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + def DeclareLocks(self, level): + # If we're not already locking all nodes in the set we have to declare the + # instance's primary/secondary nodes. + if (level == locking.LEVEL_NODE and + self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET): + self._LockInstancesNodes() def _RunAllocator(self): """Compute a new secondary node using an IAllocator. @@ -3488,16 +3636,10 @@ class LUReplaceDisks(LogicalUnit): This checks that the instance is in the cluster. 
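Note that with the new locking scheme most of the node handling has moved to
ExpandNames: a given remote_node is expanded there and appended to the node
locks (constants.LOCKS_APPEND), while an iallocator request locks all nodes
because the new secondary is not known yet; CheckPrereq keeps only the
instance-level checks and actually runs the allocator.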
""" - if not hasattr(self.op, "remote_node"): - self.op.remote_node = None - - instance = self.cfg.GetInstanceInfo( - self.cfg.ExpandInstanceName(self.op.instance_name)) - if instance is None: - raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) + instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name self.instance = instance - self.op.instance_name = instance.name if instance.disk_template not in constants.DTS_NET_MIRROR: raise errors.OpPrereqError("Instance's disk layout is not" @@ -3512,18 +3654,13 @@ class LUReplaceDisks(LogicalUnit): ia_name = getattr(self.op, "iallocator", None) if ia_name is not None: - if self.op.remote_node is not None: - raise errors.OpPrereqError("Give either the iallocator or the new" - " secondary, not both") - self.op.remote_node = self._RunAllocator() + self._RunAllocator() remote_node = self.op.remote_node if remote_node is not None: - remote_node = self.cfg.ExpandNodeName(remote_node) - if remote_node is None: - raise errors.OpPrereqError("Node '%s' not known" % - self.op.remote_node) self.remote_node_info = self.cfg.GetNodeInfo(remote_node) + assert self.remote_node_info is not None, \ + "Cannot retrieve locked node %s" % remote_node else: self.remote_node_info = None if remote_node == instance.primary_node: @@ -3564,7 +3701,6 @@ class LUReplaceDisks(LogicalUnit): if instance.FindDisk(name) is None: raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" % (name, instance.name)) - self.op.remote_node = remote_node def _ExecD8DiskOnly(self, feedback_fn): """Replace a disk on the primary or secondary for dbrd8. @@ -3910,8 +4046,7 @@ class LUReplaceDisks(LogicalUnit): # Activate the instance disks if we're replacing them on a down instance if instance.status == "down": - op = opcodes.OpActivateInstanceDisks(instance_name=instance.name) - self.proc.ChainOpCode(op) + _StartInstanceDisks(self.cfg, instance, True) if instance.disk_template == constants.DT_DRBD8: if self.op.remote_node is None: @@ -3925,39 +4060,146 @@ class LUReplaceDisks(LogicalUnit): # Deactivate the instance disks if we're replacing them on a down instance if instance.status == "down": - op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name) - self.proc.ChainOpCode(op) + _SafeShutdownInstanceDisks(instance, self.cfg) return ret -class LUQueryInstanceData(NoHooksLU): - """Query runtime instance data. +class LUGrowDisk(LogicalUnit): + """Grow a disk of an instance. """ - _OP_REQP = ["instances"] + HPATH = "disk-grow" + HTYPE = constants.HTYPE_INSTANCE + _OP_REQP = ["instance_name", "disk", "amount"] + REQ_BGL = False + + def ExpandNames(self): + self._ExpandAndLockInstance() + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes() + + def BuildHooksEnv(self): + """Build hooks env. + + This runs on the master, the primary and all the secondaries. + + """ + env = { + "DISK": self.op.disk, + "AMOUNT": self.op.amount, + } + env.update(_BuildInstanceHookEnvByObject(self.instance)) + nl = [ + self.sstore.GetMasterNode(), + self.instance.primary_node, + ] + return env, nl, nl def CheckPrereq(self): """Check prerequisites. - This only checks the optional instance list against the existing names. + This checks that the instance is in the cluster. 
""" + instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name + + self.instance = instance + + if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8): + raise errors.OpPrereqError("Instance's disk layout does not support" + " growing.") + + if instance.FindDisk(self.op.disk) is None: + raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" % + (self.op.disk, instance.name)) + + nodenames = [instance.primary_node] + list(instance.secondary_nodes) + nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName()) + for node in nodenames: + info = nodeinfo.get(node, None) + if not info: + raise errors.OpPrereqError("Cannot get current information" + " from node '%s'" % node) + vg_free = info.get('vg_free', None) + if not isinstance(vg_free, int): + raise errors.OpPrereqError("Can't compute free disk space on" + " node %s" % node) + if self.op.amount > info['vg_free']: + raise errors.OpPrereqError("Not enough disk space on target node %s:" + " %d MiB available, %d MiB required" % + (node, info['vg_free'], self.op.amount)) + + def Exec(self, feedback_fn): + """Execute disk grow. + + """ + instance = self.instance + disk = instance.FindDisk(self.op.disk) + for node in (instance.secondary_nodes + (instance.primary_node,)): + self.cfg.SetDiskID(disk, node) + result = rpc.call_blockdev_grow(node, disk, self.op.amount) + if not result or not isinstance(result, (list, tuple)) or len(result) != 2: + raise errors.OpExecError("grow request failed to node %s" % node) + elif not result[0]: + raise errors.OpExecError("grow request failed to node %s: %s" % + (node, result[1])) + disk.RecordGrow(self.op.amount) + self.cfg.Update(instance) + return + + +class LUQueryInstanceData(NoHooksLU): + """Query runtime instance data. + + """ + _OP_REQP = ["instances"] + REQ_BGL = False + def ExpandNames(self): + self.needed_locks = {} + self.share_locks = dict(((i, 1) for i in locking.LEVELS)) + if not isinstance(self.op.instances, list): raise errors.OpPrereqError("Invalid argument type 'instances'") + if self.op.instances: - self.wanted_instances = [] - names = self.op.instances - for name in names: - instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name)) - if instance is None: - raise errors.OpPrereqError("No such instance name '%s'" % name) - self.wanted_instances.append(instance) + self.wanted_names = [] + for name in self.op.instances: + full_name = self.cfg.ExpandInstanceName(name) + if full_name is None: + raise errors.OpPrereqError("Instance '%s' not known" % + self.op.instance_name) + self.wanted_names.append(full_name) + self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names else: - self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name - in self.cfg.GetInstanceList()] - return + self.wanted_names = None + self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET + + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes() + def CheckPrereq(self): + """Check prerequisites. + + This only checks the optional instance list against the existing names. 
+ + """ + if self.wanted_names is None: + self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE] + + self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name + in self.wanted_names] + return def _ComputeDiskStatus(self, instance, snode, dev): """Compute block device status. @@ -4037,9 +4279,28 @@ class LUQueryInstanceData(NoHooksLU): idict["hvm_acpi"] = instance.hvm_acpi idict["hvm_pae"] = instance.hvm_pae idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path + idict["hvm_nic_type"] = instance.hvm_nic_type + idict["hvm_disk_type"] = instance.hvm_disk_type if htkind in constants.HTS_REQ_PORT: - idict["vnc_bind_address"] = instance.vnc_bind_address + if instance.vnc_bind_address is None: + vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS + else: + vnc_bind_address = instance.vnc_bind_address + if instance.network_port is None: + vnc_console_port = None + elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL: + vnc_console_port = "%s:%s" % (instance.primary_node, + instance.network_port) + elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS: + vnc_console_port = "%s:%s on node %s" % (vnc_bind_address, + instance.network_port, + instance.primary_node) + else: + vnc_console_port = "%s:%s" % (instance.vnc_bind_address, + instance.network_port) + idict["vnc_console_port"] = vnc_console_port + idict["vnc_bind_address"] = vnc_bind_address idict["network_port"] = instance.network_port result[instance.name] = idict @@ -4054,6 +4315,10 @@ class LUSetInstanceParams(LogicalUnit): HPATH = "instance-modify" HTYPE = constants.HTYPE_INSTANCE _OP_REQP = ["instance_name"] + REQ_BGL = False + + def ExpandNames(self): + self._ExpandAndLockInstance() def BuildHooksEnv(self): """Build hooks env. @@ -4091,6 +4356,9 @@ class LUSetInstanceParams(LogicalUnit): This only checks the instance list against the existing names. """ + # FIXME: all the parameters could be checked before, in ExpandNames, or in + # a separate CheckArguments function, if we implement one, so the operation + # can be aborted without waiting for any lock, should it have an error... 
self.mem = getattr(self.op, "mem", None) self.vcpus = getattr(self.op, "vcpus", None) self.ip = getattr(self.op, "ip", None) @@ -4101,12 +4369,15 @@ class LUSetInstanceParams(LogicalUnit): self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None) self.hvm_acpi = getattr(self.op, "hvm_acpi", None) self.hvm_pae = getattr(self.op, "hvm_pae", None) + self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None) + self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None) self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None) self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None) + self.force = getattr(self.op, "force", None) all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac, self.kernel_path, self.initrd_path, self.hvm_boot_order, self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path, - self.vnc_bind_address] + self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type] if all_parms.count(None) == len(all_parms): raise errors.OpPrereqError("No changes submitted") if self.mem is not None: @@ -4168,11 +4439,13 @@ class LUSetInstanceParams(LogicalUnit): # hvm_cdrom_image_path verification if self.op.hvm_cdrom_image_path is not None: - if not os.path.isabs(self.op.hvm_cdrom_image_path): + if not (os.path.isabs(self.op.hvm_cdrom_image_path) or + self.op.hvm_cdrom_image_path.lower() == "none"): raise errors.OpPrereqError("The path to the HVM CDROM image must" " be an absolute path or None, not %s" % self.op.hvm_cdrom_image_path) - if not os.path.isfile(self.op.hvm_cdrom_image_path): + if not (os.path.isfile(self.op.hvm_cdrom_image_path) or + self.op.hvm_cdrom_image_path.lower() == "none"): raise errors.OpPrereqError("The HVM CDROM image must either be a" " regular file or a symlink pointing to" " an existing regular file, not %s" % @@ -4185,13 +4458,52 @@ class LUSetInstanceParams(LogicalUnit): " like a valid IP address" % self.op.vnc_bind_address) - instance = self.cfg.GetInstanceInfo( - self.cfg.ExpandInstanceName(self.op.instance_name)) - if instance is None: - raise errors.OpPrereqError("No such instance name '%s'" % - self.op.instance_name) - self.op.instance_name = instance.name - self.instance = instance + instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert self.instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name + self.warn = [] + if self.mem is not None and not self.force: + pnode = self.instance.primary_node + nodelist = [pnode] + nodelist.extend(instance.secondary_nodes) + instance_info = rpc.call_instance_info(pnode, instance.name) + nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName()) + + if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict): + # Assume the primary node is unreachable and go ahead + self.warn.append("Can't get info from primary node %s" % pnode) + else: + if instance_info: + current_mem = instance_info['memory'] + else: + # Assume instance not running + # (there is a slight race condition here, but it's not very probable, + # and we have no other way to check) + current_mem = 0 + miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free'] + if miss_mem > 0: + raise errors.OpPrereqError("This change will prevent the instance" + " from starting, due to %d MB of memory" + " missing on its primary node" % miss_mem) + + for node in instance.secondary_nodes: + if node not in nodeinfo or not isinstance(nodeinfo[node], dict): + self.warn.append("Can't get info from secondary node %s" % node) + elif self.mem > 
@@ -4199,6 +4511,11 @@ class LUSetInstanceParams(LogicalUnit):
     All parameters take effect only at the next restart of the instance.
 
     """
+    # Process here the warnings from CheckPrereq, as we don't have a
+    # feedback_fn there.
+    for warn in self.warn:
+      feedback_fn("WARNING: %s" % warn)
+
     result = []
     instance = self.instance
     if self.mem:
@@ -4228,20 +4545,29 @@ class LUSetInstanceParams(LogicalUnit):
       else:
         instance.hvm_boot_order = self.hvm_boot_order
       result.append(("hvm_boot_order", self.hvm_boot_order))
-    if self.hvm_acpi:
+    if self.hvm_acpi is not None:
       instance.hvm_acpi = self.hvm_acpi
       result.append(("hvm_acpi", self.hvm_acpi))
-    if self.hvm_pae:
+    if self.hvm_pae is not None:
       instance.hvm_pae = self.hvm_pae
       result.append(("hvm_pae", self.hvm_pae))
+    if self.hvm_nic_type is not None:
+      instance.hvm_nic_type = self.hvm_nic_type
+      result.append(("hvm_nic_type", self.hvm_nic_type))
+    if self.hvm_disk_type is not None:
+      instance.hvm_disk_type = self.hvm_disk_type
+      result.append(("hvm_disk_type", self.hvm_disk_type))
     if self.hvm_cdrom_image_path:
-      instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
+      if self.hvm_cdrom_image_path == constants.VALUE_NONE:
+        instance.hvm_cdrom_image_path = None
+      else:
+        instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
     if self.vnc_bind_address:
       instance.vnc_bind_address = self.vnc_bind_address
       result.append(("vnc_bind_address", self.vnc_bind_address))
 
-    self.cfg.AddInstance(instance)
+    self.cfg.Update(instance)
 
     return result
 
@@ -4250,13 +4576,23 @@ class LUQueryExports(NoHooksLU):
   """Query the exports list
 
   """
-  _OP_REQP = []
+  _OP_REQP = ['nodes']
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self.needed_locks = {}
+    self.share_locks[locking.LEVEL_NODE] = 1
+    if not self.op.nodes:
+      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+    else:
+      self.needed_locks[locking.LEVEL_NODE] = \
+        _GetWantedNodes(self, self.op.nodes)
 
   def CheckPrereq(self):
-    """Check that the nodelist contains only existing nodes.
+    """Check prerequisites.
 
     """
-    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
+    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
 
   def Exec(self, feedback_fn):
     """Compute the list of all the exported system images.
@@ -4277,6 +4613,23 @@ class LUExportInstance(LogicalUnit):
   HPATH = "instance-export"
   HTYPE = constants.HTYPE_INSTANCE
   _OP_REQP = ["instance_name", "target_node", "shutdown"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    # FIXME: lock only instance primary and destination node
+    #
+    # Sad but true, for now we have to lock all nodes, as we don't know where
+    # the previous export might be, and in this LU we search for it and
+    # remove it from its current node. In the future we could fix this by:
+    #  - making a tasklet to search (share-lock all), then create the new one,
+    #    then one to remove, after
+    #  - removing the removal operation altogether
+    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+
+  def DeclareLocks(self, level):
+    """Last minute lock declaration."""
+    # All nodes are locked anyway, so nothing to do here.
 
   def BuildHooksEnv(self):
     """Build hooks env.
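
The FIXME above outlines a possible future refactoring. Purely as an illustration of that idea (the helper below is invented and does not exist in this patch), the search step could run with shared node locks, since it only reads the per-node export lists, and only the node that actually holds a stale export would later need an exclusive lock:

    # Hypothetical sketch of the "search" step described in the FIXME above.
    def _FindStaleExport(lu, instance_name):
      # Read-only: shared node locks would be sufficient for this step.
      exportlist = rpc.call_export_list(lu.acquired_locks[locking.LEVEL_NODE])
      for node in exportlist:
        if instance_name in exportlist[node]:
          return node        # only this node needs an exclusive lock later
      return None
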
@@ -4299,20 +4652,16 @@ class LUExportInstance(LogicalUnit):
     This checks that the instance and node names are valid.
 
     """
-    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
+    instance_name = self.op.instance_name
     self.instance = self.cfg.GetInstanceInfo(instance_name)
-    if self.instance is None:
-      raise errors.OpPrereqError("Instance '%s' not found" %
-                                 self.op.instance_name)
+    assert self.instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
 
-    # node verification
-    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
-    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
+    self.dst_node = self.cfg.GetNodeInfo(
+      self.cfg.ExpandNodeName(self.op.target_node))
 
-    if self.dst_node is None:
-      raise errors.OpPrereqError("Destination node '%s' is unknown." %
-                                 self.op.target_node)
-    self.op.target_node = self.dst_node.name
+    assert self.dst_node is not None, \
+      "Cannot retrieve locked node %s" % self.op.target_node
 
     # instance disk type verification
     for disk in self.instance.disks:
@@ -4330,8 +4679,8 @@ class LUExportInstance(LogicalUnit):
     if self.op.shutdown:
       # shutdown the instance, but not the disks
       if not rpc.call_instance_shutdown(src_node, instance):
-        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
-                                 (instance.name, src_node))
+        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
+                                 (instance.name, src_node))
 
     vgname = self.cfg.GetVGName()
 
@@ -4380,8 +4729,7 @@ class LUExportInstance(LogicalUnit):
     # if we proceed the backup would be removed because OpQueryExports
     # substitutes an empty list with the full cluster node list.
     if nodelist:
-      op = opcodes.OpQueryExports(nodes=nodelist)
-      exportlist = self.proc.ChainOpCode(op)
+      exportlist = rpc.call_export_list(nodelist)
       for node in exportlist:
         if instance.name in exportlist[node]:
           if not rpc.call_export_remove(node, instance.name):
@@ -4412,8 +4760,7 @@ class LURemoveExport(NoHooksLU):
       fqdn_warn = True
       instance_name = self.op.instance_name
 
-    op = opcodes.OpQueryExports(nodes=[])
-    exportlist = self.proc.ChainOpCode(op)
+    exportlist = rpc.call_export_list(self.cfg.GetNodeList())
     found = False
     for node in exportlist:
       if instance_name in exportlist[node]:
@@ -4469,7 +4816,7 @@ class LUGetTags(TagsLU):
     """Returns the tag list.
 
    """
-    return self.target.GetTags()
+    return list(self.target.GetTags())
 
 
 class LUSearchTags(NoHooksLU):
@@ -4578,25 +4925,35 @@ class LUDelTags(TagsLU):
                                   " config file and the operation has been"
                                   " aborted. Please retry.")
 
+
 class LUTestDelay(NoHooksLU):
   """Sleep for a specified amount of time.
 
-  This LU sleeps on the master and/or nodes for a specified amoutn of
+  This LU sleeps on the master and/or nodes for a specified amount of
   time.
 
   """
   _OP_REQP = ["duration", "on_master", "on_nodes"]
+  REQ_BGL = False
 
-  def CheckPrereq(self):
-    """Check prerequisites.
+  def ExpandNames(self):
+    """Expand names and set required locks.
 
-    This checks that we have a good list of nodes and/or the duration
-    is valid.
+    This expands the node list, if any.
""" - + self.needed_locks = {} if self.op.on_nodes: + # _GetWantedNodes can be used here, but is not always appropriate to use + # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for + # more information. self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes) + self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes + + def CheckPrereq(self): + """Check prerequisites. + + """ def Exec(self, feedback_fn): """Do the actual sleep. @@ -4846,7 +5203,7 @@ class IAllocator(object): result = call_fn(self.sstore.GetMasterNode(), name, self.in_text) - if not isinstance(result, tuple) or len(result) != 4: + if not isinstance(result, (list, tuple)) or len(result) != 4: raise errors.OpExecError("Invalid result from master iallocator runner") rcode, stdout, stderr, fail = result @@ -4854,9 +5211,8 @@ class IAllocator(object): if rcode == constants.IARUN_NOTFOUND: raise errors.OpExecError("Can't find allocator '%s'" % name) elif rcode == constants.IARUN_FAILURE: - raise errors.OpExecError("Instance allocator call failed: %s," - " output: %s" % - (fail, stdout+stderr)) + raise errors.OpExecError("Instance allocator call failed: %s," + " output: %s" % (fail, stdout+stderr)) self.out_text = stdout if validate: self._ValidateResult()