X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/2411519eec8cd9058c19748243d651fefe216440..a0b7d1bda271a27f6c3d50fb098d3d77a5b25255:/lib/cmdlib.py diff --git a/lib/cmdlib.py b/lib/cmdlib.py index ca48386..608937a 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007 Google Inc. +# Copyright (C) 2006, 2007, 2008 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -30,7 +30,6 @@ import time import tempfile import re import platform -import simplejson from ganeti import rpc from ganeti import ssh @@ -43,14 +42,7 @@ from ganeti import constants from ganeti import objects from ganeti import opcodes from ganeti import ssconf - - -# Check whether the simplejson module supports indentation -_JSON_INDENT = 2 -try: - simplejson.dumps(1, indent=_JSON_INDENT) -except TypeError: - _JSON_INDENT = None +from ganeti import serializer class LogicalUnit(object): @@ -141,11 +133,7 @@ class LogicalUnit(object): added by the hooks runner. If the LU doesn't define any environment, an empty dict (and not None) should be returned. - As for the node lists, the master should not be included in the - them, as it will be added by the hooks runner in case this LU - requires a cluster to run on (otherwise we don't have a node - list). No nodes should be returned as an empty list (and not - None). + No nodes should be returned as an empty list (and not None). Note that if the HPATH for a LU class is None, this function will not be called. @@ -153,6 +141,24 @@ class LogicalUnit(object): """ raise NotImplementedError + def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result): + """Notify the LU about the results of its hooks. + + This method is called every time a hooks phase is executed, and notifies + the Logical Unit about the hooks' result. The LU can then use it to alter + its result based on the hooks. By default the method does nothing and the + previous result is passed back unchanged but any LU can define it if it + wants to use the local cluster hook-scripts somehow. + + Args: + phase: the hooks phase that has just been run + hooks_results: the results of the multi-node hooks rpc call + feedback_fn: function to send feedback back to the caller + lu_result: the previous result this LU had, or None in the PRE phase. + + """ + return lu_result + class NoHooksLU(LogicalUnit): """Simple LU which runs no hooks. @@ -164,14 +170,6 @@ class NoHooksLU(LogicalUnit): HPATH = None HTYPE = None - def BuildHooksEnv(self): - """Build hooks env. - - This is a no-op, since we don't run hooks. - - """ - return {}, [], [] - def _AddHostToEtcHosts(hostname): """Wrapper around utils.SetEtcHostsEntry. @@ -654,11 +652,13 @@ class LUDestroyCluster(NoHooksLU): rpc.call_node_leave_cluster(master) -class LUVerifyCluster(NoHooksLU): +class LUVerifyCluster(LogicalUnit): """Verifies the cluster status. 
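The new HooksCallBack contract above is easiest to see from the processor's side: after each hooks phase, the per-node hook results are handed back to the LU, which may fold them into its own result. A minimal sketch of that round-trip, assuming a toy driver (run_lu, MyLU and the (ok, message) result tuples are invented here, not the real Ganeti processor or RPC format):

    HOOKS_PHASE_PRE, HOOKS_PHASE_POST = "pre", "post"

    class MyLU(object):
        def Exec(self):
            return 0

        def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
            # Only post-phase hook failures change the outcome in this sketch.
            if phase == HOOKS_PHASE_POST:
                for node, (ok, msg) in hook_results.items():
                    if not ok:
                        feedback_fn("post hook failed on %s: %s" % (node, msg))
                        return 1
            return lu_result

    def run_lu(lu, feedback_fn):
        lu.HooksCallBack(HOOKS_PHASE_PRE, {}, feedback_fn, None)
        result = lu.Exec()
        post = {"node1": (True, ""), "node2": (False, "exit code 1")}
        return lu.HooksCallBack(HOOKS_PHASE_POST, post, feedback_fn, result)

    print(run_lu(MyLU(), print))   # prints the failure, then 1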
""" - _OP_REQP = [] + HPATH = "cluster-verify" + HTYPE = constants.HTYPE_CLUSTER + _OP_REQP = ["skip_checks"] def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result, remote_version, feedback_fn): @@ -718,13 +718,24 @@ class LUVerifyCluster(NoHooksLU): if 'nodelist' not in node_result: bad = True - feedback_fn(" - ERROR: node hasn't returned node connectivity data") + feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data") else: if node_result['nodelist']: bad = True for node in node_result['nodelist']: - feedback_fn(" - ERROR: communication with node '%s': %s" % + feedback_fn(" - ERROR: ssh communication with node '%s': %s" % (node, node_result['nodelist'][node])) + if 'node-net-test' not in node_result: + bad = True + feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data") + else: + if node_result['node-net-test']: + bad = True + nlist = utils.NiceSort(node_result['node-net-test'].keys()) + for node in nlist: + feedback_fn(" - ERROR: tcp communication with node '%s': %s" % + (node, node_result['node-net-test'][node])) + hyp_result = node_result.get('hypervisor', None) if hyp_result is not None: feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result) @@ -740,12 +751,6 @@ class LUVerifyCluster(NoHooksLU): """ bad = False - instancelist = self.cfg.GetInstanceList() - if not instance in instancelist: - feedback_fn(" - ERROR: instance %s not in instance list %s" % - (instance, instancelist)) - bad = True - node_current = instanceconfig.primary_node node_vol_should = {} @@ -806,13 +811,60 @@ class LUVerifyCluster(NoHooksLU): bad = True return bad + def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn): + """Verify N+1 Memory Resilience. + + Check that if one single node dies we can still start all the instances it + was primary for. + + """ + bad = False + + for node, nodeinfo in node_info.iteritems(): + # This code checks that every node which is now listed as secondary has + # enough memory to host all instances it is supposed to should a single + # other node in the cluster fail. + # FIXME: not ready for failover to an arbitrary node + # FIXME: does not support file-backed instances + # WARNING: we currently take into account down instances as well as up + # ones, considering that even if they're down someone might want to start + # them even in the event of a node failure. + for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems(): + needed_mem = 0 + for instance in instances: + if instance_cfg[instance].auto_balance: + needed_mem += instance_cfg[instance].memory + if nodeinfo['mfree'] < needed_mem: + feedback_fn(" - ERROR: not enough memory on node %s to accomodate" + " failovers should node %s fail" % (node, prinode)) + bad = True + return bad + def CheckPrereq(self): """Check prerequisites. - This has no prerequisites. + Transform the list of checks we're going to skip into a set and check that + all its members are valid. """ - pass + self.skip_set = frozenset(self.op.skip_checks) + if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set): + raise errors.OpPrereqError("Invalid checks to be skipped specified") + + def BuildHooksEnv(self): + """Build hooks env. + + Cluster-Verify hooks just rone in the post phase and their failure makes + the output be logged in the verify output and the verification to fail. 
+ + """ + all_nodes = self.cfg.GetNodeList() + tags = self.cfg.GetClusterInfo().GetTags() + # TODO: populate the environment with useful information for verify hooks + env = { + "CLUSTER_TAGS": " ".join(tags), + } + return env, [], all_nodes def Exec(self, feedback_fn): """Verify integrity of cluster, performing various test on nodes. @@ -825,9 +877,14 @@ class LUVerifyCluster(NoHooksLU): vg_name = self.cfg.GetVGName() nodelist = utils.NiceSort(self.cfg.GetNodeList()) + nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist] instancelist = utils.NiceSort(self.cfg.GetInstanceList()) + i_non_redundant = [] # Non redundant instances + i_non_a_balanced = [] # Non auto-balanced instances node_volume = {} node_instance = {} + node_info = {} + instance_cfg = {} # FIXME: verify OS list # do local checksums @@ -844,9 +901,14 @@ class LUVerifyCluster(NoHooksLU): 'filelist': file_names, 'nodelist': nodelist, 'hypervisor': None, + 'node-net-test': [(node.name, node.primary_ip, node.secondary_ip) + for node in nodeinfo] } all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param) all_rversion = rpc.call_version(nodelist) + all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName()) + + incomplete_nodeinfo = False for node in nodelist: feedback_fn("* Verifying node %s" % node) @@ -866,6 +928,7 @@ class LUVerifyCluster(NoHooksLU): elif not isinstance(volumeinfo, dict): feedback_fn(" - ERROR: connection to %s failed" % (node,)) bad = True + incomplete_nodeinfo = True continue else: node_volume[node] = volumeinfo @@ -875,10 +938,39 @@ class LUVerifyCluster(NoHooksLU): if type(nodeinstance) != list: feedback_fn(" - ERROR: connection to %s failed" % (node,)) bad = True + incomplete_nodeinfo = True continue node_instance[node] = nodeinstance + # node_info + nodeinfo = all_ninfo[node] + if not isinstance(nodeinfo, dict): + feedback_fn(" - ERROR: connection to %s failed" % (node,)) + bad = True + incomplete_nodeinfo = True + continue + + try: + node_info[node] = { + "mfree": int(nodeinfo['memory_free']), + "dfree": int(nodeinfo['vg_free']), + "pinst": [], + "sinst": [], + # dictionary holding all instances this node is secondary for, + # grouped by their primary node. Each key is a cluster node, and each + # value is a list of instances which have the key as primary and the + # current node as secondary. this is handy to calculate N+1 memory + # availability if you can only failover from a primary to its + # secondary. + "sinst-by-pnode": {}, + } + except (ValueError, TypeError): + feedback_fn(" - ERROR: invalid value returned from node %s" % (node,)) + bad = True + incomplete_nodeinfo = True + continue + node_vol_should = {} for instance in instancelist: @@ -890,6 +982,40 @@ class LUVerifyCluster(NoHooksLU): inst_config.MapLVsByNode(node_vol_should) + instance_cfg[instance] = inst_config + + pnode = inst_config.primary_node + if pnode in node_info: + node_info[pnode]['pinst'].append(instance) + else: + feedback_fn(" - ERROR: instance %s, connection to primary node" + " %s failed" % (instance, pnode)) + bad = True + + # If the instance is non-redundant we cannot survive losing its primary + # node, so we are not N+1 compliant. On the other hand we have no disk + # templates with more than one secondary so that situation is not well + # supported either. 
+ # FIXME: does not support file-backed instances + if len(inst_config.secondary_nodes) == 0: + i_non_redundant.append(instance) + elif len(inst_config.secondary_nodes) > 1: + feedback_fn(" - WARNING: multiple secondaries for instance %s" + % instance) + + if not inst_config.auto_balance: + i_non_a_balanced.append(instance) + + for snode in inst_config.secondary_nodes: + if snode in node_info: + node_info[snode]['sinst'].append(instance) + if pnode not in node_info[snode]['sinst-by-pnode']: + node_info[snode]['sinst-by-pnode'][pnode] = [] + node_info[snode]['sinst-by-pnode'][pnode].append(instance) + else: + feedback_fn(" - ERROR: instance %s, connection to secondary node" + " %s failed" % (instance, snode)) + feedback_fn("* Verifying orphan volumes") result = self._VerifyOrphanVolumes(node_vol_should, node_volume, feedback_fn) @@ -900,8 +1026,64 @@ feedback_fn) bad = bad or result + if (constants.VERIFY_NPLUSONE_MEM not in self.skip_set and + not incomplete_nodeinfo): + feedback_fn("* Verifying N+1 Memory redundancy") + result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn) + bad = bad or result + + feedback_fn("* Other Notes") + if i_non_redundant: + feedback_fn(" - NOTICE: %d non-redundant instance(s) found." + % len(i_non_redundant)) + + if i_non_a_balanced: + feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found." + % len(i_non_a_balanced)) + return int(bad) + def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result): + """Analyze the post-hooks' result, handle it, and send some + nicely-formatted feedback back to the user. + + Args: + phase: the hooks phase that has just been run + hooks_results: the results of the multi-node hooks rpc call + feedback_fn: function to send feedback back to the caller + lu_result: previous Exec result + + """ + # We only really run POST phase hooks, and are only interested in their results + if phase == constants.HOOKS_PHASE_POST: + # Used to change hooks' output to proper indentation + indent_re = re.compile('^', re.M) + feedback_fn("* Hooks Results") + if not hooks_results: + feedback_fn(" - ERROR: general communication failure") + lu_result = 1 + else: + for node_name in hooks_results: + show_node_header = True + res = hooks_results[node_name] + if res is False or not isinstance(res, list): + feedback_fn(" Communication failure") + lu_result = 1 + continue + for script, hkr, output in res: + if hkr == constants.HKR_FAIL: + # The node header is only shown once, if there are + # failing hooks on that node + if show_node_header: + feedback_fn(" Node %s:" % node_name) + show_node_header = False + feedback_fn(" ERROR: Script %s failed, output:" % script) + output = indent_re.sub(' ', output) + feedback_fn("%s" % output) + lu_result = 1 + + return lu_result + class LUVerifyDisks(NoHooksLU): """Verifies the cluster disks status. @@ -1008,8 +1190,7 @@ class LURenameCluster(LogicalUnit): raise errors.OpPrereqError("Neither the name nor the IP address of the" " cluster has changed") if new_ip != old_ip: - result = utils.RunCmd(["fping", "-q", new_ip]) - if not result.failed: + if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT): raise errors.OpPrereqError("The given cluster IP address (%s) is" " reachable on the network. Aborting." % new_ip) @@ -1151,7 +1332,7 @@ class LUDiagnoseOS(NoHooksLU): """Logical unit for OS diagnose/query. """ - _OP_REQP = [] + _OP_REQP = ["output_fields", "names"] def CheckPrereq(self): """Check prerequisites.
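The indent_re idiom in HooksCallBack above is worth isolating: in multiline mode the zero-width pattern '^' matches at the start of every line, so a single sub() call re-indents an arbitrary block of hook output.

    import re

    indent_re = re.compile('^', re.M)
    output = "stdout of hook\nstderr of hook"
    print(indent_re.sub('      ', output))
    #       stdout of hook
    #       stderr of hook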
@@ -1159,7 +1340,44 @@ This always succeeds, since this is a pure query LU. """ - return + if self.op.names: + raise errors.OpPrereqError("Selective OS query not supported") + + self.dynamic_fields = frozenset(["name", "valid", "node_status"]) + _CheckOutputFields(static=[], + dynamic=self.dynamic_fields, + selected=self.op.output_fields) + + @staticmethod + def _DiagnoseByOS(node_list, rlist): + """Remaps a per-node return list into a per-os per-node dictionary + + Args: + node_list: a list with the names of all nodes + rlist: a map with node names as keys and OS objects as values + + Returns: + map: a map with osnames as keys and as value another map, with + nodes as + keys and list of OS objects as values + e.g. {"debian-etch": {"node1": [<object>,...], + "node2": [<object>,]} + } + + """ + all_os = {} + for node_name, nr in rlist.iteritems(): + if not nr: + continue + for os in nr: + if os.name not in all_os: + # build a list of nodes for this os containing empty lists + # for each node in node_list + all_os[os.name] = {} + for nname in node_list: + all_os[os.name][nname] = [] + all_os[os.name][node_name].append(os) + return all_os def Exec(self, feedback_fn): """Compute the list of OSes. @@ -1169,7 +1387,25 @@ node_data = rpc.call_os_diagnose(node_list) if node_data == False: raise errors.OpExecError("Can't gather the list of OSes") - return node_data + pol = self._DiagnoseByOS(node_list, node_data) + output = [] + for os_name, os_data in pol.iteritems(): + row = [] + for field in self.op.output_fields: + if field == "name": + val = os_name + elif field == "valid": + val = utils.all([osl and osl[0] for osl in os_data.values()]) + elif field == "node_status": + val = {} + for node_name, nos_list in os_data.iteritems(): + val[node_name] = [(v.status, v.path) for v in nos_list] + else: + raise errors.ParameterError(field) + row.append(val) + output.append(row) + + return output class LURemoveNode(LogicalUnit): @@ -1259,13 +1495,16 @@ class LUQueryNodes(NoHooksLU): This checks that the fields required are valid output fields.
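_DiagnoseByOS above is a per-node to per-OS pivot; with plain strings standing in for the OS objects (sample data invented), the shape of the result is:

    node_list = ["node1", "node2"]
    rlist = {"node1": ["debian-etch", "fedora-8"], "node2": ["debian-etch"]}

    all_os = {}
    for node_name, nr in rlist.items():
        for os_name in nr or []:
            if os_name not in all_os:
                # every OS gets an (initially empty) entry for every node, so
                # absence on a node shows up as [] instead of a KeyError
                all_os[os_name] = dict((n, []) for n in node_list)
            all_os[os_name][node_name].append(os_name)

    print(all_os["fedora-8"])
    # {'node1': ['fedora-8'], 'node2': []}  (hence not "valid" cluster-wide)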
""" - self.dynamic_fields = frozenset(["dtotal", "dfree", - "mtotal", "mnode", "mfree", - "bootid"]) + self.dynamic_fields = frozenset([ + "dtotal", "dfree", + "mtotal", "mnode", "mfree", + "bootid", + "ctotal", + ]) _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt", "pinst_list", "sinst_list", - "pip", "sip"], + "pip", "sip", "tags"], dynamic=self.dynamic_fields, selected=self.op.output_fields) @@ -1292,6 +1531,7 @@ class LUQueryNodes(NoHooksLU): "mfree": utils.TryConvert(int, nodeinfo['memory_free']), "dtotal": utils.TryConvert(int, nodeinfo['vg_size']), "dfree": utils.TryConvert(int, nodeinfo['vg_free']), + "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']), "bootid": nodeinfo['bootid'], } else: @@ -1335,6 +1575,8 @@ class LUQueryNodes(NoHooksLU): val = node.primary_ip elif field == "sip": val = node.secondary_ip + elif field == "tags": + val = list(node.GetTags()) elif field in self.dynamic_fields: val = live_data[node.name].get(field, None) else: @@ -1463,13 +1705,24 @@ class LUAddNode(LogicalUnit): if not utils.IsValidIP(secondary_ip): raise errors.OpPrereqError("Invalid secondary IP given") self.op.secondary_ip = secondary_ip + node_list = cfg.GetNodeList() - if node in node_list: - raise errors.OpPrereqError("Node %s is already in the configuration" - % node) + if not self.op.readd and node in node_list: + raise errors.OpPrereqError("Node %s is already in the configuration" % + node) + elif self.op.readd and node not in node_list: + raise errors.OpPrereqError("Node %s is not in the configuration" % node) for existing_node_name in node_list: existing_node = cfg.GetNodeInfo(existing_node_name) + + if self.op.readd and node == existing_node_name: + if (existing_node.primary_ip != primary_ip or + existing_node.secondary_ip != secondary_ip): + raise errors.OpPrereqError("Readded node doesn't have the same IP" + " address configuration as before") + continue + if (existing_node.primary_ip == primary_ip or existing_node.secondary_ip == primary_ip or existing_node.primary_ip == secondary_ip or @@ -1616,7 +1869,9 @@ class LUAddNode(LogicalUnit): # Distribute updated /etc/hosts and known_hosts to all nodes, # including the node just added myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode()) - dist_nodes = self.cfg.GetNodeList() + [node] + dist_nodes = self.cfg.GetNodeList() + if not self.op.readd: + dist_nodes.append(node) if myself.name in dist_nodes: dist_nodes.remove(myself.name) @@ -1635,8 +1890,9 @@ class LUAddNode(LogicalUnit): if not ssh.CopyFileToNode(node, fname): logger.Error("could not copy file %s to node %s" % (fname, node)) - logger.Info("adding node %s to cluster.conf" % node) - self.cfg.AddNode(new_node) + if not self.op.readd: + logger.Info("adding node %s to cluster.conf" % node) + self.cfg.AddNode(new_node) class LUMasterFailover(LogicalUnit): @@ -1736,6 +1992,7 @@ class LUQueryClusterInfo(NoHooksLU): "export_version": constants.EXPORT_VERSION, "master": self.sstore.GetMasterNode(), "architecture": (platform.architecture()[0], platform.machine()), + "hypervisor_type": self.sstore.GetHypervisorType(), } return result @@ -2012,7 +2269,8 @@ def _CheckNodeFreeMemory(cfg, node, reason, requested): """ nodeinfo = rpc.call_node_info([node], cfg.GetVGName()) - if not nodeinfo or not isinstance(nodeinfo, dict): + if not (nodeinfo and isinstance(nodeinfo, dict) and + node in nodeinfo and isinstance(nodeinfo[node], dict)): raise errors.OpPrereqError("Could not contact node %s for resource" " information" % (node,)) @@ -2337,12 +2595,10 @@ class 
LURenameInstance(LogicalUnit): instance_list = self.cfg.GetInstanceList() if new_name in instance_list: raise errors.OpPrereqError("Instance '%s' is already in the cluster" % - instance_name) + new_name) if not getattr(self.op, "ignore_ip", False): - command = ["fping", "-q", name_info.ip] - result = utils.RunCmd(command) - if not result.failed: + if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT): raise errors.OpPrereqError("IP %s of instance %s already in use" % (name_info.ip, new_name)) @@ -2363,8 +2619,8 @@ class LURenameInstance(LogicalUnit): try: if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name, "sda", "sdb"): - msg = ("Could run OS rename script for instance %s on node %s (but the" - " instance has been renamed in Ganeti)" % + msg = ("Could not run OS rename script for instance %s on node %s" + " (but the instance has been renamed in Ganeti)" % (inst.name, inst.primary_node)) logger.Error(msg) finally: @@ -2377,7 +2633,7 @@ class LURemoveInstance(LogicalUnit): """ HPATH = "instance-remove" HTYPE = constants.HTYPE_INSTANCE - _OP_REQP = ["instance_name"] + _OP_REQP = ["instance_name", "ignore_failures"] def BuildHooksEnv(self): """Build hooks env. @@ -2446,7 +2702,12 @@ class LUQueryInstances(NoHooksLU): _CheckOutputFields(static=["name", "os", "pnode", "snodes", "admin_state", "admin_ram", "disk_template", "ip", "mac", "bridge", - "sda_size", "sdb_size", "vcpus"], + "sda_size", "sdb_size", "vcpus", "tags", + "auto_balance", + "network_port", "kernel_path", "initrd_path", + "hvm_boot_order", "hvm_acpi", "hvm_pae", + "hvm_cdrom_image_path", "hvm_nic_type", + "hvm_disk_type", "vnc_bind_address"], dynamic=self.dynamic_fields, selected=self.op.output_fields) @@ -2539,6 +2800,21 @@ class LUQueryInstances(NoHooksLU): val = disk.size elif field == "vcpus": val = instance.vcpus + elif field == "tags": + val = list(instance.GetTags()) + elif field == "auto_balance": + val = instance.auto_balance + elif field in ("network_port", "kernel_path", "initrd_path", + "hvm_boot_order", "hvm_acpi", "hvm_pae", + "hvm_cdrom_image_path", "hvm_nic_type", + "hvm_disk_type", "vnc_bind_address"): + val = getattr(instance, field, None) + if val is None: + if field in ("hvm_nic_type", "hvm_disk_type", + "kernel_path", "initrd_path"): + val = "default" + else: + val = "-" else: raise errors.ParameterError(field) iout.append(val) @@ -2642,7 +2918,7 @@ class LUFailoverInstance(LogicalUnit): instance.primary_node = target_node # distribute new instance config to the other nodes - self.cfg.AddInstance(instance) + self.cfg.Update(instance) # Only start the instance if it's marked as up if instance.status == "up": @@ -2663,6 +2939,305 @@ class LUFailoverInstance(LogicalUnit): (instance.name, target_node)) +class LUMigrateInstance(LogicalUnit): + """Migrate an instance. + + This is migration without shutting down, compared to the failover, + which is done with shutdown. + + """ + HPATH = "instance-migrate" + HTYPE = constants.HTYPE_INSTANCE + _OP_REQP = ["instance_name", "live", "cleanup"] + + def BuildHooksEnv(self): + """Build hooks env. + + This runs on master, primary and secondary nodes of the instance. + + """ + env = _BuildInstanceHookEnvByObject(self.instance) + nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes) + return env, nl, nl + + def CheckPrereq(self): + """Check prerequisites. + + This checks that the instance is in the cluster. 
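Both rename LUs (cluster and instance) now probe the address with utils.TcpPing on the node daemon port instead of shelling out to fping. A self-contained sketch of such a probe, assuming nothing about Ganeti's actual implementation (the host and port below are examples; 192.0.2.10 is a TEST-NET address):

    import socket

    def tcp_ping(target, port, timeout=2.0):
        """Return True if a TCP connection to target:port succeeds."""
        try:
            sock = socket.create_connection((target, port), timeout)
        except socket.error:
            return False
        sock.close()
        return True

    # The callers above treat reachability as an error: the name/IP is in use.
    if tcp_ping("192.0.2.10", 1811):
        print("address already in use, aborting")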
+ + """ + instance = self.cfg.GetInstanceInfo( + self.cfg.ExpandInstanceName(self.op.instance_name)) + if instance is None: + raise errors.OpPrereqError("Instance '%s' not known" % + self.op.instance_name) + + if instance.disk_template != constants.DT_DRBD8: + raise errors.OpPrereqError("Instance's disk layout is not" + " drbd8, cannot migrate.") + + secondary_nodes = instance.secondary_nodes + if not secondary_nodes: + raise errors.ProgrammerError("no secondary node but using " + "drbd8 disk template") + + target_node = secondary_nodes[0] + # check memory requirements on the secondary node + _CheckNodeFreeMemory(self.cfg, target_node, "migrating instance %s" % + instance.name, instance.memory) + + # check bridge existance + brlist = [nic.bridge for nic in instance.nics] + if not rpc.call_bridges_exist(target_node, brlist): + raise errors.OpPrereqError("One or more target bridges %s does not" + " exist on destination node '%s'" % + (brlist, target_node)) + + if not self.op.cleanup: + migratable = rpc.call_instance_migratable(instance.primary_node, + instance) + if not migratable: + raise errors.OpPrereqError("Can't contact node '%s'" % + instance.primary_node) + if not migratable[0]: + raise errors.OpPrereqError("Can't migrate: %s - please use failover" % + migratable[1]) + + self.instance = instance + + def _WaitUntilSync(self): + """Poll with custom rpc for disk sync. + + This uses our own step-based rpc call. + + """ + self.feedback_fn("* wait until resync is done") + all_done = False + while not all_done: + all_done = True + result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name, + self.instance.disks, + self.nodes_ip, False, + constants.DRBD_RECONF_RPC_WFSYNC) + min_percent = 100 + for node in self.all_nodes: + if not result[node] or not result[node][0]: + raise errors.OpExecError("Cannot resync disks on node %s" % (node,)) + node_done, node_percent = result[node][1] + all_done = all_done and node_done + if node_percent is not None: + min_percent = min(min_percent, node_percent) + if not all_done: + if min_percent < 100: + self.feedback_fn(" - progress: %.1f%%" % min_percent) + time.sleep(2) + + def _EnsureSecondary(self, node): + """Demote a node to secondary. + + """ + self.feedback_fn("* switching node %s to secondary mode" % node) + result = rpc.call_drbd_reconfig_net([node], self.instance.name, + self.instance.disks, + self.nodes_ip, False, + constants.DRBD_RECONF_RPC_SECONDARY) + if not result[node] or not result[node][0]: + raise errors.OpExecError("Cannot change disk to secondary on node %s," + " error %s" % + (node, result[node][1])) + + def _GoStandalone(self): + """Disconnect from the network. + + """ + self.feedback_fn("* changing into standalone mode") + result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name, + self.instance.disks, + self.nodes_ip, True, + constants.DRBD_RECONF_RPC_DISCONNECT) + for node in self.all_nodes: + if not result[node] or not result[node][0]: + raise errors.OpExecError("Cannot disconnect disks node %s," + " error %s" % (node, result[node][1])) + + def _GoReconnect(self, multimaster): + """Reconnect to the network. 
+ + """ + if multimaster: + msg = "dual-master" + else: + msg = "single-master" + self.feedback_fn("* changing disks into %s mode" % msg) + result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name, + self.instance.disks, + self.nodes_ip, + multimaster, + constants.DRBD_RECONF_RPC_RECONNECT) + for node in self.all_nodes: + if not result[node] or not result[node][0]: + raise errors.OpExecError("Cannot change disks config on node %s," + " error %s" % (node, result[node][1])) + + def _IdentifyDisks(self): + """Start the migration RPC sequence. + + """ + self.feedback_fn("* identifying disks") + result = rpc.call_drbd_reconfig_net(self.all_nodes, + self.instance.name, + self.instance.disks, + self.nodes_ip, True, + constants.DRBD_RECONF_RPC_INIT) + for node in self.all_nodes: + if not result[node] or not result[node][0]: + raise errors.OpExecError("Cannot identify disks node %s," + " error %s" % (node, result[node][1])) + + def _ExecCleanup(self): + """Try to cleanup after a failed migration. + + The cleanup is done by: + - check that the instance is running only on one node + (and update the config if needed) + - change disks on its secondary node to secondary + - wait until disks are fully synchronized + - disconnect from the network + - change disks into single-master mode + - wait again until disks are fully synchronized + + """ + instance = self.instance + target_node = self.target_node + source_node = self.source_node + + # check running on only one node + self.feedback_fn("* checking where the instance actually runs" + " (if this hangs, the hypervisor might be in" + " a bad state)") + ins_l = rpc.call_instance_list(self.all_nodes) + for node in self.all_nodes: + if not type(ins_l[node]) is list: + raise errors.OpExecError("Can't contact node '%s'" % node) + + runningon_source = instance.name in ins_l[source_node] + runningon_target = instance.name in ins_l[target_node] + + if runningon_source and runningon_target: + raise errors.OpExecError("Instance seems to be running on two nodes," + " or the hypervisor is confused. You will have" + " to ensure manually that it runs only on one" + " and restart this operation.") + + if not (runningon_source or runningon_target): + raise errors.OpExecError("Instance does not seem to be running at all." + " In this case, it's safer to repair by" + " running 'gnt-instance stop' to ensure disk" + " shutdown, and then restarting it.") + + if runningon_target: + # the migration has actually succeeded, we need to update the config + self.feedback_fn("* instance running on secondary node (%s)," + " updating config" % target_node) + instance.primary_node = target_node + self.cfg.Update(instance) + demoted_node = source_node + else: + self.feedback_fn("* instance confirmed to be running on its" + " primary node (%s)" % source_node) + demoted_node = target_node + + self._IdentifyDisks() + + self._EnsureSecondary(demoted_node) + self._WaitUntilSync() + self._GoStandalone() + self._GoReconnect(False) + self._WaitUntilSync() + + self.feedback_fn("* done") + + def _ExecMigration(self): + """Migrate an instance. 
+ + The migrate is done by: + - change the disks into dual-master mode + - wait until disks are fully synchronized again + - migrate the instance + - change disks on the new secondary node (the old primary) to secondary + - wait until disks are fully synchronized + - change disks into single-master mode + + """ + instance = self.instance + target_node = self.target_node + source_node = self.source_node + + self.feedback_fn("* checking disk consistency between source and target") + for dev in instance.disks: + if not _CheckDiskConsistency(self.cfg, dev, target_node, False): + raise errors.OpExecError("Disk %s is degraded or not fully" + " synchronized on target node," + " aborting migrate." % dev.iv_name) + + self._IdentifyDisks() + + self._EnsureSecondary(target_node) + self._GoStandalone() + self._GoReconnect(True) + self._WaitUntilSync() + + self.feedback_fn("* migrating instance to %s" % target_node) + time.sleep(10) + result = rpc.call_instance_migrate(source_node, instance, + self.nodes_ip[target_node], + self.op.live) + if not result or not result[0]: + logger.Error("Instance migration failed, trying to revert disk status") + try: + self._EnsureSecondary(target_node) + self._GoStandalone() + self._GoReconnect(False) + self._WaitUntilSync() + except errors.OpExecError, err: + logger.Error("Can't reconnect the drives: error '%s'\n" + "Please look and recover the instance status" % str(err)) + + raise errors.OpExecError("Could not migrate instance %s: %s" % + (instance.name, result[1])) + time.sleep(10) + + instance.primary_node = target_node + # distribute new instance config to the other nodes + self.cfg.Update(instance) + + self._EnsureSecondary(source_node) + self._WaitUntilSync() + self._GoStandalone() + self._GoReconnect(False) + self._WaitUntilSync() + + self.feedback_fn("* done") + + def Exec(self, feedback_fn): + """Perform the migration. + + """ + self.feedback_fn = feedback_fn + + self.source_node = self.instance.primary_node + self.target_node = self.instance.secondary_nodes[0] + self.all_nodes = [self.source_node, self.target_node] + self.nodes_ip = { + self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip, + self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip, + } + if self.op.cleanup: + return self._ExecCleanup() + else: + return self._ExecMigration() + + def _CreateBlockDevOnPrimary(cfg, node, instance, device, info): """Create a tree of block devices on the primary node. @@ -2903,15 +3478,76 @@ def _RemoveDisks(instance, cfg): return result +def _ComputeDiskSize(disk_template, disk_size, swap_size): + """Compute disk size requirements in the volume group + + This is currently hard-coded for the two-drive layout. + + """ + # Required free disk space as a function of disk and swap space + req_size_dict = { + constants.DT_DISKLESS: None, + constants.DT_PLAIN: disk_size + swap_size, + constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2, + # 256 MB are added for drbd metadata, 128MB for each drbd device + constants.DT_REMOTE_RAID1: disk_size + swap_size + 256, + constants.DT_DRBD8: disk_size + swap_size + 256, + } + + if disk_template not in req_size_dict: + raise errors.ProgrammerError("Disk template '%s' size requirement" + " is unknown" % disk_template) + + return req_size_dict[disk_template] + + class LUCreateInstance(LogicalUnit): """Create an instance. 
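The factored-out _ComputeDiskSize is simple enough to check by hand: for a drbd8 instance with a 10240 MB disk and 4096 MB swap, the volume group must hold the two LVs plus the fixed 256 MB of DRBD metadata mentioned in the comment.

    disk_size, swap_size = 10240, 4096
    req_size = {
        "diskless": None,
        "plain": disk_size + swap_size,
        "local_raid1": (disk_size + swap_size) * 2,
        "remote_raid1": disk_size + swap_size + 256,
        "drbd8": disk_size + swap_size + 256,
    }
    print(req_size["drbd8"])   # 14592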
""" HPATH = "instance-add" HTYPE = constants.HTYPE_INSTANCE - _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode", + _OP_REQP = ["instance_name", "mem_size", "disk_size", "disk_template", "swap_size", "mode", "start", "vcpus", - "wait_for_sync", "ip_check", "mac"] + "wait_for_sync", "ip_check", "mac", "auto_balance"] + + def _RunAllocator(self): + """Run the allocator based on input opcode. + + """ + disks = [{"size": self.op.disk_size, "mode": "w"}, + {"size": self.op.swap_size, "mode": "w"}] + nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None), + "bridge": self.op.bridge}] + ial = IAllocator(self.cfg, self.sstore, + mode=constants.IALLOCATOR_MODE_ALLOC, + name=self.op.instance_name, + disk_template=self.op.disk_template, + tags=[], + os=self.op.os_type, + vcpus=self.op.vcpus, + mem_size=self.op.mem_size, + disks=disks, + nics=nics, + ) + + ial.Run(self.op.iallocator) + + if not ial.success: + raise errors.OpPrereqError("Can't compute nodes using" + " iallocator '%s': %s" % (self.op.iallocator, + ial.info)) + if len(ial.nodes) != ial.required_nodes: + raise errors.OpPrereqError("iallocator '%s' returned invalid number" + " of nodes (%s), required %s" % + (len(ial.nodes), ial.required_nodes)) + self.op.pnode = ial.nodes[0] + logger.ToStdout("Selected nodes for the instance: %s" % + (", ".join(ial.nodes),)) + logger.Info("Selected nodes for instance %s via iallocator %s: %s" % + (self.op.instance_name, self.op.iallocator, ial.nodes)) + if ial.required_nodes == 2: + self.op.snode = ial.nodes[1] def BuildHooksEnv(self): """Build hooks env. @@ -2949,7 +3585,10 @@ class LUCreateInstance(LogicalUnit): """Check prerequisites. """ - for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]: + # set optional parameters to none if they don't exist + for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode", + "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path", + "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]: if not hasattr(self.op, attr): setattr(self.op, attr, None) @@ -2998,6 +3637,72 @@ class LUCreateInstance(LogicalUnit): if getattr(self.op, "os_type", None) is None: raise errors.OpPrereqError("No guest OS specified") + #### instance parameters check + + # disk template and mirror node verification + if self.op.disk_template not in constants.DISK_TEMPLATES: + raise errors.OpPrereqError("Invalid disk template name") + + # instance name verification + hostname1 = utils.HostInfo(self.op.instance_name) + + self.op.instance_name = instance_name = hostname1.name + instance_list = self.cfg.GetInstanceList() + if instance_name in instance_list: + raise errors.OpPrereqError("Instance '%s' is already in the cluster" % + instance_name) + + # ip validity checks + ip = getattr(self.op, "ip", None) + if ip is None or ip.lower() == "none": + inst_ip = None + elif ip.lower() == "auto": + inst_ip = hostname1.ip + else: + if not utils.IsValidIP(ip): + raise errors.OpPrereqError("given IP address '%s' doesn't look" + " like a valid IP" % ip) + inst_ip = ip + self.inst_ip = self.op.ip = inst_ip + + if self.op.start and not self.op.ip_check: + raise errors.OpPrereqError("Cannot ignore IP address conflicts when" + " adding an instance in start mode") + + if self.op.ip_check: + if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT): + raise errors.OpPrereqError("IP %s of instance %s already in use" % + (hostname1.ip, instance_name)) + + # MAC address verification + if self.op.mac != "auto": + if not utils.IsValidMac(self.op.mac.lower()): + raise 
errors.OpPrereqError("invalid MAC address specified: %s" % + self.op.mac) + + # bridge verification + bridge = getattr(self.op, "bridge", None) + if bridge is None: + self.op.bridge = self.cfg.GetDefBridge() + else: + self.op.bridge = bridge + + # boot order verification + if self.op.hvm_boot_order is not None: + if len(self.op.hvm_boot_order.strip("acdn")) != 0: + raise errors.OpPrereqError("invalid boot order specified," + " must be one or more of [acdn]") + #### allocator run + + if [self.op.iallocator, self.op.pnode].count(None) != 1: + raise errors.OpPrereqError("One and only one of iallocator and primary" + " node must be given") + + if self.op.iallocator is not None: + self._RunAllocator() + + #### node related checks + # check primary node pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode)) if pnode is None: @@ -3006,10 +3711,8 @@ class LUCreateInstance(LogicalUnit): self.op.pnode = pnode.name self.pnode = pnode self.secondaries = [] - # disk template and mirror node verification - if self.op.disk_template not in constants.DISK_TEMPLATES: - raise errors.OpPrereqError("Invalid disk template name") + # mirror node verification if self.op.disk_template in constants.DTS_NET_MIRROR: if getattr(self.op, "snode", None) is None: raise errors.OpPrereqError("The networked disk templates need" @@ -3024,21 +3727,8 @@ class LUCreateInstance(LogicalUnit): " the primary node.") self.secondaries.append(snode_name) - # Required free disk space as a function of disk and swap space - req_size_dict = { - constants.DT_DISKLESS: None, - constants.DT_PLAIN: self.op.disk_size + self.op.swap_size, - constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2, - # 256 MB are added for drbd metadata, 128MB for each drbd device - constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256, - constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256, - } - - if self.op.disk_template not in req_size_dict: - raise errors.ProgrammerError("Disk template '%s' size requirement" - " is unknown" % self.op.disk_template) - - req_size = req_size_dict[self.op.disk_template] + req_size = _ComputeDiskSize(self.op.disk_template, + self.op.disk_size, self.op.swap_size) # Check lv size requirements if req_size is not None: @@ -3048,7 +3738,7 @@ class LUCreateInstance(LogicalUnit): info = nodeinfo.get(node, None) if not info: raise errors.OpPrereqError("Cannot get current information" - " from node '%s'" % nodeinfo) + " from node '%s'" % node) vg_free = info.get('vg_free', None) if not isinstance(vg_free, int): raise errors.OpPrereqError("Can't compute free disk space on" @@ -3067,59 +3757,46 @@ class LUCreateInstance(LogicalUnit): if self.op.kernel_path == constants.VALUE_NONE: raise errors.OpPrereqError("Can't set instance kernel to none") - # instance verification - hostname1 = utils.HostInfo(self.op.instance_name) - - self.op.instance_name = instance_name = hostname1.name - instance_list = self.cfg.GetInstanceList() - if instance_name in instance_list: - raise errors.OpPrereqError("Instance '%s' is already in the cluster" % - instance_name) - - ip = getattr(self.op, "ip", None) - if ip is None or ip.lower() == "none": - inst_ip = None - elif ip.lower() == "auto": - inst_ip = hostname1.ip - else: - if not utils.IsValidIP(ip): - raise errors.OpPrereqError("given IP address '%s' doesn't look" - " like a valid IP" % ip) - inst_ip = ip - self.inst_ip = inst_ip - - if self.op.start and not self.op.ip_check: - raise errors.OpPrereqError("Cannot ignore IP address conflicts when" - " 
adding an instance in start mode") - - if self.op.ip_check: - if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT): - raise errors.OpPrereqError("IP %s of instance %s already in use" % - (hostname1.ip, instance_name)) - - # MAC address verification - if self.op.mac != "auto": - if not utils.IsValidMac(self.op.mac.lower()): - raise errors.OpPrereqError("invalid MAC address specified: %s" % - self.op.mac) - - # bridge verification - bridge = getattr(self.op, "bridge", None) - if bridge is None: - self.op.bridge = self.cfg.GetDefBridge() - else: - self.op.bridge = bridge + # bridge check on primary node if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]): raise errors.OpPrereqError("target bridge '%s' does not exist on" " destination node '%s'" % (self.op.bridge, pnode.name)) - # boot order verification - if self.op.hvm_boot_order is not None: - if len(self.op.hvm_boot_order.strip("acdn")) != 0: - raise errors.OpPrereqError("invalid boot order specified," - " must be one or more of [acdn]") + # memory check on primary node + if self.op.start: + _CheckNodeFreeMemory(self.cfg, self.pnode.name, + "creating instance %s" % self.op.instance_name, + self.op.mem_size) + + # hvm_cdrom_image_path verification + if self.op.hvm_cdrom_image_path is not None: + if not os.path.isabs(self.op.hvm_cdrom_image_path): + raise errors.OpPrereqError("The path to the HVM CDROM image must" + " be an absolute path or None, not %s" % + self.op.hvm_cdrom_image_path) + if not os.path.isfile(self.op.hvm_cdrom_image_path): + raise errors.OpPrereqError("The HVM CDROM image must either be a" + " regular file or a symlink pointing to" + " an existing regular file, not %s" % + self.op.hvm_cdrom_image_path) + + # vnc_bind_address verification + if self.op.vnc_bind_address is not None: + if not utils.IsValidIP(self.op.vnc_bind_address): + raise errors.OpPrereqError("given VNC bind address '%s' doesn't look" + " like a valid IP address" % + self.op.vnc_bind_address) + + # Xen HVM device type checks + if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31: + if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES: + raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM" + " hypervisor" % self.op.hvm_nic_type) + if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES: + raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM" + " hypervisor" % self.op.hvm_disk_type) if self.op.start: self.instance_status = 'up' @@ -3148,6 +3825,9 @@ class LUCreateInstance(LogicalUnit): else: network_port = None + if self.op.vnc_bind_address is None: + self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS + disks = _GenerateDiskTemplate(self.cfg, self.op.disk_template, instance, pnode_name, @@ -3165,6 +3845,13 @@ class LUCreateInstance(LogicalUnit): kernel_path=self.op.kernel_path, initrd_path=self.op.initrd_path, hvm_boot_order=self.op.hvm_boot_order, + hvm_acpi=self.op.hvm_acpi, + hvm_pae=self.op.hvm_pae, + hvm_cdrom_image_path=self.op.hvm_cdrom_image_path, + vnc_bind_address=self.op.vnc_bind_address, + hvm_nic_type=self.op.hvm_nic_type, + hvm_disk_type=self.op.hvm_disk_type, + auto_balance=bool(self.op.auto_balance), ) feedback_fn("* creating instance disks...") @@ -3483,6 +4170,29 @@ class LUReplaceDisks(LogicalUnit): HTYPE = constants.HTYPE_INSTANCE _OP_REQP = ["instance_name", "mode", "disks"] + def _RunAllocator(self): + """Compute a new secondary node using an IAllocator. 
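This is the second _RunAllocator in the patch (LUCreateInstance has the other), and both validate the allocator's reply the same way: success flag first, then node count against required_nodes. Incidentally, in both copies the "returned invalid number of nodes" message has three %s placeholders but only two arguments, so that branch would itself die with a TypeError if ever taken. A sketch of the intended checks against a hypothetical decoded reply:

    reply = {"success": True, "info": "", "nodes": ["node3", "node4"]}
    name, required_nodes = "hail", 2   # 2 for mirrored templates, else 1

    if not reply["success"]:
        raise ValueError("allocator %s failed: %s" % (name, reply["info"]))
    if len(reply["nodes"]) != required_nodes:
        raise ValueError("allocator %s returned %d nodes, required %d" %
                         (name, len(reply["nodes"]), required_nodes))
    print("primary=%s secondary=%s" % tuple(reply["nodes"]))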
+ + """ + ial = IAllocator(self.cfg, self.sstore, + mode=constants.IALLOCATOR_MODE_RELOC, + name=self.op.instance_name, + relocate_from=[self.sec_node]) + + ial.Run(self.op.iallocator) + + if not ial.success: + raise errors.OpPrereqError("Can't compute nodes using" + " iallocator '%s': %s" % (self.op.iallocator, + ial.info)) + if len(ial.nodes) != ial.required_nodes: + raise errors.OpPrereqError("iallocator '%s' returned invalid number" + " of nodes (%s), required %s" % + (len(ial.nodes), ial.required_nodes)) + self.op.remote_node = ial.nodes[0] + logger.ToStdout("Selected new secondary for the instance: %s" % + self.op.remote_node) + def BuildHooksEnv(self): """Build hooks env. @@ -3509,6 +4219,9 @@ class LUReplaceDisks(LogicalUnit): This checks that the instance is in the cluster. """ + if not hasattr(self.op, "remote_node"): + self.op.remote_node = None + instance = self.cfg.GetInstanceInfo( self.cfg.ExpandInstanceName(self.op.instance_name)) if instance is None: @@ -3528,7 +4241,14 @@ class LUReplaceDisks(LogicalUnit): self.sec_node = instance.secondary_nodes[0] - remote_node = getattr(self.op, "remote_node", None) + ia_name = getattr(self.op, "iallocator", None) + if ia_name is not None: + if self.op.remote_node is not None: + raise errors.OpPrereqError("Give either the iallocator or the new" + " secondary, not both") + self._RunAllocator() + + remote_node = self.op.remote_node if remote_node is not None: remote_node = self.cfg.ExpandNodeName(remote_node) if remote_node is None: @@ -4020,6 +4740,12 @@ class LUReplaceDisks(LogicalUnit): """ instance = self.instance + + # Activate the instance disks if we're replacing them on a down instance + if instance.status == "down": + op = opcodes.OpActivateInstanceDisks(instance_name=instance.name) + self.proc.ChainOpCode(op) + if instance.disk_template == constants.DT_REMOTE_RAID1: fn = self._ExecRR1 elif instance.disk_template == constants.DT_DRBD8: @@ -4029,14 +4755,118 @@ class LUReplaceDisks(LogicalUnit): fn = self._ExecD8Secondary else: raise errors.ProgrammerError("Unhandled disk replacement case") - return fn(feedback_fn) + + ret = fn(feedback_fn) + + # Deactivate the instance disks if we're replacing them on a down instance + if instance.status == "down": + op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name) + self.proc.ChainOpCode(op) + + return ret + + +class LUGrowDisk(LogicalUnit): + """Grow a disk of an instance. + + """ + HPATH = "disk-grow" + HTYPE = constants.HTYPE_INSTANCE + _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"] + + def BuildHooksEnv(self): + """Build hooks env. + + This runs on the master, the primary and all the secondaries. + + """ + env = { + "DISK": self.op.disk, + "AMOUNT": self.op.amount, + } + env.update(_BuildInstanceHookEnvByObject(self.instance)) + nl = [ + self.sstore.GetMasterNode(), + self.instance.primary_node, + ] + return env, nl, nl + + def CheckPrereq(self): + """Check prerequisites. + + This checks that the instance is in the cluster. 
+ + """ + instance = self.cfg.GetInstanceInfo( + self.cfg.ExpandInstanceName(self.op.instance_name)) + if instance is None: + raise errors.OpPrereqError("Instance '%s' not known" % + self.op.instance_name) + + if self.op.amount <= 0: + raise errors.OpPrereqError("Invalid grow-by amount: %s" % self.op.amount) + + self.instance = instance + self.op.instance_name = instance.name + + if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8): + raise errors.OpPrereqError("Instance's disk layout does not support" + " growing.") + + self.disk = instance.FindDisk(self.op.disk) + if self.disk is None: + raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" % + (self.op.disk, instance.name)) + + nodenames = [instance.primary_node] + list(instance.secondary_nodes) + nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName()) + for node in nodenames: + info = nodeinfo.get(node, None) + if not info: + raise errors.OpPrereqError("Cannot get current information" + " from node '%s'" % node) + vg_free = info.get('vg_free', None) + if not isinstance(vg_free, int): + raise errors.OpPrereqError("Can't compute free disk space on" + " node %s" % node) + if self.op.amount > info['vg_free']: + raise errors.OpPrereqError("Not enough disk space on target node %s:" + " %d MiB available, %d MiB required" % + (node, info['vg_free'], self.op.amount)) + is_primary = (node == instance.primary_node) + if not _CheckDiskConsistency(self.cfg, self.disk, node, is_primary): + raise errors.OpPrereqError("Disk %s is degraded or not fully" + " synchronized on node %s," + " aborting grow." % (self.op.disk, node)) + + def Exec(self, feedback_fn): + """Execute disk grow. + + """ + instance = self.instance + disk = self.disk + for node in (instance.secondary_nodes + (instance.primary_node,)): + self.cfg.SetDiskID(disk, node) + result = rpc.call_blockdev_grow(node, disk, self.op.amount) + if not result or not isinstance(result, tuple) or len(result) != 2: + raise errors.OpExecError("grow request failed to node %s" % node) + elif not result[0]: + raise errors.OpExecError("grow request failed to node %s: %s" % + (node, result[1])) + disk.RecordGrow(self.op.amount) + self.cfg.Update(instance) + if self.op.wait_for_sync: + disk_abort = not _WaitForSync(self.cfg, instance, self.proc) + if disk_abort: + logger.Error("Warning: disk sync-ing has not returned a good status.\n" + " Please check the instance.") class LUQueryInstanceData(NoHooksLU): """Query runtime instance data. """ - _OP_REQP = ["instances"] + _OP_REQP = ["instances", "static"] def CheckPrereq(self): """Check prerequisites. @@ -4064,8 +4894,13 @@ class LUQueryInstanceData(NoHooksLU): """Compute block device status. 
""" - self.cfg.SetDiskID(dev, instance.primary_node) - dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev) + static = self.op.static + if not static: + self.cfg.SetDiskID(dev, instance.primary_node) + dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev) + else: + dev_pstatus = None + if dev.dev_type in constants.LDS_DRBD: # we change the snode then (otherwise we use the one passed in) if dev.logical_id[0] == instance.primary_node: @@ -4073,7 +4908,7 @@ class LUQueryInstanceData(NoHooksLU): else: snode = dev.logical_id[0] - if snode: + if snode and not static: self.cfg.SetDiskID(dev, snode) dev_sstatus = rpc.call_blockdev_find(snode, dev) else: @@ -4101,12 +4936,15 @@ class LUQueryInstanceData(NoHooksLU): """Gather and return data""" result = {} for instance in self.wanted_instances: - remote_info = rpc.call_instance_info(instance.primary_node, - instance.name) - if remote_info and "state" in remote_info: - remote_state = "up" + if not self.op.static: + remote_info = rpc.call_instance_info(instance.primary_node, + instance.name) + if remote_info and "state" in remote_info: + remote_state = "up" + else: + remote_state = "down" else: - remote_state = "down" + remote_state = None if instance.status == "down": config_state = "down" else: @@ -4125,13 +4963,40 @@ class LUQueryInstanceData(NoHooksLU): "memory": instance.memory, "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics], "disks": disks, - "network_port": instance.network_port, "vcpus": instance.vcpus, - "kernel_path": instance.kernel_path, - "initrd_path": instance.initrd_path, - "hvm_boot_order": instance.hvm_boot_order, + "auto_balance": instance.auto_balance, } + htkind = self.sstore.GetHypervisorType() + if htkind == constants.HT_XEN_PVM30: + idict["kernel_path"] = instance.kernel_path + idict["initrd_path"] = instance.initrd_path + + if htkind == constants.HT_XEN_HVM31: + idict["hvm_boot_order"] = instance.hvm_boot_order + idict["hvm_acpi"] = instance.hvm_acpi + idict["hvm_pae"] = instance.hvm_pae + idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path + idict["hvm_nic_type"] = instance.hvm_nic_type + idict["hvm_disk_type"] = instance.hvm_disk_type + + if htkind in constants.HTS_REQ_PORT: + if instance.network_port is None: + vnc_console_port = None + elif instance.vnc_bind_address == constants.BIND_ADDRESS_GLOBAL: + vnc_console_port = "%s:%s" % (instance.primary_node, + instance.network_port) + elif instance.vnc_bind_address == constants.LOCALHOST_IP_ADDRESS: + vnc_console_port = "%s:%s on node %s" % (instance.vnc_bind_address, + instance.network_port, + instance.primary_node) + else: + vnc_console_port = "%s:%s" % (instance.vnc_bind_address, + instance.network_port) + idict["vnc_console_port"] = vnc_console_port + idict["vnc_bind_address"] = instance.vnc_bind_address + idict["network_port"] = instance.network_port + result[instance.name] = idict return result @@ -4189,8 +5054,21 @@ class LUSetInstanceParms(LogicalUnit): self.kernel_path = getattr(self.op, "kernel_path", None) self.initrd_path = getattr(self.op, "initrd_path", None) self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None) - all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac, - self.kernel_path, self.initrd_path, self.hvm_boot_order] + self.hvm_acpi = getattr(self.op, "hvm_acpi", None) + self.hvm_pae = getattr(self.op, "hvm_pae", None) + self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None) + self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None) + self.hvm_cdrom_image_path = 
getattr(self.op, "hvm_cdrom_image_path", None) + self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None) + self.force = getattr(self.op, "force", None) + self.auto_balance = getattr(self.op, "auto_balance", None) + all_parms = [ + self.mem, self.vcpus, self.ip, self.bridge, self.mac, + self.kernel_path, self.initrd_path, self.hvm_boot_order, + self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path, + self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type, + self.auto_balance, + ] if all_parms.count(None) == len(all_parms): raise errors.OpPrereqError("No changes submitted") if self.mem is not None: @@ -4250,6 +5128,43 @@ class LUSetInstanceParms(LogicalUnit): " must be one or more of [acdn]" " or 'default'") + # hvm_cdrom_image_path verification + if self.op.hvm_cdrom_image_path is not None: + if not (os.path.isabs(self.op.hvm_cdrom_image_path) or + self.op.hvm_cdrom_image_path.lower() == "none"): + raise errors.OpPrereqError("The path to the HVM CDROM image must" + " be an absolute path or None, not %s" % + self.op.hvm_cdrom_image_path) + if not (os.path.isfile(self.op.hvm_cdrom_image_path) or + self.op.hvm_cdrom_image_path.lower() == "none"): + raise errors.OpPrereqError("The HVM CDROM image must either be a" + " regular file or a symlink pointing to" + " an existing regular file, not %s" % + self.op.hvm_cdrom_image_path) + + # vnc_bind_address verification + if self.op.vnc_bind_address is not None: + if not utils.IsValidIP(self.op.vnc_bind_address): + raise errors.OpPrereqError("given VNC bind address '%s' doesn't look" + " like a valid IP address" % + self.op.vnc_bind_address) + + # Xen HVM device type checks + if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31: + if self.op.hvm_nic_type is not None: + if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES: + raise errors.OpPrereqError("Invalid NIC type %s specified for Xen" + " HVM hypervisor" % self.op.hvm_nic_type) + if self.op.hvm_disk_type is not None: + if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES: + raise errors.OpPrereqError("Invalid disk type %s specified for Xen" + " HVM hypervisor" % self.op.hvm_disk_type) + + # auto balance setting + if self.auto_balance is not None: + # convert the value to a proper bool value, if it's not + self.auto_balance = bool(self.auto_balance) + instance = self.cfg.GetInstanceInfo( self.cfg.ExpandInstanceName(self.op.instance_name)) if instance is None: @@ -4257,6 +5172,39 @@ class LUSetInstanceParms(LogicalUnit): self.op.instance_name) self.op.instance_name = instance.name self.instance = instance + self.warn = [] + if self.mem is not None and not self.force: + pnode = self.instance.primary_node + nodelist = [pnode] + if instance.auto_balance: + nodelist.extend(instance.secondary_nodes) + instance_info = rpc.call_instance_info(pnode, instance.name) + nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName()) + + if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict): + # Assume the primary node is unreachable and go ahead + self.warn.append("Can't get info from primary node %s" % pnode) + else: + if instance_info: + current_mem = instance_info['memory'] + else: + # Assume instance not running + # (there is a slight race condition here, but it's not very probable, + # and we have no other way to check) + current_mem = 0 + miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free'] + if miss_mem > 0: + raise errors.OpPrereqError("This change will prevent the instance" + " from starting, due to %d MB of memory" + " missing 
on its primary node" % miss_mem) + + if instance.auto_balance: + for node in instance.secondary_nodes: + if node not in nodeinfo or not isinstance(nodeinfo[node], dict): + self.warn.append("Can't get info from secondary node %s" % node) + elif self.mem > nodeinfo[node]['memory_free']: + self.warn.append("Not enough memory to failover instance to" + " secondary node %s" % node) return def Exec(self, feedback_fn): @@ -4264,6 +5212,11 @@ class LUSetInstanceParms(LogicalUnit): All parameters take effect only at the next restart of the instance. """ + # Process here the warnings from CheckPrereq, as we don't have a + # feedback_fn there. + for warn in self.warn: + feedback_fn("WARNING: %s" % warn) + result = [] instance = self.instance if self.mem: @@ -4293,6 +5246,30 @@ class LUSetInstanceParms(LogicalUnit): else: instance.hvm_boot_order = self.hvm_boot_order result.append(("hvm_boot_order", self.hvm_boot_order)) + if self.hvm_acpi is not None: + instance.hvm_acpi = self.hvm_acpi + result.append(("hvm_acpi", self.hvm_acpi)) + if self.hvm_pae is not None: + instance.hvm_pae = self.hvm_pae + result.append(("hvm_pae", self.hvm_pae)) + if self.hvm_nic_type is not None: + instance.hvm_nic_type = self.hvm_nic_type + result.append(("hvm_nic_type", self.hvm_nic_type)) + if self.hvm_disk_type is not None: + instance.hvm_disk_type = self.hvm_disk_type + result.append(("hvm_disk_type", self.hvm_disk_type)) + if self.hvm_cdrom_image_path: + if self.hvm_cdrom_image_path == constants.VALUE_NONE: + instance.hvm_cdrom_image_path = None + else: + instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path + result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path)) + if self.vnc_bind_address: + instance.vnc_bind_address = self.vnc_bind_address + result.append(("vnc_bind_address", self.vnc_bind_address)) + if self.auto_balance is not None: + instance.auto_balance = self.auto_balance + result.append(("auto_balance", self.auto_balance)) self.cfg.AddInstance(instance) @@ -4349,7 +5326,7 @@ class LUExportInstance(LogicalUnit): def CheckPrereq(self): """Check prerequisites. - This checks that the instance name is a valid one. + This checks that the instance and node names are valid. """ instance_name = self.cfg.ExpandInstanceName(self.op.instance_name) @@ -4377,8 +5354,8 @@ class LUExportInstance(LogicalUnit): if self.op.shutdown: # shutdown the instance, but not the disks if not rpc.call_instance_shutdown(src_node, instance): - raise errors.OpExecError("Could not shutdown instance %s on node %s" % - (instance.name, source_node)) + raise errors.OpExecError("Could not shutdown instance %s on node %s" % + (instance.name, src_node)) vgname = self.cfg.GetVGName() @@ -4438,6 +5415,45 @@ class LUExportInstance(LogicalUnit): " on node %s" % (instance.name, node)) +class LURemoveExport(NoHooksLU): + """Remove exports related to the named instance. + + """ + _OP_REQP = ["instance_name"] + + def CheckPrereq(self): + """Check prerequisites. + """ + pass + + def Exec(self, feedback_fn): + """Remove any export. + + """ + instance_name = self.cfg.ExpandInstanceName(self.op.instance_name) + # If the instance was not found we'll try with the name that was passed in. + # This will only work if it was an FQDN, though. 
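The memory guard above computes the shortfall on the primary node as the requested size minus what the instance already holds minus the node's free memory. With invented numbers: growing an instance from 1024 MB to 4096 MB on a primary with 2048 MB free leaves 1024 MB missing, which aborts the change unless the force flag is set.

    new_mem, current_mem, free_mem = 4096, 1024, 2048
    miss_mem = new_mem - current_mem - free_mem
    if miss_mem > 0:
        print("%d MB of memory missing on the primary node" % miss_mem)
    # 1024 MB of memory missing on the primary node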
+class LURemoveExport(NoHooksLU):
+  """Remove exports related to the named instance.
+
+  """
+  _OP_REQP = ["instance_name"]
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+    pass
+
+  def Exec(self, feedback_fn):
+    """Remove any export.
+
+    """
+    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
+    # If the instance was not found we'll try with the name that was passed in.
+    # This will only work if it was an FQDN, though.
+    fqdn_warn = False
+    if not instance_name:
+      fqdn_warn = True
+      instance_name = self.op.instance_name
+
+    op = opcodes.OpQueryExports(nodes=[])
+    exportlist = self.proc.ChainOpCode(op)
+    found = False
+    for node in exportlist:
+      if instance_name in exportlist[node]:
+        found = True
+        if not rpc.call_export_remove(node, instance_name):
+          logger.Error("could not remove export for instance %s"
+                       " on node %s" % (instance_name, node))
+
+    if fqdn_warn and not found:
+      feedback_fn("Export not found. If trying to remove an export belonging"
+                  " to a deleted instance please use its Fully Qualified"
+                  " Domain Name.")
+
+
 class TagsLU(NoHooksLU):
   """Generic tags LU.
 
@@ -4565,7 +5581,7 @@ class LUDelTags(TagsLU):
     """
     TagsLU.CheckPrereq(self)
     for tag in self.op.tags:
-      objects.TaggableObject.ValidateTag(tag)
+      objects.TaggableObject.ValidateTag(tag, removal=True)
     del_tags = frozenset(self.op.tags)
     cur_tags = self.target.GetTags()
     if not del_tags <= cur_tags:
@@ -4625,115 +5641,277 @@ class LUTestDelay(NoHooksLU):
                         " result: %s" % (node, node_result))
 
 
-def _AllocatorGetClusterData(cfg, sstore):
-  """Compute the generic allocator input data.
+class IAllocator(object):
+  """IAllocator framework.
 
-  This is the data that is independent of the actual operation.
+  An IAllocator instance has four sets of attributes:
+    - cfg/sstore that are needed to query the cluster
+    - input data (all members of the _KEYS class attribute are required)
+    - four buffer attributes (in_text, out_text, in_data, out_data), that
+      represent the input (to the external script) in text and data
+      structure format, and the output from it, again in two formats
+    - the result variables from the script (success, info, nodes) for
+      easy usage
 
   """
-  # cluster data
-  data = {
-    "version": 1,
-    "cluster_name": sstore.GetClusterName(),
-    "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
-    # we don't have job IDs
-    }
-
-  # node data
-  node_results = {}
-  node_list = cfg.GetNodeList()
-  node_data = rpc.call_node_info(node_list, cfg.GetVGName())
-  for nname in node_list:
-    ninfo = cfg.GetNodeInfo(nname)
-    if nname not in node_data or not isinstance(node_data[nname], dict):
-      raise errors.OpExecError("Can't get data for node %s" % nname)
-    remote_info = node_data[nname]
-    for attr in ['memory_total', 'memory_free',
-                 'vg_size', 'vg_free']:
-      if attr not in remote_info:
-        raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
-                                 (nname, attr))
-      try:
-        int(remote_info[attr])
-      except ValueError, err:
-        raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
-                                 " %s" % (nname, attr, str(err)))
-    pnr = {
-      "tags": list(ninfo.GetTags()),
-      "total_memory": utils.TryConvert(int, remote_info['memory_total']),
-      "free_memory": utils.TryConvert(int, remote_info['memory_free']),
-      "total_disk": utils.TryConvert(int, remote_info['vg_size']),
-      "free_disk": utils.TryConvert(int, remote_info['vg_free']),
-      "primary_ip": ninfo.primary_ip,
-      "secondary_ip": ninfo.secondary_ip,
+  _ALLO_KEYS = [
+    "mem_size", "disks", "disk_template",
+    "os", "tags", "nics", "vcpus",
+    ]
+  _RELO_KEYS = [
+    "relocate_from",
+    ]
+
+  def __init__(self, cfg, sstore, mode, name, **kwargs):
+    self.cfg = cfg
+    self.sstore = sstore
+    # init buffer variables
+    self.in_text = self.out_text = self.in_data = self.out_data = None
+    # init all input fields so that pylint is happy
+    self.mode = mode
+    self.name = name
+    self.mem_size = self.disks = self.disk_template = None
+    self.os = self.tags = self.nics = self.vcpus = None
+    self.relocate_from = None
+    # computed fields
+    self.required_nodes = None
+    # init result fields
+    self.success = self.info = self.nodes = None
+    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
+      keyset = self._ALLO_KEYS
+    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
+      keyset = self._RELO_KEYS
+    else:
+      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
+                                   " IAllocator" % self.mode)
+    for key in kwargs:
+      if key not in keyset:
+        raise errors.ProgrammerError("Invalid input parameter '%s' to"
+                                     " IAllocator" % key)
+      setattr(self, key, kwargs[key])
+    for key in keyset:
+      if key not in kwargs:
+        raise errors.ProgrammerError("Missing input parameter '%s' to"
+                                     " IAllocator" % key)
+    self._BuildInputData()
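To illustrate the keyset handling in __init__ above: an allocation-mode IAllocator must be given exactly the members of _ALLO_KEYS as keyword arguments, no more and no fewer, or a ProgrammerError is raised. A sketch (cfg and sstore stand for live config and simple-store objects; all values are invented):

    # Hypothetical allocation request for a two-disk DRBD instance.
    ial = IAllocator(cfg, sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name="inst1.example.com",
                     mem_size=1024,
                     disks=[{"size": 10240, "mode": "w"},
                            {"size": 2048, "mode": "w"}],
                     disk_template=constants.DT_DRBD8,
                     os="debian-etch",
                     tags=[],
                     nics=[{"mac": "auto", "ip": None, "bridge": "xen-br0"}],
                     vcpus=1)
    # __init__ ends by calling _BuildInputData(), so ial.in_text already
    # holds the serialized input for the external allocator script.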
+
+  def _ComputeClusterData(self):
+    """Compute the generic allocator input data.
+
+    This is the data that is independent of the actual operation.
+
+    """
+    cfg = self.cfg
+    # cluster data
+    data = {
+      "version": 1,
+      "cluster_name": self.sstore.GetClusterName(),
+      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
+      "hypervisor_type": self.sstore.GetHypervisorType(),
+      # we don't have job IDs
+      }
-    node_results[nname] = pnr
-  data["nodes"] = node_results
-
-  # instance data
-  instance_data = {}
-  i_list = cfg.GetInstanceList()
-  for iname in i_list:
-    iinfo = cfg.GetInstanceInfo(iname)
-    nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
-                for n in iinfo.nics]
-    pir = {
-      "tags": list(iinfo.GetTags()),
-      "should_run": iinfo.status == "up",
-      "vcpus": iinfo.vcpus,
-      "memory": iinfo.memory,
-      "os": iinfo.os,
-      "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
-      "nics": nic_data,
-      "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
-      "disk_template": iinfo.disk_template,
+
+    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
+
+    # node data
+    node_results = {}
+    node_list = cfg.GetNodeList()
+    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
+    for nname in node_list:
+      ninfo = cfg.GetNodeInfo(nname)
+      if nname not in node_data or not isinstance(node_data[nname], dict):
+        raise errors.OpExecError("Can't get data for node %s" % nname)
+      remote_info = node_data[nname]
+      for attr in ['memory_total', 'memory_free', 'memory_dom0',
+                   'vg_size', 'vg_free', 'cpu_total']:
+        if attr not in remote_info:
+          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
+        try:
+          remote_info[attr] = int(remote_info[attr])
+        except ValueError, err:
+          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
+      # compute memory used by primary instances
+      i_p_mem = i_p_up_mem = 0
+      for iinfo in i_list:
+        if iinfo.primary_node == nname:
+          i_p_mem += iinfo.memory
+          if iinfo.status == "up":
+            i_p_up_mem += iinfo.memory
+
+      # build the result dict for this node
+      pnr = {
+        "tags": list(ninfo.GetTags()),
+        "total_memory": remote_info['memory_total'],
+        "reserved_memory": remote_info['memory_dom0'],
+        "free_memory": remote_info['memory_free'],
+        "i_pri_memory": i_p_mem,
+        "i_pri_up_memory": i_p_up_mem,
+        "total_disk": remote_info['vg_size'],
+        "free_disk": remote_info['vg_free'],
+        "primary_ip": ninfo.primary_ip,
+        "secondary_ip": ninfo.secondary_ip,
+        "total_cpus": remote_info['cpu_total'],
+        }
+      node_results[nname] = pnr
+    data["nodes"] = node_results
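Serialized, each entry of data["nodes"] built above comes out roughly like this (invented numbers; the memory and disk figures are in MB):

    "node1.example.com": {
      "tags": [],
      "total_memory": 4096, "reserved_memory": 512, "free_memory": 1024,
      "i_pri_memory": 2560, "i_pri_up_memory": 2048,
      "total_disk": 204800, "free_disk": 102400,
      "primary_ip": "192.0.2.10", "secondary_ip": "198.51.100.10",
      "total_cpus": 4,
    }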
+
+    # instance data
+    instance_data = {}
+    for iinfo in i_list:
+      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
+                  for n in iinfo.nics]
+      pir = {
+        "tags": list(iinfo.GetTags()),
+        "should_run": iinfo.status == "up",
+        "vcpus": iinfo.vcpus,
+        "memory": iinfo.memory,
+        "os": iinfo.os,
+        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
+        "nics": nic_data,
+        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
+        "disk_template": iinfo.disk_template,
+        }
+      instance_data[iinfo.name] = pir
+
+    data["instances"] = instance_data
+
+    self.in_data = data
+
+  def _AddNewInstance(self):
+    """Add new instance data to allocator structure.
+
+    This in combination with _ComputeClusterData will create the
+    correct structure needed as input for the allocator.
+
+    The checks for the completeness of the opcode must have already been
+    done.
+
+    """
+    data = self.in_data
+    if len(self.disks) != 2:
+      raise errors.OpExecError("Only two-disk configurations supported")
+
+    disk_space = _ComputeDiskSize(self.disk_template,
+                                  self.disks[0]["size"], self.disks[1]["size"])
+
+    if self.disk_template in constants.DTS_NET_MIRROR:
+      self.required_nodes = 2
+    else:
+      self.required_nodes = 1
+    request = {
+      "type": "allocate",
+      "name": self.name,
+      "disk_template": self.disk_template,
+      "tags": self.tags,
+      "os": self.os,
+      "vcpus": self.vcpus,
+      "memory": self.mem_size,
+      "disks": self.disks,
+      "disk_space_total": disk_space,
+      "nics": self.nics,
+      "required_nodes": self.required_nodes,
+      }
-    instance_data[iname] = pir
+    data["request"] = request
-
-  data["instances"] = instance_data
-
-  return data
+
+  def _AddRelocateInstance(self):
+    """Add relocate instance data to allocator structure.
+
-def _AllocatorAddNewInstance(data, op):
-  """Add new instance data to allocator structure.
+    This in combination with _ComputeClusterData will create the
+    correct structure needed as input for the allocator.
+
-  This in combination with _AllocatorGetClusterData will create the
-  correct structure needed as input for the allocator.
+    The checks for the completeness of the opcode must have already been
+    done.
+
-  The checks for the completeness of the opcode must have already been
-  done.
+    """
+    instance = self.cfg.GetInstanceInfo(self.name)
+    if instance is None:
+      raise errors.ProgrammerError("Unknown instance '%s' passed to"
+                                   " IAllocator" % self.name)
+
+    if instance.disk_template not in constants.DTS_NET_MIRROR:
+      raise errors.OpPrereqError("Can't relocate non-mirrored instances")
+
+    if len(instance.secondary_nodes) != 1:
+      raise errors.OpPrereqError("Instance does not have exactly one"
+                                 " secondary node")
+
+    self.required_nodes = 1
+
+    disk_space = _ComputeDiskSize(instance.disk_template,
+                                  instance.disks[0].size,
+                                  instance.disks[1].size)
+
+    request = {
+      "type": "relocate",
+      "name": self.name,
+      "disk_space_total": disk_space,
+      "required_nodes": self.required_nodes,
+      "relocate_from": self.relocate_from,
+      }
+    self.in_data["request"] = request
+
+  def _BuildInputData(self):
+    """Build input data structures.
+
-def _AllocatorAddRelocateInstance(data, op):
+    """
+    self._ComputeClusterData()
+
-  This in combination with _AllocatorGetClusterData will create the
-  correct structure needed as input for the allocator.
+    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
+      self._AddNewInstance()
+    else:
+      self._AddRelocateInstance()
+
-  The checks for the completeness of the opcode must have already been
-  done.
+    self.in_text = serializer.Dump(self.in_data)
-
-  """
-  request = {
-    "type": "replace_secondary",
-    "name": op.name,
-    }
-  data["request"] = request
+
+  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
+    """Run an instance allocator and return the results.
+
+    """
+    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
+
+    if not isinstance(result, tuple) or len(result) != 4:
+      raise errors.OpExecError("Invalid result from master iallocator runner")
+
+    rcode, stdout, stderr, fail = result
+
+    if rcode == constants.IARUN_NOTFOUND:
+      raise errors.OpExecError("Can't find allocator '%s'" % name)
+    elif rcode == constants.IARUN_FAILURE:
+      raise errors.OpExecError("Instance allocator call failed: %s,"
+                               " output: %s" %
+                               (fail, stdout+stderr))
+    self.out_text = stdout
+    if validate:
+      self._ValidateResult()
+
+  def _ValidateResult(self):
+    """Process the allocator results.
+
+    This will process and if successful save the result in
+    self.out_data and the other parameters.
+
+    """
+    try:
+      rdict = serializer.Load(self.out_text)
+    except Exception, err:
+      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
+
+    if not isinstance(rdict, dict):
+      raise errors.OpExecError("Can't parse iallocator results: not a dict")
+
+    for key in "success", "info", "nodes":
+      if key not in rdict:
+        raise errors.OpExecError("Can't parse iallocator results:"
+                                 " missing key '%s'" % key)
+      setattr(self, key, rdict[key])
+
+    if not isinstance(rdict["nodes"], list):
+      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
+                               " is not a list")
+    self.out_data = rdict
 
 
 class LUTestAllocator(NoHooksLU):
@@ -4750,7 +5928,7 @@ class LUTestAllocator(NoHooksLU):
 
     This checks the opcode parameters depending on the direction and mode
     of the test.
""" - if self.op.mode == constants.ALF_MODE_ALLOC: + if self.op.mode == constants.IALLOCATOR_MODE_ALLOC: for attr in ["name", "mem_size", "disks", "disk_template", "os", "tags", "nics", "vcpus"]: if not hasattr(self.op, attr): @@ -4771,6 +5949,8 @@ class LUTestAllocator(NoHooksLU): " 'nics' parameter") if not isinstance(self.op.disks, list): raise errors.OpPrereqError("Invalid parameter 'disks'") + if len(self.op.disks) != 2: + raise errors.OpPrereqError("Only two-disk configurations supported") for row in self.op.disks: if (not isinstance(row, dict) or "size" not in row or @@ -4779,7 +5959,7 @@ class LUTestAllocator(NoHooksLU): row["mode"] not in ['r', 'w']): raise errors.OpPrereqError("Invalid contents of the" " 'disks' parameter") - elif self.op.mode == constants.ALF_MODE_RELOC: + elif self.op.mode == constants.IALLOCATOR_MODE_RELOC: if not hasattr(self.op, "name"): raise errors.OpPrereqError("Missing attribute 'name' on opcode input") fname = self.cfg.ExpandInstanceName(self.op.name) @@ -4787,15 +5967,15 @@ class LUTestAllocator(NoHooksLU): raise errors.OpPrereqError("Instance '%s' not found for relocation" % self.op.name) self.op.name = fname + self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes else: raise errors.OpPrereqError("Invalid test allocator mode '%s'" % self.op.mode) - if self.op.direction == constants.ALF_DIR_OUT: - if not hasattr(self.op, "allocator"): + if self.op.direction == constants.IALLOCATOR_DIR_OUT: + if not hasattr(self.op, "allocator") or self.op.allocator is None: raise errors.OpPrereqError("Missing allocator name") - raise errors.OpPrereqError("Allocator out mode not supported yet") - elif self.op.direction != constants.ALF_DIR_IN: + elif self.op.direction != constants.IALLOCATOR_DIR_IN: raise errors.OpPrereqError("Wrong allocator test '%s'" % self.op.direction) @@ -4803,14 +5983,28 @@ class LUTestAllocator(NoHooksLU): """Run the allocator test. """ - data = _AllocatorGetClusterData(self.cfg, self.sstore) - if self.op.mode == constants.ALF_MODE_ALLOC: - _AllocatorAddNewInstance(data, self.op) + if self.op.mode == constants.IALLOCATOR_MODE_ALLOC: + ial = IAllocator(self.cfg, self.sstore, + mode=self.op.mode, + name=self.op.name, + mem_size=self.op.mem_size, + disks=self.op.disks, + disk_template=self.op.disk_template, + os=self.op.os, + tags=self.op.tags, + nics=self.op.nics, + vcpus=self.op.vcpus, + ) else: - _AllocatorAddRelocateInstance(data, self.op) - - if _JSON_INDENT is None: - text = simplejson.dumps(data) + ial = IAllocator(self.cfg, self.sstore, + mode=self.op.mode, + name=self.op.name, + relocate_from=list(self.relocate_from), + ) + + if self.op.direction == constants.IALLOCATOR_DIR_IN: + result = ial.in_text else: - text = simplejson.dumps(data, indent=_JSON_INDENT) - return text + ial.Run(self.op.allocator, validate=False) + result = ial.out_text + return result