diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 45a0bfb..596b0de 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2008 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -42,6 +42,7 @@ from ganeti import constants
 from ganeti import objects
 from ganeti import opcodes
 from ganeti import ssconf
+from ganeti import serializer
 
 
 class LogicalUnit(object):
@@ -53,15 +54,18 @@ class LogicalUnit(object):
     - implement Exec
     - implement BuildHooksEnv
     - redefine HPATH and HTYPE
-    - optionally redefine their run requirements (REQ_CLUSTER,
-      REQ_MASTER); note that all commands require root permissions
+    - optionally redefine their run requirements:
+        REQ_MASTER: the LU needs to run on the master node
+        REQ_WSSTORE: the LU needs a writable SimpleStore
+
+  Note that all commands require root permissions.
 
   """
   HPATH = None
   HTYPE = None
   _OP_REQP = []
-  REQ_CLUSTER = True
   REQ_MASTER = True
+  REQ_WSSTORE = False
 
   def __init__(self, processor, op, cfg, sstore):
     """Constructor for LogicalUnit.
@@ -81,15 +85,15 @@ class LogicalUnit(object):
       if attr_val is None:
         raise errors.OpPrereqError("Required parameter '%s' missing" %
                                    attr_name)
-    if self.REQ_CLUSTER:
-      if not cfg.IsCluster():
-        raise errors.OpPrereqError("Cluster not initialized yet,"
-                                   " use 'gnt-cluster init' first.")
-      if self.REQ_MASTER:
-        master = sstore.GetMasterNode()
-        if master != utils.HostInfo().name:
-          raise errors.OpPrereqError("Commands must be run on the master"
-                                     " node %s" % master)
+
+    if not cfg.IsCluster():
+      raise errors.OpPrereqError("Cluster not initialized yet,"
+                                 " use 'gnt-cluster init' first.")
+    if self.REQ_MASTER:
+      master = sstore.GetMasterNode()
+      if master != utils.HostInfo().name:
+        raise errors.OpPrereqError("Commands must be run on the master"
+                                   " node %s" % master)
 
   def __GetSSH(self):
     """Returns the SshRunner object
@@ -144,11 +148,7 @@ class LogicalUnit(object):
     added by the hooks runner. If the LU doesn't define any environment,
     an empty dict (and not None) should be returned.
 
-    As for the node lists, the master should not be included in the
-    them, as it will be added by the hooks runner in case this LU
-    requires a cluster to run on (otherwise we don't have a node
-    list). No nodes should be returned as an empty list (and not
-    None).
+    Empty node lists should be returned as empty lists (and not None).
 
     Note that if the HPATH for a LU class is None, this function will not
     be called.
@@ -156,6 +156,24 @@
     """
     raise NotImplementedError
 
+  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
+    """Notify the LU about the results of its hooks.
+
+    This method is called every time a hooks phase is executed, and notifies
+    the Logical Unit about the hooks' result. The LU can then use it to alter
+    its result based on the hooks. By default the method does nothing and the
+    previous result is passed back unchanged, but any LU can redefine it if it
+    wants to use the local cluster hook-scripts somehow.
+ + Args: + phase: the hooks phase that has just been run + hooks_results: the results of the multi-node hooks rpc call + feedback_fn: function to send feedback back to the caller + lu_result: the previous result this LU had, or None in the PRE phase. + + """ + return lu_result + class NoHooksLU(LogicalUnit): """Simple LU which runs no hooks. @@ -167,31 +185,6 @@ class NoHooksLU(LogicalUnit): HPATH = None HTYPE = None - def BuildHooksEnv(self): - """Build hooks env. - - This is a no-op, since we don't run hooks. - - """ - return {}, [], [] - - -def _AddHostToEtcHosts(hostname): - """Wrapper around utils.SetEtcHostsEntry. - - """ - hi = utils.HostInfo(name=hostname) - utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()]) - - -def _RemoveHostFromEtcHosts(hostname): - """Wrapper around utils.RemoveEtcHostsEntry. - - """ - hi = utils.HostInfo(name=hostname) - utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name) - utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName()) - def _GetWantedNodes(lu, nodes): """Returns list of checked and expanded node names. @@ -316,85 +309,6 @@ def _BuildInstanceHookEnvByObject(instance, override=None): return _BuildInstanceHookEnv(**args) -def _HasValidVG(vglist, vgname): - """Checks if the volume group list is valid. - - A non-None return value means there's an error, and the return value - is the error message. - - """ - vgsize = vglist.get(vgname, None) - if vgsize is None: - return "volume group '%s' missing" % vgname - elif vgsize < 20480: - return ("volume group '%s' too small (20480MiB required, %dMib found)" % - (vgname, vgsize)) - return None - - -def _InitSSHSetup(node): - """Setup the SSH configuration for the cluster. - - - This generates a dsa keypair for root, adds the pub key to the - permitted hosts and adds the hostkey to its own known hosts. - - Args: - node: the name of this host as a fqdn - - """ - priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS) - - for name in priv_key, pub_key: - if os.path.exists(name): - utils.CreateBackup(name) - utils.RemoveFile(name) - - result = utils.RunCmd(["ssh-keygen", "-t", "dsa", - "-f", priv_key, - "-q", "-N", ""]) - if result.failed: - raise errors.OpExecError("Could not generate ssh keypair, error %s" % - result.output) - - f = open(pub_key, 'r') - try: - utils.AddAuthorizedKey(auth_keys, f.read(8192)) - finally: - f.close() - - -def _InitGanetiServerSetup(ss): - """Setup the necessary configuration for the initial node daemon. - - This creates the nodepass file containing the shared password for - the cluster and also generates the SSL certificate. 
- - """ - # Create pseudo random password - randpass = sha.new(os.urandom(64)).hexdigest() - # and write it into sstore - ss.SetKey(ss.SS_NODED_PASS, randpass) - - result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024", - "-days", str(365*5), "-nodes", "-x509", - "-keyout", constants.SSL_CERT_FILE, - "-out", constants.SSL_CERT_FILE, "-batch"]) - if result.failed: - raise errors.OpExecError("could not generate server ssl cert, command" - " %s had exitcode %s and error message %s" % - (result.cmd, result.exit_code, result.output)) - - os.chmod(constants.SSL_CERT_FILE, 0400) - - result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"]) - - if result.failed: - raise errors.OpExecError("Could not start the node daemon, command %s" - " had exitcode %s and error %s" % - (result.cmd, result.exit_code, result.output)) - - def _CheckInstanceBridgesExist(instance): """Check that the brigdes needed by an instance exist. @@ -407,160 +321,6 @@ def _CheckInstanceBridgesExist(instance): (brlist, instance.primary_node)) -class LUInitCluster(LogicalUnit): - """Initialise the cluster. - - """ - HPATH = "cluster-init" - HTYPE = constants.HTYPE_CLUSTER - _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix", - "def_bridge", "master_netdev", "file_storage_dir"] - REQ_CLUSTER = False - - def BuildHooksEnv(self): - """Build hooks env. - - Notes: Since we don't require a cluster, we must manually add - ourselves in the post-run node list. - - """ - env = {"OP_TARGET": self.op.cluster_name} - return env, [], [self.hostname.name] - - def CheckPrereq(self): - """Verify that the passed name is a valid one. - - """ - if config.ConfigWriter.IsCluster(): - raise errors.OpPrereqError("Cluster is already initialised") - - if self.op.hypervisor_type == constants.HT_XEN_HVM31: - if not os.path.exists(constants.VNC_PASSWORD_FILE): - raise errors.OpPrereqError("Please prepare the cluster VNC" - "password file %s" % - constants.VNC_PASSWORD_FILE) - - self.hostname = hostname = utils.HostInfo() - - if hostname.ip.startswith("127."): - raise errors.OpPrereqError("This host's IP resolves to the private" - " range (%s). Please fix DNS or %s." % - (hostname.ip, constants.ETC_HOSTS)) - - if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT, - source=constants.LOCALHOST_IP_ADDRESS): - raise errors.OpPrereqError("Inconsistency: this host's name resolves" - " to %s,\nbut this ip address does not" - " belong to this host." - " Aborting." % hostname.ip) - - self.clustername = clustername = utils.HostInfo(self.op.cluster_name) - - if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT, - timeout=5): - raise errors.OpPrereqError("Cluster IP already active. Aborting.") - - secondary_ip = getattr(self.op, "secondary_ip", None) - if secondary_ip and not utils.IsValidIP(secondary_ip): - raise errors.OpPrereqError("Invalid secondary ip given") - if (secondary_ip and - secondary_ip != hostname.ip and - (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT, - source=constants.LOCALHOST_IP_ADDRESS))): - raise errors.OpPrereqError("You gave %s as secondary IP," - " but it does not belong to this host." 
% - secondary_ip) - self.secondary_ip = secondary_ip - - if not hasattr(self.op, "vg_name"): - self.op.vg_name = None - # if vg_name not None, checks if volume group is valid - if self.op.vg_name: - vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name) - if vgstatus: - raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if" - " you are not using lvm" % vgstatus) - - self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir) - - if not os.path.isabs(self.op.file_storage_dir): - raise errors.OpPrereqError("The file storage directory you have is" - " not an absolute path.") - - if not os.path.exists(self.op.file_storage_dir): - try: - os.makedirs(self.op.file_storage_dir, 0750) - except OSError, err: - raise errors.OpPrereqError("Cannot create file storage directory" - " '%s': %s" % - (self.op.file_storage_dir, err)) - - if not os.path.isdir(self.op.file_storage_dir): - raise errors.OpPrereqError("The file storage directory '%s' is not" - " a directory." % self.op.file_storage_dir) - - if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$", - self.op.mac_prefix): - raise errors.OpPrereqError("Invalid mac prefix given '%s'" % - self.op.mac_prefix) - - if self.op.hypervisor_type not in constants.HYPER_TYPES: - raise errors.OpPrereqError("Invalid hypervisor type given '%s'" % - self.op.hypervisor_type) - - result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev]) - if result.failed: - raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" % - (self.op.master_netdev, - result.output.strip())) - - if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and - os.access(constants.NODE_INITD_SCRIPT, os.X_OK)): - raise errors.OpPrereqError("Init.d script '%s' missing or not" - " executable." % constants.NODE_INITD_SCRIPT) - - def Exec(self, feedback_fn): - """Initialize the cluster. - - """ - clustername = self.clustername - hostname = self.hostname - - # set up the simple store - self.sstore = ss = ssconf.SimpleStore() - ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type) - ss.SetKey(ss.SS_MASTER_NODE, hostname.name) - ss.SetKey(ss.SS_MASTER_IP, clustername.ip) - ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev) - ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name) - ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir) - - # set up the inter-node password and certificate - _InitGanetiServerSetup(ss) - - # start the master ip - rpc.call_node_start_master(hostname.name) - - # set up ssh config and /etc/hosts - f = open(constants.SSH_HOST_RSA_PUB, 'r') - try: - sshline = f.read() - finally: - f.close() - sshkey = sshline.split(" ")[1] - - _AddHostToEtcHosts(hostname.name) - _InitSSHSetup(hostname.name) - - # init of cluster config file - self.cfg = cfgw = config.ConfigWriter() - cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip, - sshkey, self.op.mac_prefix, - self.op.vg_name, self.op.def_bridge) - - ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE) - - class LUDestroyCluster(NoHooksLU): """Logical unit for destroying the cluster. @@ -599,10 +359,12 @@ class LUDestroyCluster(NoHooksLU): rpc.call_node_leave_cluster(master) -class LUVerifyCluster(NoHooksLU): +class LUVerifyCluster(LogicalUnit): """Verifies the cluster status. 
""" + HPATH = "cluster-verify" + HTYPE = constants.HTYPE_CLUSTER _OP_REQP = ["skip_checks"] def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result, @@ -640,7 +402,8 @@ class LUVerifyCluster(NoHooksLU): (node,)) bad = True else: - vgstatus = _HasValidVG(vglist, self.cfg.GetVGName()) + vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(), + constants.MIN_VG_SIZE) if vgstatus: feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node)) bad = True @@ -663,13 +426,24 @@ class LUVerifyCluster(NoHooksLU): if 'nodelist' not in node_result: bad = True - feedback_fn(" - ERROR: node hasn't returned node connectivity data") + feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data") else: if node_result['nodelist']: bad = True for node in node_result['nodelist']: - feedback_fn(" - ERROR: communication with node '%s': %s" % + feedback_fn(" - ERROR: ssh communication with node '%s': %s" % (node, node_result['nodelist'][node])) + if 'node-net-test' not in node_result: + bad = True + feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data") + else: + if node_result['node-net-test']: + bad = True + nlist = utils.NiceSort(node_result['node-net-test'].keys()) + for node in nlist: + feedback_fn(" - ERROR: tcp communication with node '%s': %s" % + (node, node_result['node-net-test'][node])) + hyp_result = node_result.get('hypervisor', None) if hyp_result is not None: feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result) @@ -784,6 +558,18 @@ class LUVerifyCluster(NoHooksLU): if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set): raise errors.OpPrereqError("Invalid checks to be skipped specified") + def BuildHooksEnv(self): + """Build hooks env. + + Cluster-Verify hooks just rone in the post phase and their failure makes + the output be logged in the verify output and the verification to fail. + + """ + all_nodes = self.cfg.GetNodeList() + # TODO: populate the environment with useful information for verify hooks + env = {} + return env, [], all_nodes + def Exec(self, feedback_fn): """Verify integrity of cluster, performing various test on nodes. @@ -795,6 +581,7 @@ class LUVerifyCluster(NoHooksLU): vg_name = self.cfg.GetVGName() nodelist = utils.NiceSort(self.cfg.GetNodeList()) + nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist] instancelist = utils.NiceSort(self.cfg.GetInstanceList()) i_non_redundant = [] # Non redundant instances node_volume = {} @@ -817,6 +604,8 @@ class LUVerifyCluster(NoHooksLU): 'filelist': file_names, 'nodelist': nodelist, 'hypervisor': None, + 'node-net-test': [(node.name, node.primary_ip, node.secondary_ip) + for node in nodeinfo] } all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param) all_rversion = rpc.call_version(nodelist) @@ -943,6 +732,47 @@ class LUVerifyCluster(NoHooksLU): return int(bad) + def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result): + """Analize the post-hooks' result, handle it, and send some + nicely-formatted feedback back to the user. 
+ + Args: + phase: the hooks phase that has just been run + hooks_results: the results of the multi-node hooks rpc call + feedback_fn: function to send feedback back to the caller + lu_result: previous Exec result + + """ + # We only really run POST phase hooks, and are only interested in their results + if phase == constants.HOOKS_PHASE_POST: + # Used to change hooks' output to proper indentation + indent_re = re.compile('^', re.M) + feedback_fn("* Hooks Results") + if not hooks_results: + feedback_fn(" - ERROR: general communication failure") + lu_result = 1 + else: + for node_name in hooks_results: + show_node_header = True + res = hooks_results[node_name] + if res is False or not isinstance(res, list): + feedback_fn(" Communication failure") + lu_result = 1 + continue + for script, hkr, output in res: + if hkr == constants.HKR_FAIL: + # The node header is only shown once, if there are + # failing hooks on that node + if show_node_header: + feedback_fn(" Node %s:" % node_name) + show_node_header = False + feedback_fn(" ERROR: Script %s failed, output:" % script) + output = indent_re.sub(' ', output) + feedback_fn("%s" % output) + lu_result = 1 + + return lu_result + class LUVerifyDisks(NoHooksLU): """Verifies the cluster disks status. @@ -1023,6 +853,7 @@ class LURenameCluster(LogicalUnit): HPATH = "cluster-rename" HTYPE = constants.HTYPE_CLUSTER _OP_REQP = ["name"] + REQ_WSSTORE = True def BuildHooksEnv(self): """Build hooks env. @@ -1152,7 +983,8 @@ class LUSetClusterParams(LogicalUnit): node_list = self.cfg.GetNodeList() vglist = rpc.call_vg_list(node_list) for node in node_list: - vgstatus = _HasValidVG(vglist[node], self.op.vg_name) + vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name, + constants.MIN_VG_SIZE) if vgstatus: raise errors.OpPrereqError("Error on node '%s': %s" % (node, vgstatus)) @@ -1220,12 +1052,14 @@ def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False): break if unlock: - utils.Unlock('cmd') + #utils.Unlock('cmd') + pass try: time.sleep(min(60, max_time)) finally: if unlock: - utils.Lock('cmd') + #utils.Lock('cmd') + pass if done: proc.LogInfo("Instance %s's disks are in sync." % instance.name) @@ -1302,14 +1136,14 @@ class LUDiagnoseOS(NoHooksLU): for node_name, nr in rlist.iteritems(): if not nr: continue - for os in nr: - if os.name not in all_os: + for os_obj in nr: + if os_obj.name not in all_os: # build a list of nodes for this os containing empty lists # for each node in node_list - all_os[os.name] = {} + all_os[os_obj.name] = {} for nname in node_list: - all_os[os.name][nname] = [] - all_os[os.name][node_name].append(os) + all_os[os_obj.name][nname] = [] + all_os[os_obj.name][node_name].append(os_obj) return all_os def Exec(self, feedback_fn): @@ -1413,7 +1247,7 @@ class LURemoveNode(LogicalUnit): self.cfg.RemoveNode(node.name) - _RemoveHostFromEtcHosts(node.name) + utils.RemoveHostFromEtcHosts(node.name) class LUQueryNodes(NoHooksLU): @@ -1428,9 +1262,12 @@ class LUQueryNodes(NoHooksLU): This checks that the fields required are valid output fields. 
""" - self.dynamic_fields = frozenset(["dtotal", "dfree", - "mtotal", "mnode", "mfree", - "bootid"]) + self.dynamic_fields = frozenset([ + "dtotal", "dfree", + "mtotal", "mnode", "mfree", + "bootid", + "ctotal", + ]) _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt", "pinst_list", "sinst_list", @@ -1461,6 +1298,7 @@ class LUQueryNodes(NoHooksLU): "mfree": utils.TryConvert(int, nodeinfo['memory_free']), "dtotal": utils.TryConvert(int, nodeinfo['vg_size']), "dfree": utils.TryConvert(int, nodeinfo['vg_free']), + "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']), "bootid": nodeinfo['bootid'], } else: @@ -1632,13 +1470,24 @@ class LUAddNode(LogicalUnit): if not utils.IsValidIP(secondary_ip): raise errors.OpPrereqError("Invalid secondary IP given") self.op.secondary_ip = secondary_ip + node_list = cfg.GetNodeList() - if node in node_list: - raise errors.OpPrereqError("Node %s is already in the configuration" - % node) + if not self.op.readd and node in node_list: + raise errors.OpPrereqError("Node %s is already in the configuration" % + node) + elif self.op.readd and node not in node_list: + raise errors.OpPrereqError("Node %s is not in the configuration" % node) for existing_node_name in node_list: existing_node = cfg.GetNodeInfo(existing_node_name) + + if self.op.readd and node == existing_node_name: + if (existing_node.primary_ip != primary_ip or + existing_node.secondary_ip != secondary_ip): + raise errors.OpPrereqError("Readded node doesn't have the same IP" + " address configuration as before") + continue + if (existing_node.primary_ip == primary_ip or existing_node.secondary_ip == primary_ip or existing_node.primary_ip == secondary_ip or @@ -1760,7 +1609,7 @@ class LUAddNode(LogicalUnit): raise errors.OpExecError("Cannot transfer ssh keys to the new node") # Add node to our /etc/hosts, and add key to known_hosts - _AddHostToEtcHosts(new_node.name) + utils.AddHostToEtcHosts(new_node.name) if new_node.secondary_ip != new_node.primary_ip: if not rpc.call_node_tcp_ping(new_node.name, @@ -1782,7 +1631,9 @@ class LUAddNode(LogicalUnit): # Distribute updated /etc/hosts and known_hosts to all nodes, # including the node just added myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode()) - dist_nodes = self.cfg.GetNodeList() + [node] + dist_nodes = self.cfg.GetNodeList() + if not self.op.readd: + dist_nodes.append(node) if myself.name in dist_nodes: dist_nodes.remove(myself.name) @@ -1801,8 +1652,9 @@ class LUAddNode(LogicalUnit): if not self.ssh.CopyFileToNode(node, fname): logger.Error("could not copy file %s to node %s" % (fname, node)) - logger.Info("adding node %s to cluster.conf" % node) - self.cfg.AddNode(new_node) + if not self.op.readd: + logger.Info("adding node %s to cluster.conf" % node) + self.cfg.AddNode(new_node) class LUMasterFailover(LogicalUnit): @@ -1814,6 +1666,7 @@ class LUMasterFailover(LogicalUnit): HPATH = "master-failover" HTYPE = constants.HTYPE_CLUSTER REQ_MASTER = False + REQ_WSSTORE = True _OP_REQP = [] def BuildHooksEnv(self): @@ -1902,6 +1755,7 @@ class LUQueryClusterInfo(NoHooksLU): "export_version": constants.EXPORT_VERSION, "master": self.sstore.GetMasterNode(), "architecture": (platform.architecture()[0], platform.machine()), + "hypervisor_type": self.sstore.GetHypervisorType(), } return result @@ -2503,7 +2357,7 @@ class LURenameInstance(LogicalUnit): instance_list = self.cfg.GetInstanceList() if new_name in instance_list: raise errors.OpPrereqError("Instance '%s' is already in the cluster" % - instance_name) + new_name) if not getattr(self.op, 
"ignore_ip", False): command = ["fping", "-q", name_info.ip] @@ -2565,7 +2419,7 @@ class LURemoveInstance(LogicalUnit): """ HPATH = "instance-remove" HTYPE = constants.HTYPE_INSTANCE - _OP_REQP = ["instance_name"] + _OP_REQP = ["instance_name", "ignore_failures"] def BuildHooksEnv(self): """Build hooks env. @@ -2775,7 +2629,7 @@ class LUFailoverInstance(LogicalUnit): secondary_nodes = instance.secondary_nodes if not secondary_nodes: raise errors.ProgrammerError("no secondary node but using " - "DT_REMOTE_RAID1 template") + "a mirrored disk template") target_node = secondary_nodes[0] # check memory requirements on the secondary node @@ -2805,7 +2659,7 @@ class LUFailoverInstance(LogicalUnit): feedback_fn("* checking disk consistency between source and target") for dev in instance.disks: - # for remote_raid1, these are md over drbd + # for drbd, these are drbd over lvm if not _CheckDiskConsistency(self.cfg, dev, target_node, False): if instance.status == "up" and not self.op.ignore_consistency: raise errors.OpExecError("Disk %s is degraded on target node," @@ -2830,7 +2684,7 @@ class LUFailoverInstance(LogicalUnit): instance.primary_node = target_node # distribute new instance config to the other nodes - self.cfg.AddInstance(instance) + self.cfg.Update(instance) # Only start the instance if it's marked as up if instance.status == "up": @@ -3025,7 +2879,7 @@ def _CreateDisks(cfg, instance): file_storage_dir) if not result: - logger.Error("Could not connect to node '%s'" % inst.primary_node) + logger.Error("Could not connect to node '%s'" % instance.primary_node) return False if not result[0]: @@ -3089,16 +2943,76 @@ def _RemoveDisks(instance, cfg): return result +def _ComputeDiskSize(disk_template, disk_size, swap_size): + """Compute disk size requirements in the volume group + + This is currently hard-coded for the two-drive layout. + + """ + # Required free disk space as a function of disk and swap space + req_size_dict = { + constants.DT_DISKLESS: None, + constants.DT_PLAIN: disk_size + swap_size, + # 256 MB are added for drbd metadata, 128MB for each drbd device + constants.DT_DRBD8: disk_size + swap_size + 256, + constants.DT_FILE: None, + } + + if disk_template not in req_size_dict: + raise errors.ProgrammerError("Disk template '%s' size requirement" + " is unknown" % disk_template) + + return req_size_dict[disk_template] + + class LUCreateInstance(LogicalUnit): """Create an instance. """ HPATH = "instance-add" HTYPE = constants.HTYPE_INSTANCE - _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode", + _OP_REQP = ["instance_name", "mem_size", "disk_size", "disk_template", "swap_size", "mode", "start", "vcpus", "wait_for_sync", "ip_check", "mac"] + def _RunAllocator(self): + """Run the allocator based on input opcode. 
+
+    """
+    disks = [{"size": self.op.disk_size, "mode": "w"},
+             {"size": self.op.swap_size, "mode": "w"}]
+    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
+             "bridge": self.op.bridge}]
+    ial = IAllocator(self.cfg, self.sstore,
+                     mode=constants.IALLOCATOR_MODE_ALLOC,
+                     name=self.op.instance_name,
+                     disk_template=self.op.disk_template,
+                     tags=[],
+                     os=self.op.os_type,
+                     vcpus=self.op.vcpus,
+                     mem_size=self.op.mem_size,
+                     disks=disks,
+                     nics=nics,
+                     )
+
+    ial.Run(self.op.iallocator)
+
+    if not ial.success:
+      raise errors.OpPrereqError("Can't compute nodes using"
+                                 " iallocator '%s': %s" % (self.op.iallocator,
+                                                           ial.info))
+    if len(ial.nodes) != ial.required_nodes:
+      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
+                                 " of nodes (%s), required %s" %
+                                 (self.op.iallocator, len(ial.nodes),
+                                  ial.required_nodes))
+    self.op.pnode = ial.nodes[0]
+    logger.ToStdout("Selected nodes for the instance: %s" %
+                    (", ".join(ial.nodes),))
+    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
+                (self.op.instance_name, self.op.iallocator, ial.nodes))
+    if ial.required_nodes == 2:
+      self.op.snode = ial.nodes[1]
+
   def BuildHooksEnv(self):
     """Build hooks env.
 
@@ -3135,7 +3049,10 @@ class LUCreateInstance(LogicalUnit):
     """Check prerequisites.
 
     """
-    for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
+    # set optional parameters to none if they don't exist
+    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
+                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
+                 "vnc_bind_address"]:
       if not hasattr(self.op, attr):
         setattr(self.op, attr, None)
 
@@ -3189,27 +3106,91 @@ class LUCreateInstance(LogicalUnit):
     if getattr(self.op, "os_type", None) is None:
       raise errors.OpPrereqError("No guest OS specified")
 
-    # check primary node
-    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
-    if pnode is None:
-      raise errors.OpPrereqError("Primary node '%s' is unknown" %
-                                 self.op.pnode)
-    self.op.pnode = pnode.name
-    self.pnode = pnode
-    self.secondaries = []
 
+    #### instance parameters check
+
+    # disk template and mirror node verification
     if self.op.disk_template not in constants.DISK_TEMPLATES:
       raise errors.OpPrereqError("Invalid disk template name")
 
+    # instance name verification
+    hostname1 = utils.HostInfo(self.op.instance_name)
+
+    self.op.instance_name = instance_name = hostname1.name
+    instance_list = self.cfg.GetInstanceList()
+    if instance_name in instance_list:
+      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
+                                 instance_name)
+
+    # ip validity checks
+    ip = getattr(self.op, "ip", None)
+    if ip is None or ip.lower() == "none":
+      inst_ip = None
+    elif ip.lower() == "auto":
+      inst_ip = hostname1.ip
+    else:
+      if not utils.IsValidIP(ip):
+        raise errors.OpPrereqError("given IP address '%s' doesn't look"
+                                   " like a valid IP" % ip)
+      inst_ip = ip
+    self.inst_ip = self.op.ip = inst_ip
+
+    if self.op.start and not self.op.ip_check:
+      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
+                                 " adding an instance in start mode")
+
+    if self.op.ip_check:
+      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
+        raise errors.OpPrereqError("IP %s of instance %s already in use" %
+                                   (hostname1.ip, instance_name))
+
+    # MAC address verification
+    if self.op.mac != "auto":
+      if not utils.IsValidMac(self.op.mac.lower()):
+        raise errors.OpPrereqError("invalid MAC address specified: %s" %
+                                   self.op.mac)
+
+    # bridge verification
+    bridge = getattr(self.op, "bridge", None)
+    if bridge is None:
+      self.op.bridge = 
self.cfg.GetDefBridge() + else: + self.op.bridge = bridge + + # boot order verification + if self.op.hvm_boot_order is not None: + if len(self.op.hvm_boot_order.strip("acdn")) != 0: + raise errors.OpPrereqError("invalid boot order specified," + " must be one or more of [acdn]") + # file storage checks if (self.op.file_driver and not self.op.file_driver in constants.FILE_DRIVER): raise errors.OpPrereqError("Invalid file driver name '%s'" % self.op.file_driver) if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir): - raise errors.OpPrereqError("File storage directory not a relative" - " path") + raise errors.OpPrereqError("File storage directory not a relative" + " path") + #### allocator run + + if [self.op.iallocator, self.op.pnode].count(None) != 1: + raise errors.OpPrereqError("One and only one of iallocator and primary" + " node must be given") + + if self.op.iallocator is not None: + self._RunAllocator() + #### node related checks + + # check primary node + pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode)) + if pnode is None: + raise errors.OpPrereqError("Primary node '%s' is unknown" % + self.op.pnode) + self.op.pnode = pnode.name + self.pnode = pnode + self.secondaries = [] + + # mirror node verification if self.op.disk_template in constants.DTS_NET_MIRROR: if getattr(self.op, "snode", None) is None: raise errors.OpPrereqError("The networked disk templates need" @@ -3224,20 +3205,8 @@ class LUCreateInstance(LogicalUnit): " the primary node.") self.secondaries.append(snode_name) - # Required free disk space as a function of disk and swap space - req_size_dict = { - constants.DT_DISKLESS: None, - constants.DT_PLAIN: self.op.disk_size + self.op.swap_size, - # 256 MB are added for drbd metadata, 128MB for each drbd device - constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256, - constants.DT_FILE: None, - } - - if self.op.disk_template not in req_size_dict: - raise errors.ProgrammerError("Disk template '%s' size requirement" - " is unknown" % self.op.disk_template) - - req_size = req_size_dict[self.op.disk_template] + req_size = _ComputeDiskSize(self.op.disk_template, + self.op.disk_size, self.op.swap_size) # Check lv size requirements if req_size is not None: @@ -3247,7 +3216,7 @@ class LUCreateInstance(LogicalUnit): info = nodeinfo.get(node, None) if not info: raise errors.OpPrereqError("Cannot get current information" - " from node '%s'" % nodeinfo) + " from node '%s'" % node) vg_free = info.get('vg_free', None) if not isinstance(vg_free, int): raise errors.OpPrereqError("Can't compute free disk space on" @@ -3266,59 +3235,37 @@ class LUCreateInstance(LogicalUnit): if self.op.kernel_path == constants.VALUE_NONE: raise errors.OpPrereqError("Can't set instance kernel to none") - # instance verification - hostname1 = utils.HostInfo(self.op.instance_name) - - self.op.instance_name = instance_name = hostname1.name - instance_list = self.cfg.GetInstanceList() - if instance_name in instance_list: - raise errors.OpPrereqError("Instance '%s' is already in the cluster" % - instance_name) - - ip = getattr(self.op, "ip", None) - if ip is None or ip.lower() == "none": - inst_ip = None - elif ip.lower() == "auto": - inst_ip = hostname1.ip - else: - if not utils.IsValidIP(ip): - raise errors.OpPrereqError("given IP address '%s' doesn't look" - " like a valid IP" % ip) - inst_ip = ip - self.inst_ip = inst_ip - - if self.op.start and not self.op.ip_check: - raise errors.OpPrereqError("Cannot ignore IP address conflicts when" - " adding an instance in 
start mode") - - if self.op.ip_check: - if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT): - raise errors.OpPrereqError("IP %s of instance %s already in use" % - (hostname1.ip, instance_name)) - - # MAC address verification - if self.op.mac != "auto": - if not utils.IsValidMac(self.op.mac.lower()): - raise errors.OpPrereqError("invalid MAC address specified: %s" % - self.op.mac) - - # bridge verification - bridge = getattr(self.op, "bridge", None) - if bridge is None: - self.op.bridge = self.cfg.GetDefBridge() - else: - self.op.bridge = bridge + # bridge check on primary node if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]): raise errors.OpPrereqError("target bridge '%s' does not exist on" " destination node '%s'" % (self.op.bridge, pnode.name)) - # boot order verification - if self.op.hvm_boot_order is not None: - if len(self.op.hvm_boot_order.strip("acdn")) != 0: - raise errors.OpPrereqError("invalid boot order specified," - " must be one or more of [acdn]") + # memory check on primary node + if self.op.start: + _CheckNodeFreeMemory(self.cfg, self.pnode.name, + "creating instance %s" % self.op.instance_name, + self.op.mem_size) + + # hvm_cdrom_image_path verification + if self.op.hvm_cdrom_image_path is not None: + if not os.path.isabs(self.op.hvm_cdrom_image_path): + raise errors.OpPrereqError("The path to the HVM CDROM image must" + " be an absolute path or None, not %s" % + self.op.hvm_cdrom_image_path) + if not os.path.isfile(self.op.hvm_cdrom_image_path): + raise errors.OpPrereqError("The HVM CDROM image must either be a" + " regular file or a symlink pointing to" + " an existing regular file, not %s" % + self.op.hvm_cdrom_image_path) + + # vnc_bind_address verification + if self.op.vnc_bind_address is not None: + if not utils.IsValidIP(self.op.vnc_bind_address): + raise errors.OpPrereqError("given VNC bind address '%s' doesn't look" + " like a valid IP address" % + self.op.vnc_bind_address) if self.op.start: self.instance_status = 'up' @@ -3347,6 +3294,9 @@ class LUCreateInstance(LogicalUnit): else: network_port = None + if self.op.vnc_bind_address is None: + self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS + # this is needed because os.path.join does not accept None arguments if self.op.file_storage_dir is None: string_file_storage_dir = "" @@ -3378,6 +3328,10 @@ class LUCreateInstance(LogicalUnit): kernel_path=self.op.kernel_path, initrd_path=self.op.initrd_path, hvm_boot_order=self.op.hvm_boot_order, + hvm_acpi=self.op.hvm_acpi, + hvm_pae=self.op.hvm_pae, + hvm_cdrom_image_path=self.op.hvm_cdrom_image_path, + vnc_bind_address=self.op.vnc_bind_address, ) feedback_fn("* creating instance disks...") @@ -3491,6 +3445,29 @@ class LUReplaceDisks(LogicalUnit): HTYPE = constants.HTYPE_INSTANCE _OP_REQP = ["instance_name", "mode", "disks"] + def _RunAllocator(self): + """Compute a new secondary node using an IAllocator. 
+ + """ + ial = IAllocator(self.cfg, self.sstore, + mode=constants.IALLOCATOR_MODE_RELOC, + name=self.op.instance_name, + relocate_from=[self.sec_node]) + + ial.Run(self.op.iallocator) + + if not ial.success: + raise errors.OpPrereqError("Can't compute nodes using" + " iallocator '%s': %s" % (self.op.iallocator, + ial.info)) + if len(ial.nodes) != ial.required_nodes: + raise errors.OpPrereqError("iallocator '%s' returned invalid number" + " of nodes (%s), required %s" % + (len(ial.nodes), ial.required_nodes)) + self.op.remote_node = ial.nodes[0] + logger.ToStdout("Selected new secondary for the instance: %s" % + self.op.remote_node) + def BuildHooksEnv(self): """Build hooks env. @@ -3517,6 +3494,9 @@ class LUReplaceDisks(LogicalUnit): This checks that the instance is in the cluster. """ + if not hasattr(self.op, "remote_node"): + self.op.remote_node = None + instance = self.cfg.GetInstanceInfo( self.cfg.ExpandInstanceName(self.op.instance_name)) if instance is None: @@ -3536,7 +3516,14 @@ class LUReplaceDisks(LogicalUnit): self.sec_node = instance.secondary_nodes[0] - remote_node = getattr(self.op, "remote_node", None) + ia_name = getattr(self.op, "iallocator", None) + if ia_name is not None: + if self.op.remote_node is not None: + raise errors.OpPrereqError("Give either the iallocator or the new" + " secondary, not both") + self.op.remote_node = self._RunAllocator() + + remote_node = self.op.remote_node if remote_node is not None: remote_node = self.cfg.ExpandNodeName(remote_node) if remote_node is None: @@ -3554,13 +3541,6 @@ class LUReplaceDisks(LogicalUnit): # replacement as for drbd7 (no different port allocated) raise errors.OpPrereqError("Same secondary given, cannot execute" " replacement") - # the user gave the current secondary, switch to - # 'no-replace-secondary' mode for drbd7 - remote_node = None - if (instance.disk_template == constants.DT_REMOTE_RAID1 and - self.op.mode != constants.REPLACE_DISK_ALL): - raise errors.OpPrereqError("Template 'remote_raid1' only allows all" - " disks replacement, not individual ones") if instance.disk_template == constants.DT_DRBD8: if (self.op.mode == constants.REPLACE_DISK_ALL and remote_node is not None): @@ -3592,101 +3572,6 @@ class LUReplaceDisks(LogicalUnit): (name, instance.name)) self.op.remote_node = remote_node - def _ExecRR1(self, feedback_fn): - """Replace the disks of an instance. - - """ - instance = self.instance - iv_names = {} - # start of work - if self.op.remote_node is None: - remote_node = self.sec_node - else: - remote_node = self.op.remote_node - cfg = self.cfg - for dev in instance.disks: - size = dev.size - lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]] - names = _GenerateUniqueNames(cfg, lv_names) - new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node, - remote_node, size, names) - iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd) - logger.Info("adding new mirror component on secondary for %s" % - dev.iv_name) - #HARDCODE - if not _CreateBlockDevOnSecondary(cfg, remote_node, instance, - new_drbd, False, - _GetInstanceInfoText(instance)): - raise errors.OpExecError("Failed to create new component on secondary" - " node %s. Full abort, cleanup manually!" 
%
-                                 remote_node)
-
-      logger.Info("adding new mirror component on primary")
-      #HARDCODE
-      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
-                                      instance, new_drbd,
-                                      _GetInstanceInfoText(instance)):
-        # remove secondary dev
-        cfg.SetDiskID(new_drbd, remote_node)
-        rpc.call_blockdev_remove(remote_node, new_drbd)
-        raise errors.OpExecError("Failed to create volume on primary!"
-                                 " Full abort, cleanup manually!!")
-
-      # the device exists now
-      # call the primary node to add the mirror to md
-      logger.Info("adding new mirror component to md")
-      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
-                                           [new_drbd]):
-        logger.Error("Can't add mirror compoment to md!")
-        cfg.SetDiskID(new_drbd, remote_node)
-        if not rpc.call_blockdev_remove(remote_node, new_drbd):
-          logger.Error("Can't rollback on secondary")
-        cfg.SetDiskID(new_drbd, instance.primary_node)
-        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
-          logger.Error("Can't rollback on primary")
-        raise errors.OpExecError("Full abort, cleanup manually!!")
-
-      dev.children.append(new_drbd)
-      cfg.AddInstance(instance)
-
-    # this can fail as the old devices are degraded and _WaitForSync
-    # does a combined result over all disks, so we don't check its
-    # return value
-    _WaitForSync(cfg, instance, self.proc, unlock=True)
-
-    # so check manually all the devices
-    for name in iv_names:
-      dev, child, new_drbd = iv_names[name]
-      cfg.SetDiskID(dev, instance.primary_node)
-      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
-      if is_degr:
-        raise errors.OpExecError("MD device %s is degraded!" % name)
-      cfg.SetDiskID(new_drbd, instance.primary_node)
-      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
-      if is_degr:
-        raise errors.OpExecError("New drbd device %s is degraded!" % name)
-
-    for name in iv_names:
-      dev, child, new_drbd = iv_names[name]
-      logger.Info("remove mirror %s component" % name)
-      cfg.SetDiskID(dev, instance.primary_node)
-      if not rpc.call_blockdev_removechildren(instance.primary_node,
-                                              dev, [child]):
-        logger.Error("Can't remove child from mirror, aborting"
-                     " *this device cleanup*.\nYou need to cleanup manually!!")
-        continue
-
-      for node in child.logical_id[:2]:
-        logger.Info("remove child device on %s" % node)
-        cfg.SetDiskID(child, node)
-        if not rpc.call_blockdev_remove(node, child):
-          logger.Error("Warning: failed to remove device from node %s,"
-                       " continuing operation." % node)
-
-      dev.children.remove(child)
-
-    cfg.AddInstance(instance)
-
   def _ExecD8DiskOnly(self, feedback_fn):
     """Replace a disk on the primary or secondary for drbd8.
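
The next hunk rewires LUReplaceDisks.Exec: the remote_raid1 dispatch goes away, and for a down instance the disks are now activated before the replacement and deactivated again afterwards via chained opcodes. A minimal sketch of that wrapper pattern, not part of the patch itself (the helper name is invented; ChainOpCode and the two opcodes are the ones the hunk uses):

    def _ReplaceWithDisksActivated(proc, instance, fn, feedback_fn):
      # Sketch only: mirrors the Exec flow in the hunk below.
      was_down = instance.status == "down"
      if was_down:
        op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
        proc.ChainOpCode(op)
      ret = fn(feedback_fn)
      if was_down:
        op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
        proc.ChainOpCode(op)
      return ret

As in the patch, deactivation only happens after a successful replacement; a try/finally variant would also deactivate on failure, at the price of hiding the original exception if the cleanup opcode itself fails.
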
@@ -4028,16 +3913,28 @@ class LUReplaceDisks(LogicalUnit): """ instance = self.instance - if instance.disk_template == constants.DT_REMOTE_RAID1: - fn = self._ExecRR1 - elif instance.disk_template == constants.DT_DRBD8: + + # Activate the instance disks if we're replacing them on a down instance + if instance.status == "down": + op = opcodes.OpActivateInstanceDisks(instance_name=instance.name) + self.proc.ChainOpCode(op) + + if instance.disk_template == constants.DT_DRBD8: if self.op.remote_node is None: fn = self._ExecD8DiskOnly else: fn = self._ExecD8Secondary else: raise errors.ProgrammerError("Unhandled disk replacement case") - return fn(feedback_fn) + + ret = fn(feedback_fn) + + # Deactivate the instance disks if we're replacing them on a down instance + if instance.status == "down": + op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name) + self.proc.ChainOpCode(op) + + return ret class LUQueryInstanceData(NoHooksLU): @@ -4133,13 +4030,24 @@ class LUQueryInstanceData(NoHooksLU): "memory": instance.memory, "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics], "disks": disks, - "network_port": instance.network_port, "vcpus": instance.vcpus, - "kernel_path": instance.kernel_path, - "initrd_path": instance.initrd_path, - "hvm_boot_order": instance.hvm_boot_order, } + htkind = self.sstore.GetHypervisorType() + if htkind == constants.HT_XEN_PVM30: + idict["kernel_path"] = instance.kernel_path + idict["initrd_path"] = instance.initrd_path + + if htkind == constants.HT_XEN_HVM31: + idict["hvm_boot_order"] = instance.hvm_boot_order + idict["hvm_acpi"] = instance.hvm_acpi + idict["hvm_pae"] = instance.hvm_pae + idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path + + if htkind in constants.HTS_REQ_PORT: + idict["vnc_bind_address"] = instance.vnc_bind_address + idict["network_port"] = instance.network_port + result[instance.name] = idict return result @@ -4197,9 +4105,15 @@ class LUSetInstanceParams(LogicalUnit): self.kernel_path = getattr(self.op, "kernel_path", None) self.initrd_path = getattr(self.op, "initrd_path", None) self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None) - all_params = [self.mem, self.vcpus, self.ip, self.bridge, self.mac, - self.kernel_path, self.initrd_path, self.hvm_boot_order] - if all_params.count(None) == len(all_params): + self.hvm_acpi = getattr(self.op, "hvm_acpi", None) + self.hvm_pae = getattr(self.op, "hvm_pae", None) + self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None) + self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None) + all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac, + self.kernel_path, self.initrd_path, self.hvm_boot_order, + self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path, + self.vnc_bind_address] + if all_parms.count(None) == len(all_parms): raise errors.OpPrereqError("No changes submitted") if self.mem is not None: try: @@ -4258,6 +4172,25 @@ class LUSetInstanceParams(LogicalUnit): " must be one or more of [acdn]" " or 'default'") + # hvm_cdrom_image_path verification + if self.op.hvm_cdrom_image_path is not None: + if not os.path.isabs(self.op.hvm_cdrom_image_path): + raise errors.OpPrereqError("The path to the HVM CDROM image must" + " be an absolute path or None, not %s" % + self.op.hvm_cdrom_image_path) + if not os.path.isfile(self.op.hvm_cdrom_image_path): + raise errors.OpPrereqError("The HVM CDROM image must either be a" + " regular file or a symlink pointing to" + " an existing regular file, not %s" % + self.op.hvm_cdrom_image_path) + 
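
LUSetInstanceParams.CheckPrereq above gathers every optional field with getattr(self.op, name, None) and rejects the opcode only when all of them are None; the all_parms.count(None) comparison is the entire "did the caller request any change" test. A tiny standalone illustration of the idiom (variable names hypothetical):

    parms = [mem, vcpus, ip, bridge, mac]   # each one is None when not submitted
    if parms.count(None) == len(parms):     # nothing to change
      raise errors.OpPrereqError("No changes submitted")
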
+ # vnc_bind_address verification + if self.op.vnc_bind_address is not None: + if not utils.IsValidIP(self.op.vnc_bind_address): + raise errors.OpPrereqError("given VNC bind address '%s' doesn't look" + " like a valid IP address" % + self.op.vnc_bind_address) + instance = self.cfg.GetInstanceInfo( self.cfg.ExpandInstanceName(self.op.instance_name)) if instance is None: @@ -4301,6 +4234,18 @@ class LUSetInstanceParams(LogicalUnit): else: instance.hvm_boot_order = self.hvm_boot_order result.append(("hvm_boot_order", self.hvm_boot_order)) + if self.hvm_acpi: + instance.hvm_acpi = self.hvm_acpi + result.append(("hvm_acpi", self.hvm_acpi)) + if self.hvm_pae: + instance.hvm_pae = self.hvm_pae + result.append(("hvm_pae", self.hvm_pae)) + if self.hvm_cdrom_image_path: + instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path + result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path)) + if self.vnc_bind_address: + instance.vnc_bind_address = self.vnc_bind_address + result.append(("vnc_bind_address", self.vnc_bind_address)) self.cfg.AddInstance(instance) @@ -4357,7 +4302,7 @@ class LUExportInstance(LogicalUnit): def CheckPrereq(self): """Check prerequisites. - This checks that the instance name is a valid one. + This checks that the instance and node names are valid. """ instance_name = self.cfg.ExpandInstanceName(self.op.instance_name) @@ -4375,6 +4320,12 @@ class LUExportInstance(LogicalUnit): self.op.target_node) self.op.target_node = self.dst_node.name + # instance disk type verification + for disk in self.instance.disks: + if disk.dev_type == constants.LD_FILE: + raise errors.OpPrereqError("Export not supported for instances with" + " file-based disks") + def Exec(self, feedback_fn): """Export an instance to an image in the cluster. @@ -4386,7 +4337,7 @@ class LUExportInstance(LogicalUnit): # shutdown the instance, but not the disks if not rpc.call_instance_shutdown(src_node, instance): raise errors.OpExecError("Could not shutdown instance %s on node %s" % - (instance.name, source_node)) + (instance.name, src_node)) vgname = self.cfg.GetVGName() @@ -4444,6 +4395,45 @@ class LUExportInstance(LogicalUnit): " on node %s" % (instance.name, node)) +class LURemoveExport(NoHooksLU): + """Remove exports related to the named instance. + + """ + _OP_REQP = ["instance_name"] + + def CheckPrereq(self): + """Check prerequisites. + """ + pass + + def Exec(self, feedback_fn): + """Remove any export. + + """ + instance_name = self.cfg.ExpandInstanceName(self.op.instance_name) + # If the instance was not found we'll try with the name that was passed in. + # This will only work if it was an FQDN, though. + fqdn_warn = False + if not instance_name: + fqdn_warn = True + instance_name = self.op.instance_name + + op = opcodes.OpQueryExports(nodes=[]) + exportlist = self.proc.ChainOpCode(op) + found = False + for node in exportlist: + if instance_name in exportlist[node]: + found = True + if not rpc.call_export_remove(node, instance_name): + logger.Error("could not remove export for instance %s" + " on node %s" % (instance_name, node)) + + if fqdn_warn and not found: + feedback_fn("Export not found. If trying to remove an export belonging" + " to a deleted instance please use its Fully Qualified" + " Domain Name.") + + class TagsLU(NoHooksLU): """Generic tags LU. @@ -4629,3 +4619,372 @@ class LUTestDelay(NoHooksLU): if not node_result: raise errors.OpExecError("Failure during rpc call to node %s," " result: %s" % (node, node_result)) + + +class IAllocator(object): + """IAllocator framework. 
+ + An IAllocator instance has three sets of attributes: + - cfg/sstore that are needed to query the cluster + - input data (all members of the _KEYS class attribute are required) + - four buffer attributes (in|out_data|text), that represent the + input (to the external script) in text and data structure format, + and the output from it, again in two formats + - the result variables from the script (success, info, nodes) for + easy usage + + """ + _ALLO_KEYS = [ + "mem_size", "disks", "disk_template", + "os", "tags", "nics", "vcpus", + ] + _RELO_KEYS = [ + "relocate_from", + ] + + def __init__(self, cfg, sstore, mode, name, **kwargs): + self.cfg = cfg + self.sstore = sstore + # init buffer variables + self.in_text = self.out_text = self.in_data = self.out_data = None + # init all input fields so that pylint is happy + self.mode = mode + self.name = name + self.mem_size = self.disks = self.disk_template = None + self.os = self.tags = self.nics = self.vcpus = None + self.relocate_from = None + # computed fields + self.required_nodes = None + # init result fields + self.success = self.info = self.nodes = None + if self.mode == constants.IALLOCATOR_MODE_ALLOC: + keyset = self._ALLO_KEYS + elif self.mode == constants.IALLOCATOR_MODE_RELOC: + keyset = self._RELO_KEYS + else: + raise errors.ProgrammerError("Unknown mode '%s' passed to the" + " IAllocator" % self.mode) + for key in kwargs: + if key not in keyset: + raise errors.ProgrammerError("Invalid input parameter '%s' to" + " IAllocator" % key) + setattr(self, key, kwargs[key]) + for key in keyset: + if key not in kwargs: + raise errors.ProgrammerError("Missing input parameter '%s' to" + " IAllocator" % key) + self._BuildInputData() + + def _ComputeClusterData(self): + """Compute the generic allocator input data. + + This is the data that is independent of the actual operation. 
+ + """ + cfg = self.cfg + # cluster data + data = { + "version": 1, + "cluster_name": self.sstore.GetClusterName(), + "cluster_tags": list(cfg.GetClusterInfo().GetTags()), + "hypervisor_type": self.sstore.GetHypervisorType(), + # we don't have job IDs + } + + i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()] + + # node data + node_results = {} + node_list = cfg.GetNodeList() + node_data = rpc.call_node_info(node_list, cfg.GetVGName()) + for nname in node_list: + ninfo = cfg.GetNodeInfo(nname) + if nname not in node_data or not isinstance(node_data[nname], dict): + raise errors.OpExecError("Can't get data for node %s" % nname) + remote_info = node_data[nname] + for attr in ['memory_total', 'memory_free', 'memory_dom0', + 'vg_size', 'vg_free', 'cpu_total']: + if attr not in remote_info: + raise errors.OpExecError("Node '%s' didn't return attribute '%s'" % + (nname, attr)) + try: + remote_info[attr] = int(remote_info[attr]) + except ValueError, err: + raise errors.OpExecError("Node '%s' returned invalid value for '%s':" + " %s" % (nname, attr, str(err))) + # compute memory used by primary instances + i_p_mem = i_p_up_mem = 0 + for iinfo in i_list: + if iinfo.primary_node == nname: + i_p_mem += iinfo.memory + if iinfo.status == "up": + i_p_up_mem += iinfo.memory + + # compute memory used by instances + pnr = { + "tags": list(ninfo.GetTags()), + "total_memory": remote_info['memory_total'], + "reserved_memory": remote_info['memory_dom0'], + "free_memory": remote_info['memory_free'], + "i_pri_memory": i_p_mem, + "i_pri_up_memory": i_p_up_mem, + "total_disk": remote_info['vg_size'], + "free_disk": remote_info['vg_free'], + "primary_ip": ninfo.primary_ip, + "secondary_ip": ninfo.secondary_ip, + "total_cpus": remote_info['cpu_total'], + } + node_results[nname] = pnr + data["nodes"] = node_results + + # instance data + instance_data = {} + for iinfo in i_list: + nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge} + for n in iinfo.nics] + pir = { + "tags": list(iinfo.GetTags()), + "should_run": iinfo.status == "up", + "vcpus": iinfo.vcpus, + "memory": iinfo.memory, + "os": iinfo.os, + "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes), + "nics": nic_data, + "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks], + "disk_template": iinfo.disk_template, + } + instance_data[iinfo.name] = pir + + data["instances"] = instance_data + + self.in_data = data + + def _AddNewInstance(self): + """Add new instance data to allocator structure. + + This in combination with _AllocatorGetClusterData will create the + correct structure needed as input for the allocator. + + The checks for the completeness of the opcode must have already been + done. + + """ + data = self.in_data + if len(self.disks) != 2: + raise errors.OpExecError("Only two-disk configurations supported") + + disk_space = _ComputeDiskSize(self.disk_template, + self.disks[0]["size"], self.disks[1]["size"]) + + if self.disk_template in constants.DTS_NET_MIRROR: + self.required_nodes = 2 + else: + self.required_nodes = 1 + request = { + "type": "allocate", + "name": self.name, + "disk_template": self.disk_template, + "tags": self.tags, + "os": self.os, + "vcpus": self.vcpus, + "memory": self.mem_size, + "disks": self.disks, + "disk_space_total": disk_space, + "nics": self.nics, + "required_nodes": self.required_nodes, + } + data["request"] = request + + def _AddRelocateInstance(self): + """Add relocate instance data to allocator structure. 
+
+    This in combination with _ComputeClusterData will create the
+    correct structure needed as input for the allocator.
+
+    The checks for the completeness of the opcode must have already been
+    done.
+
+    """
+    instance = self.cfg.GetInstanceInfo(self.name)
+    if instance is None:
+      raise errors.ProgrammerError("Unknown instance '%s' passed to"
+                                   " IAllocator" % self.name)
+
+    if instance.disk_template not in constants.DTS_NET_MIRROR:
+      raise errors.OpPrereqError("Can't relocate non-mirrored instances")
+
+    if len(instance.secondary_nodes) != 1:
+      raise errors.OpPrereqError("Instance does not have exactly one"
+                                 " secondary node")
+
+    self.required_nodes = 1
+
+    disk_space = _ComputeDiskSize(instance.disk_template,
+                                  instance.disks[0].size,
+                                  instance.disks[1].size)
+
+    request = {
+      "type": "relocate",
+      "name": self.name,
+      "disk_space_total": disk_space,
+      "required_nodes": self.required_nodes,
+      "relocate_from": self.relocate_from,
+      }
+    self.in_data["request"] = request
+
+  def _BuildInputData(self):
+    """Build input data structures.
+
+    """
+    self._ComputeClusterData()
+
+    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
+      self._AddNewInstance()
+    else:
+      self._AddRelocateInstance()
+
+    self.in_text = serializer.Dump(self.in_data)
+
+  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
+    """Run an instance allocator and return the results.
+
+    """
+    data = self.in_text
+
+    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
+
+    if not isinstance(result, tuple) or len(result) != 4:
+      raise errors.OpExecError("Invalid result from master iallocator runner")
+
+    rcode, stdout, stderr, fail = result
+
+    if rcode == constants.IARUN_NOTFOUND:
+      raise errors.OpExecError("Can't find allocator '%s'" % name)
+    elif rcode == constants.IARUN_FAILURE:
+      raise errors.OpExecError("Instance allocator call failed: %s,"
+                               " output: %s" %
+                               (fail, stdout+stderr))
+    self.out_text = stdout
+    if validate:
+      self._ValidateResult()
+
+  def _ValidateResult(self):
+    """Process the allocator results.
+
+    This will process and, if successful, save the result in
+    self.out_data and the other parameters.
+
+    """
+    try:
+      rdict = serializer.Load(self.out_text)
+    except Exception, err:
+      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
+
+    if not isinstance(rdict, dict):
+      raise errors.OpExecError("Can't parse iallocator results: not a dict")
+
+    for key in "success", "info", "nodes":
+      if key not in rdict:
+        raise errors.OpExecError("Can't parse iallocator results:"
+                                 " missing key '%s'" % key)
+      setattr(self, key, rdict[key])
+
+    if not isinstance(rdict["nodes"], list):
+      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
+                               " is not a list")
+    self.out_data = rdict
+
+
+class LUTestAllocator(NoHooksLU):
+  """Run allocator tests.
+
+  This LU runs the allocator tests.
+
+  """
+  _OP_REQP = ["direction", "mode", "name"]
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This checks the opcode parameters depending on the direction and mode of
+    the test.
+ + """ + if self.op.mode == constants.IALLOCATOR_MODE_ALLOC: + for attr in ["name", "mem_size", "disks", "disk_template", + "os", "tags", "nics", "vcpus"]: + if not hasattr(self.op, attr): + raise errors.OpPrereqError("Missing attribute '%s' on opcode input" % + attr) + iname = self.cfg.ExpandInstanceName(self.op.name) + if iname is not None: + raise errors.OpPrereqError("Instance '%s' already in the cluster" % + iname) + if not isinstance(self.op.nics, list): + raise errors.OpPrereqError("Invalid parameter 'nics'") + for row in self.op.nics: + if (not isinstance(row, dict) or + "mac" not in row or + "ip" not in row or + "bridge" not in row): + raise errors.OpPrereqError("Invalid contents of the" + " 'nics' parameter") + if not isinstance(self.op.disks, list): + raise errors.OpPrereqError("Invalid parameter 'disks'") + if len(self.op.disks) != 2: + raise errors.OpPrereqError("Only two-disk configurations supported") + for row in self.op.disks: + if (not isinstance(row, dict) or + "size" not in row or + not isinstance(row["size"], int) or + "mode" not in row or + row["mode"] not in ['r', 'w']): + raise errors.OpPrereqError("Invalid contents of the" + " 'disks' parameter") + elif self.op.mode == constants.IALLOCATOR_MODE_RELOC: + if not hasattr(self.op, "name"): + raise errors.OpPrereqError("Missing attribute 'name' on opcode input") + fname = self.cfg.ExpandInstanceName(self.op.name) + if fname is None: + raise errors.OpPrereqError("Instance '%s' not found for relocation" % + self.op.name) + self.op.name = fname + self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes + else: + raise errors.OpPrereqError("Invalid test allocator mode '%s'" % + self.op.mode) + + if self.op.direction == constants.IALLOCATOR_DIR_OUT: + if not hasattr(self.op, "allocator") or self.op.allocator is None: + raise errors.OpPrereqError("Missing allocator name") + elif self.op.direction != constants.IALLOCATOR_DIR_IN: + raise errors.OpPrereqError("Wrong allocator test '%s'" % + self.op.direction) + + def Exec(self, feedback_fn): + """Run the allocator test. + + """ + if self.op.mode == constants.IALLOCATOR_MODE_ALLOC: + ial = IAllocator(self.cfg, self.sstore, + mode=self.op.mode, + name=self.op.name, + mem_size=self.op.mem_size, + disks=self.op.disks, + disk_template=self.op.disk_template, + os=self.op.os, + tags=self.op.tags, + nics=self.op.nics, + vcpus=self.op.vcpus, + ) + else: + ial = IAllocator(self.cfg, self.sstore, + mode=self.op.mode, + name=self.op.name, + relocate_from=list(self.relocate_from), + ) + + if self.op.direction == constants.IALLOCATOR_DIR_IN: + result = ial.in_text + else: + ial.Run(self.op.allocator, validate=False) + result = ial.out_text + return result
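
Read together, IAllocator and LUTestAllocator pin down the contract for external allocator scripts: the input is the serializer document built by _ComputeClusterData plus a "request" section, and the reply must parse into a dict carrying "success", "info" and "nodes" (a list), or _ValidateResult raises OpExecError. A toy allocator honoring that reply shape might look like the sketch below; how the request text reaches the script is not visible in this diff (the transport sits behind rpc.call_iallocator_runner), so reading it from stdin and using simplejson directly are assumptions here.

    #!/usr/bin/env python
    # Toy allocator sketch: picks the emptiest nodes by free_memory.
    # Assumptions: the request arrives on stdin, the reply goes to stdout.
    import sys
    import simplejson

    def main():
      doc = simplejson.load(sys.stdin)
      request = doc["request"]
      wanted = request["required_nodes"]
      # node records carry free_memory/total_memory etc., as built by
      # IAllocator._ComputeClusterData
      ranked = sorted(doc["nodes"].items(),
                      key=lambda item: item[1]["free_memory"],
                      reverse=True)
      chosen = [name for (name, _) in ranked[:wanted]]
      simplejson.dump({
        "success": len(chosen) == wanted,
        "info": "toy allocator: ranked by free_memory",
        "nodes": chosen,
        }, sys.stdout)

    if __name__ == "__main__":
      main()

A real allocator would also honor the relocate_from list for "relocate" requests and check disk_space_total against each candidate's free_disk before answering success.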