diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 9cbe8ab..cdac3d0 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2008 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -42,6 +42,7 @@ from ganeti import constants
 from ganeti import objects
 from ganeti import opcodes
 from ganeti import ssconf
+from ganeti import serializer
 
 
 class LogicalUnit(object):
@@ -413,7 +414,7 @@ class LUInitCluster(LogicalUnit):
   """
   HPATH = "cluster-init"
   HTYPE = constants.HTYPE_CLUSTER
-  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
+  _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
               "def_bridge", "master_netdev", "file_storage_dir"]
   REQ_CLUSTER = False
@@ -472,11 +473,14 @@ class LUInitCluster(LogicalUnit):
                                    secondary_ip)
     self.secondary_ip = secondary_ip
 
-    # checks presence of the volume group given
-    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
-
-    if vgstatus:
-      raise errors.OpPrereqError("Error: %s" % vgstatus)
+    if not hasattr(self.op, "vg_name"):
+      self.op.vg_name = None
+    # if vg_name not None, checks if volume group is valid
+    if self.op.vg_name:
+      vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
+      if vgstatus:
+        raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
+                                   " you are not using lvm" % vgstatus)
 
     self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)
@@ -600,7 +604,7 @@ class LUVerifyCluster(NoHooksLU):
   """Verifies the cluster status.
 
   """
-  _OP_REQP = []
+  _OP_REQP = ["skip_checks"]
 
   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                   remote_version, feedback_fn):
@@ -621,7 +625,7 @@ class LUVerifyCluster(NoHooksLU):
     # compares ganeti version
     local_version = constants.PROTOCOL_VERSION
     if not remote_version:
-      feedback_fn(" - ERROR: connection to %s failed"  % (node))
+      feedback_fn(" - ERROR: connection to %s failed" % (node))
       return True
 
     if local_version != remote_version:
@@ -672,7 +676,8 @@ class LUVerifyCluster(NoHooksLU):
       feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
     return bad
 
-  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
+  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
+                      node_instance, feedback_fn):
     """Verify an instance.
 
     This function checks to see if the required block devices are
@@ -681,13 +686,6 @@ class LUVerifyCluster(NoHooksLU):
     """
     bad = False
 
-    instancelist = self.cfg.GetInstanceList()
-    if not instance in instancelist:
-      feedback_fn(" - ERROR: instance %s not in instance list %s" %
-                  (instance, instancelist))
-      bad = True
-
-    instanceconfig = self.cfg.GetInstanceInfo(instance)
     node_current = instanceconfig.primary_node
 
     node_vol_should = {}
@@ -701,7 +699,8 @@ class LUVerifyCluster(NoHooksLU):
         bad = True
 
     if not instanceconfig.status == 'down':
-      if not instance in node_instance[node_current]:
+      if (node_current not in node_instance or
+          not instance in node_instance[node_current]):
         feedback_fn(" - ERROR: instance %s not running on node %s" %
                     (instance, node_current))
         bad = True
@@ -747,13 +746,44 @@ class LUVerifyCluster(NoHooksLU):
         bad = True
     return bad
 
+  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
+    """Verify N+1 Memory Resilience.
+
+    Check that if one single node dies we can still start all the instances it
+    was primary for.
+
+    """
+    bad = False
+
+    for node, nodeinfo in node_info.iteritems():
+      # This code checks that every node which is now listed as secondary has
+      # enough memory to host all instances it is supposed to should a single
+      # other node in the cluster fail.
+      # FIXME: not ready for failover to an arbitrary node
+      # FIXME: does not support file-backed instances
+      # WARNING: we currently take into account down instances as well as up
+      # ones, considering that even if they're down someone might want to start
+      # them even in the event of a node failure.
+      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
+        needed_mem = 0
+        for instance in instances:
+          needed_mem += instance_cfg[instance].memory
+        if nodeinfo['mfree'] < needed_mem:
+          feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
+                      " failovers should node %s fail" % (node, prinode))
+          bad = True
+    return bad
+
   def CheckPrereq(self):
     """Check prerequisites.
 
-    This has no prerequisites.
+    Transform the list of checks we're going to skip into a set and check that
+    all its members are valid.
 
     """
-    pass
+    self.skip_set = frozenset(self.op.skip_checks)
+    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
+      raise errors.OpPrereqError("Invalid checks to be skipped specified")
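For readers following the algorithm, here is the N+1 check above in isolation — a minimal sketch over plain dicts shaped like the node_info and instance_cfg structures that Exec builds further down (all names and numbers are invented):

    node_info = {
        "node2": {"mfree": 1024,
                  "sinst-by-pnode": {"node1": ["inst1", "inst2"]}},
    }
    instance_mem = {"inst1": 1024, "inst2": 512}   # stands in for instance_cfg

    for node, info in node_info.iteritems():
        for prinode, instances in info["sinst-by-pnode"].iteritems():
            needed_mem = sum(instance_mem[name] for name in instances)
            if info["mfree"] < needed_mem:
                # node2 has 1024 MiB free but would need 1536 if node1 died
                print (" - ERROR: not enough memory on node %s to accommodate"
                       " failovers should node %s fail" % (node, prinode))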
 
   def Exec(self, feedback_fn):
     """Verify integrity of cluster, performing various tests on nodes.
@@ -767,8 +797,11 @@ class LUVerifyCluster(NoHooksLU):
     vg_name = self.cfg.GetVGName()
     nodelist = utils.NiceSort(self.cfg.GetNodeList())
     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
+    i_non_redundant = [] # Non redundant instances
     node_volume = {}
     node_instance = {}
+    node_info = {}
+    instance_cfg = {}
 
     # FIXME: verify OS list
     # do local checksums
@@ -788,6 +821,7 @@ class LUVerifyCluster(NoHooksLU):
       }
     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
     all_rversion = rpc.call_version(nodelist)
+    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
 
     for node in nodelist:
       feedback_fn("* Verifying node %s" % node)
@@ -820,18 +854,74 @@ class LUVerifyCluster(NoHooksLU):
 
       node_instance[node] = nodeinstance
 
+      # node_info
+      nodeinfo = all_ninfo[node]
+      if not isinstance(nodeinfo, dict):
+        feedback_fn(" - ERROR: connection to %s failed" % (node,))
+        bad = True
+        continue
+
+      try:
+        node_info[node] = {
+          "mfree": int(nodeinfo['memory_free']),
+          "dfree": int(nodeinfo['vg_free']),
+          "pinst": [],
+          "sinst": [],
+          # dictionary holding all instances this node is secondary for,
+          # grouped by their primary node. Each key is a cluster node, and each
+          # value is a list of instances which have the key as primary and the
+          # current node as secondary. this is handy to calculate N+1 memory
+          # availability if you can only failover from a primary to its
+          # secondary.
+          "sinst-by-pnode": {},
+        }
+      except ValueError:
+        feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
+        bad = True
+        continue
+
     node_vol_should = {}
 
     for instance in instancelist:
       feedback_fn("* Verifying instance %s" % instance)
-      result = self._VerifyInstance(instance, node_volume, node_instance,
-                                    feedback_fn)
-      bad = bad or result
-
       inst_config = self.cfg.GetInstanceInfo(instance)
+      result = self._VerifyInstance(instance, inst_config, node_volume,
+                                    node_instance, feedback_fn)
+      bad = bad or result
 
       inst_config.MapLVsByNode(node_vol_should)
 
+      instance_cfg[instance] = inst_config
+
+      pnode = inst_config.primary_node
+      if pnode in node_info:
+        node_info[pnode]['pinst'].append(instance)
+      else:
+        feedback_fn(" - ERROR: instance %s, connection to primary node"
+                    " %s failed" % (instance, pnode))
+        bad = True
+
+      # If the instance is non-redundant we cannot survive losing its primary
+      # node, so we are not N+1 compliant. On the other hand we have no disk
+      # templates with more than one secondary so that situation is not well
+      # supported either.
+      # FIXME: does not support file-backed instances
+      if len(inst_config.secondary_nodes) == 0:
+        i_non_redundant.append(instance)
+      elif len(inst_config.secondary_nodes) > 1:
+        feedback_fn(" - WARNING: multiple secondaries for instance %s"
+                    % instance)
+
+      for snode in inst_config.secondary_nodes:
+        if snode in node_info:
+          node_info[snode]['sinst'].append(instance)
+          if pnode not in node_info[snode]['sinst-by-pnode']:
+            node_info[snode]['sinst-by-pnode'][pnode] = []
+          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
+        else:
+          feedback_fn(" - ERROR: instance %s, connection to secondary node"
+                      " %s failed" % (instance, snode))
+
     feedback_fn("* Verifying orphan volumes")
     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                        feedback_fn)
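The "sinst-by-pnode" grouping built above reduces to a reverse map from each secondary node to the primaries it backs; a toy version with hypothetical (name, primary, secondaries) tuples:

    instances = [("inst1", "node1", ["node2"]),
                 ("inst2", "node1", ["node2"])]
    sinst_by_pnode = {}
    for name, pnode, snodes in instances:
        for snode in snodes:
            by_pnode = sinst_by_pnode.setdefault(snode, {})
            by_pnode.setdefault(pnode, []).append(name)
    # sinst_by_pnode == {"node2": {"node1": ["inst1", "inst2"]}}
    # i.e. if node1 fails, node2 must absorb inst1 and inst2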
@@ -842,6 +932,16 @@ class LUVerifyCluster(NoHooksLU):
                                        feedback_fn)
     bad = bad or result
 
+    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
+      feedback_fn("* Verifying N+1 Memory redundancy")
+      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
+      bad = bad or result
+
+    feedback_fn("* Other Notes")
+    if i_non_redundant:
+      feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
+                  % len(i_non_redundant))
+
     return int(bad)
@@ -996,6 +1096,79 @@ class LURenameCluster(LogicalUnit):
                     " please restart manually.")
 
 
+def _RecursiveCheckIfLVMBased(disk):
+  """Check if the given disk or its children are lvm-based.
+
+  Args:
+    disk: ganeti.objects.Disk object
+
+  Returns:
+    boolean indicating whether a LD_LV dev_type was found or not
+
+  """
+  if disk.children:
+    for chdisk in disk.children:
+      if _RecursiveCheckIfLVMBased(chdisk):
+        return True
+  return disk.dev_type == constants.LD_LV
+
+
+class LUSetClusterParams(LogicalUnit):
+  """Change the parameters of the cluster.
+
+  """
+  HPATH = "cluster-modify"
+  HTYPE = constants.HTYPE_CLUSTER
+  _OP_REQP = []
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    """
+    env = {
+      "OP_TARGET": self.sstore.GetClusterName(),
+      "NEW_VG_NAME": self.op.vg_name,
+      }
+    mn = self.sstore.GetMasterNode()
+    return env, [mn], [mn]
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This checks whether the given params don't conflict and
+    if the given volume group is valid.
+
+    """
+    if not self.op.vg_name:
+      instances = [self.cfg.GetInstanceInfo(name)
+                   for name in self.cfg.GetInstanceList()]
+      for inst in instances:
+        for disk in inst.disks:
+          if _RecursiveCheckIfLVMBased(disk):
+            raise errors.OpPrereqError("Cannot disable lvm storage while"
+                                       " lvm-based instances exist")
+
+    # if vg_name not None, checks given volume group on all nodes
+    if self.op.vg_name:
+      node_list = self.cfg.GetNodeList()
+      vglist = rpc.call_vg_list(node_list)
+      for node in node_list:
+        vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
+        if vgstatus:
+          raise errors.OpPrereqError("Error on node '%s': %s" %
                                      (node, vgstatus))
+
+  def Exec(self, feedback_fn):
+    """Change the parameters of the cluster.
+
+    """
+    if self.op.vg_name != self.cfg.GetVGName():
+      self.cfg.SetVGName(self.op.vg_name)
+    else:
+      feedback_fn("Cluster LVM configuration already in desired"
+                  " state, not changing")
+
+
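A toy illustration of the recursive walk in _RecursiveCheckIfLVMBased; FakeDisk stands in for ganeti.objects.Disk and the dev_type strings are simplified stand-ins for the LD_* constants:

    class FakeDisk(object):
        def __init__(self, dev_type, children=None):
            self.dev_type = dev_type
            self.children = children or []

    def is_lvm_based(disk):
        for child in disk.children:
            if is_lvm_based(child):
                return True
        return disk.dev_type == "lvm"

    # drbd8 devices are built on top of lvm volumes, so they count too
    drbd = FakeDisk("drbd8", [FakeDisk("lvm"), FakeDisk("lvm")])
    assert is_lvm_based(drbd)
    assert not is_lvm_based(FakeDisk("file"))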
 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
   """Sleep and poll for an instance's disk to sync.
@@ -1078,7 +1251,7 @@ def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
   if on_primary or dev.AssembleOnSecondary():
     rstats = rpc.call_blockdev_find(node, dev)
     if not rstats:
-      logger.ToStderr("Can't get any data from node %s" % node)
+      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
       result = False
     else:
       result = result and (not rstats[idx])
@@ -1093,7 +1266,7 @@ class LUDiagnoseOS(NoHooksLU):
   """Logical unit for OS diagnose/query.
 
   """
-  _OP_REQP = []
+  _OP_REQP = ["output_fields", "names"]
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -1101,7 +1274,44 @@ class LUDiagnoseOS(NoHooksLU):
     This always succeeds, since this is a pure query LU.
 
     """
-    return
+    if self.op.names:
+      raise errors.OpPrereqError("Selective OS query not supported")
+
+    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
+    _CheckOutputFields(static=[],
+                       dynamic=self.dynamic_fields,
+                       selected=self.op.output_fields)
+
+  @staticmethod
+  def _DiagnoseByOS(node_list, rlist):
+    """Remaps a per-node return list into a per-os per-node dictionary
+
+    Args:
+      node_list: a list with the names of all nodes
+      rlist: a map with node names as keys and OS objects as values
+
+    Returns:
+      map: a map with osnames as keys and as value another map, with
+           nodes as keys and list of OS objects as values
+           e.g. {"debian-etch": {"node1": [<object>,...], "node2": [<object>,]}
+           }
+
+    """
+    all_os = {}
+    for node_name, nr in rlist.iteritems():
+      if not nr:
+        continue
+      for os in nr:
+        if os.name not in all_os:
+          # build a list of nodes for this os containing empty lists
+          # for each node in node_list
+          all_os[os.name] = {}
+          for nname in node_list:
+            all_os[os.name][nname] = []
+        all_os[os.name][node_name].append(os)
+    return all_os
 
   def Exec(self, feedback_fn):
     """Compute the list of OSes.
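On toy data, the remapping performed by _DiagnoseByOS looks like this (plain strings stand in for the OS objects; note how node2 ends up with an empty list for an OS it never reported, which is what the "valid" check below relies on):

    node_list = ["node1", "node2"]
    rlist = {"node1": ["debian-etch", "fc8"], "node2": ["debian-etch"]}
    all_os = {}
    for node_name, nr in rlist.iteritems():
        for name in nr:
            if name not in all_os:
                all_os[name] = dict([(n, []) for n in node_list])
            all_os[name][node_name].append(name)
    # all_os == {"debian-etch": {"node1": ["debian-etch"],
    #                            "node2": ["debian-etch"]},
    #            "fc8": {"node1": ["fc8"], "node2": []}}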
@@ -1111,7 +1321,25 @@ class LUDiagnoseOS(NoHooksLU):
     node_data = rpc.call_os_diagnose(node_list)
     if node_data == False:
       raise errors.OpExecError("Can't gather the list of OSes")
-    return node_data
+    pol = self._DiagnoseByOS(node_list, node_data)
+    output = []
+    for os_name, os_data in pol.iteritems():
+      row = []
+      for field in self.op.output_fields:
+        if field == "name":
+          val = os_name
+        elif field == "valid":
+          val = utils.all([osl and osl[0] for osl in os_data.values()])
+        elif field == "node_status":
+          val = {}
+          for node_name, nos_list in os_data.iteritems():
+            val[node_name] = [(v.status, v.path) for v in nos_list]
+        else:
+          raise errors.ParameterError(field)
+        row.append(val)
+      output.append(row)
+
+    return output
 
 
 class LURemoveNode(LogicalUnit):
@@ -1405,13 +1633,24 @@ class LUAddNode(LogicalUnit):
       if not utils.IsValidIP(secondary_ip):
         raise errors.OpPrereqError("Invalid secondary IP given")
       self.op.secondary_ip = secondary_ip
+
     node_list = cfg.GetNodeList()
-    if node in node_list:
-      raise errors.OpPrereqError("Node %s is already in the configuration"
-                                 % node)
+    if not self.op.readd and node in node_list:
+      raise errors.OpPrereqError("Node %s is already in the configuration" %
+                                 node)
+    elif self.op.readd and node not in node_list:
+      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
 
     for existing_node_name in node_list:
       existing_node = cfg.GetNodeInfo(existing_node_name)
+
+      if self.op.readd and node == existing_node_name:
+        if (existing_node.primary_ip != primary_ip or
+            existing_node.secondary_ip != secondary_ip):
+          raise errors.OpPrereqError("Readded node doesn't have the same IP"
+                                     " address configuration as before")
+        continue
+
       if (existing_node.primary_ip == primary_ip or
           existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
@@ -1574,8 +1813,9 @@ class LUAddNode(LogicalUnit):
       if not self.ssh.CopyFileToNode(node, fname):
         logger.Error("could not copy file %s to node %s" % (fname, node))
 
-    logger.Info("adding node %s to cluster.conf" % node)
-    self.cfg.AddNode(new_node)
+    if not self.op.readd:
+      logger.Info("adding node %s to cluster.conf" % node)
+      self.cfg.AddNode(new_node)
 
 
 class LUMasterFailover(LogicalUnit):
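The "valid" field computed in LUDiagnoseOS.Exec above collapses to: the OS is present, and its first reported object valid, on every node. With booleans standing in for OS objects (the built-in all() is used here; the LU goes through utils.all, presumably a compatibility wrapper):

    os_data = {"node1": [True], "node2": [True]}
    print all([osl and osl[0] for osl in os_data.values()])   # True
    os_data["node2"] = []   # the OS is missing on node2
    print all([osl and osl[0] for osl in os_data.values()])   # False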
""" + # put the master at the end of the nodes list + master_node = self.sstore.GetMasterNode() + if master_node in self.nodes: + self.nodes.remove(master_node) + self.nodes.append(master_node) + data = [] for node in self.nodes: result = self.ssh.Run(node, "root", self.op.command) @@ -2270,7 +2516,7 @@ class LURenameInstance(LogicalUnit): instance_list = self.cfg.GetInstanceList() if new_name in instance_list: raise errors.OpPrereqError("Instance '%s' is already in the cluster" % - instance_name) + new_name) if not getattr(self.op, "ignore_ip", False): command = ["fping", "-q", name_info.ip] @@ -2287,11 +2533,33 @@ class LURenameInstance(LogicalUnit): inst = self.instance old_name = inst.name + if inst.disk_template == constants.DT_FILE: + old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1]) + self.cfg.RenameInstance(inst.name, self.op.new_name) # re-read the instance from the configuration after rename inst = self.cfg.GetInstanceInfo(self.op.new_name) + if inst.disk_template == constants.DT_FILE: + new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1]) + result = rpc.call_file_storage_dir_rename(inst.primary_node, + old_file_storage_dir, + new_file_storage_dir) + + if not result: + raise errors.OpExecError("Could not connect to node '%s' to rename" + " directory '%s' to '%s' (but the instance" + " has been renamed in Ganeti)" % ( + inst.primary_node, old_file_storage_dir, + new_file_storage_dir)) + + if not result[0]: + raise errors.OpExecError("Could not rename directory '%s' to '%s'" + " (but the instance has been renamed in" + " Ganeti)" % (old_file_storage_dir, + new_file_storage_dir)) + _StartInstanceDisks(self.cfg, inst, None) try: if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name, @@ -2552,7 +2820,7 @@ class LUFailoverInstance(LogicalUnit): for dev in instance.disks: # for remote_raid1, these are md over drbd if not _CheckDiskConsistency(self.cfg, dev, target_node, False): - if not self.op.ignore_consistency: + if instance.status == "up" and not self.op.ignore_consistency: raise errors.OpExecError("Disk %s is degraded on target node," " aborting failover." % dev.iv_name) @@ -2577,21 +2845,23 @@ class LUFailoverInstance(LogicalUnit): # distribute new instance config to the other nodes self.cfg.AddInstance(instance) - feedback_fn("* activating the instance's disks on target node") - logger.Info("Starting instance %s on node %s" % - (instance.name, target_node)) + # Only start the instance if it's marked as up + if instance.status == "up": + feedback_fn("* activating the instance's disks on target node") + logger.Info("Starting instance %s on node %s" % + (instance.name, target_node)) - disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg, - ignore_secondaries=True) - if not disks_ok: - _ShutdownInstanceDisks(instance, self.cfg) - raise errors.OpExecError("Can't activate the instance's disks") + disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg, + ignore_secondaries=True) + if not disks_ok: + _ShutdownInstanceDisks(instance, self.cfg) + raise errors.OpExecError("Can't activate the instance's disks") - feedback_fn("* starting the instance on the target node") - if not rpc.call_instance_start(target_node, instance, None): - _ShutdownInstanceDisks(instance, self.cfg) - raise errors.OpExecError("Could not start instance %s on node %s." 
-                               (instance.name, target_node))
+      feedback_fn("* starting the instance on the target node")
+      if not rpc.call_instance_start(target_node, instance, None):
+        _ShutdownInstanceDisks(instance, self.cfg)
+        raise errors.OpExecError("Could not start instance %s on node %s." %
+                                 (instance.name, target_node))
 
 
 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
@@ -2692,7 +2962,8 @@ def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
 
 
 def _GenerateDiskTemplate(cfg, template_name,
                           instance_name, primary_node,
-                          secondary_nodes, disk_sz, swap_sz):
+                          secondary_nodes, disk_sz, swap_sz,
+                          file_storage_dir, file_driver):
   """Generate the entire disk layout for a given template type.
 
   """
@@ -2724,6 +2995,17 @@ def _GenerateDiskTemplate(cfg, template_name,
     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4], "sdb")
     disks = [drbd_sda_dev, drbd_sdb_dev]
+  elif template_name == constants.DT_FILE:
+    if len(secondary_nodes) != 0:
+      raise errors.ProgrammerError("Wrong template configuration")
+
+    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
+                                iv_name="sda", logical_id=(file_driver,
+                                "%s/sda" % file_storage_dir))
+    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
+                                iv_name="sdb", logical_id=(file_driver,
+                                "%s/sdb" % file_storage_dir))
+    disks = [file_sda_dev, file_sdb_dev]
   else:
     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
   return disks
@@ -2750,9 +3032,22 @@ def _CreateDisks(cfg, instance):
 
   """
   info = _GetInstanceInfoText(instance)
 
+  if instance.disk_template == constants.DT_FILE:
+    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
+    result = rpc.call_file_storage_dir_create(instance.primary_node,
+                                              file_storage_dir)
+
+    if not result:
+      logger.Error("Could not connect to node '%s'" % instance.primary_node)
+      return False
+
+    if not result[0]:
+      logger.Error("failed to create directory '%s'" % file_storage_dir)
+      return False
+
   for device in instance.disks:
     logger.Info("creating volume %s for instance %s" %
-              (device.iv_name, instance.name))
+                (device.iv_name, instance.name))
     #HARDCODE
     for secondary_node in instance.secondary_nodes:
       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
@@ -2766,6 +3061,7 @@ def _CreateDisks(cfg, instance):
       logger.Error("failed to create volume %s on primary!" %
                    device.iv_name)
       return False
+
   return True
 
 
@@ -2795,19 +3091,86 @@ def _RemoveDisks(instance, cfg):
                    " continuing anyway" % (device.iv_name, node))
       result = False
+
+  if instance.disk_template == constants.DT_FILE:
+    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
+    if not rpc.call_file_storage_dir_remove(instance.primary_node,
+                                            file_storage_dir):
+      logger.Error("could not remove directory '%s'" % file_storage_dir)
+      result = False
+
   return result
 
 
+def _ComputeDiskSize(disk_template, disk_size, swap_size):
+  """Compute disk size requirements in the volume group
+
+  This is currently hard-coded for the two-drive layout.
+ + """ + # Required free disk space as a function of disk and swap space + req_size_dict = { + constants.DT_DISKLESS: None, + constants.DT_PLAIN: disk_size + swap_size, + # 256 MB are added for drbd metadata, 128MB for each drbd device + constants.DT_DRBD8: disk_size + swap_size + 256, + constants.DT_FILE: None, + } + + if disk_template not in req_size_dict: + raise errors.ProgrammerError("Disk template '%s' size requirement" + " is unknown" % disk_template) + + return req_size_dict[disk_template] + + class LUCreateInstance(LogicalUnit): """Create an instance. """ HPATH = "instance-add" HTYPE = constants.HTYPE_INSTANCE - _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode", + _OP_REQP = ["instance_name", "mem_size", "disk_size", "disk_template", "swap_size", "mode", "start", "vcpus", "wait_for_sync", "ip_check", "mac"] + def _RunAllocator(self): + """Run the allocator based on input opcode. + + """ + disks = [{"size": self.op.disk_size, "mode": "w"}, + {"size": self.op.swap_size, "mode": "w"}] + nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None), + "bridge": self.op.bridge}] + ial = IAllocator(self.cfg, self.sstore, + name=self.op.instance_name, + disk_template=self.op.disk_template, + tags=[], + os=self.op.os_type, + vcpus=self.op.vcpus, + mem_size=self.op.mem_size, + disks=disks, + nics=nics, + mode=constants.IALLOCATOR_MODE_ALLOC) + + ial.Run(self.op.iallocator) + + if not ial.success: + raise errors.OpPrereqError("Can't compute nodes using" + " iallocator '%s': %s" % (self.op.iallocator, + ial.info)) + if len(ial.nodes) != ial.required_nodes: + raise errors.OpPrereqError("iallocator '%s' returned invalid number" + " of nodes (%s), required %s" % + (len(ial.nodes), ial.required_nodes)) + self.op.pnode = ial.nodes[0] + logger.ToStdout("Selected nodes for the instance: %s" % + (", ".join(ial.nodes),)) + logger.Info("Selected nodes for instance %s via iallocator %s: %s" % + (self.op.instance_name, self.op.iallocator, ial.nodes)) + if ial.required_nodes == 2: + self.op.snode = ial.nodes[1] + def BuildHooksEnv(self): """Build hooks env. @@ -2844,7 +3207,9 @@ class LUCreateInstance(LogicalUnit): """Check prerequisites. 
""" - for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]: + # set optional parameters to none if they don't exist + for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode", + "iallocator"]: if not hasattr(self.op, attr): setattr(self.op, attr, None) @@ -2853,6 +3218,11 @@ class LUCreateInstance(LogicalUnit): raise errors.OpPrereqError("Invalid instance creation mode '%s'" % self.op.mode) + if (not self.cfg.GetVGName() and + self.op.disk_template not in constants.DTS_NOT_LVM): + raise errors.OpPrereqError("Cluster does not support lvm-based" + " instances") + if self.op.mode == constants.INSTANCE_IMPORT: src_node = getattr(self.op, "src_node", None) src_path = getattr(self.op, "src_path", None) @@ -2893,6 +3263,81 @@ class LUCreateInstance(LogicalUnit): if getattr(self.op, "os_type", None) is None: raise errors.OpPrereqError("No guest OS specified") + #### instance parameters check + + # disk template and mirror node verification + if self.op.disk_template not in constants.DISK_TEMPLATES: + raise errors.OpPrereqError("Invalid disk template name") + + # instance name verification + hostname1 = utils.HostInfo(self.op.instance_name) + + self.op.instance_name = instance_name = hostname1.name + instance_list = self.cfg.GetInstanceList() + if instance_name in instance_list: + raise errors.OpPrereqError("Instance '%s' is already in the cluster" % + instance_name) + + # ip validity checks + ip = getattr(self.op, "ip", None) + if ip is None or ip.lower() == "none": + inst_ip = None + elif ip.lower() == "auto": + inst_ip = hostname1.ip + else: + if not utils.IsValidIP(ip): + raise errors.OpPrereqError("given IP address '%s' doesn't look" + " like a valid IP" % ip) + inst_ip = ip + self.inst_ip = self.op.ip = inst_ip + + if self.op.start and not self.op.ip_check: + raise errors.OpPrereqError("Cannot ignore IP address conflicts when" + " adding an instance in start mode") + + if self.op.ip_check: + if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT): + raise errors.OpPrereqError("IP %s of instance %s already in use" % + (hostname1.ip, instance_name)) + + # MAC address verification + if self.op.mac != "auto": + if not utils.IsValidMac(self.op.mac.lower()): + raise errors.OpPrereqError("invalid MAC address specified: %s" % + self.op.mac) + + # bridge verification + bridge = getattr(self.op, "bridge", None) + if bridge is None: + self.op.bridge = self.cfg.GetDefBridge() + else: + self.op.bridge = bridge + + # boot order verification + if self.op.hvm_boot_order is not None: + if len(self.op.hvm_boot_order.strip("acdn")) != 0: + raise errors.OpPrereqError("invalid boot order specified," + " must be one or more of [acdn]") + # file storage checks + if (self.op.file_driver and + not self.op.file_driver in constants.FILE_DRIVER): + raise errors.OpPrereqError("Invalid file driver name '%s'" % + self.op.file_driver) + + if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir): + raise errors.OpPrereqError("File storage directory not a relative" + " path") + #### allocator run + + if [self.op.iallocator, self.op.pnode].count(None) != 1: + raise errors.OpPrereqError("One and only one of iallocator and primary" + " node must be given") + + if self.op.iallocator is not None: + self._RunAllocator() + + #### node related checks + # check primary node pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode)) if pnode is None: @@ -2901,10 +3346,8 @@ class LUCreateInstance(LogicalUnit): self.op.pnode = pnode.name self.pnode = pnode self.secondaries = [] - # disk 
@@ -2901,10 +3346,8 @@ class LUCreateInstance(LogicalUnit):
     self.op.pnode = pnode.name
     self.pnode = pnode
     self.secondaries = []
 
-    # disk template and mirror node verification
-    if self.op.disk_template not in constants.DISK_TEMPLATES:
-      raise errors.OpPrereqError("Invalid disk template name")
+    # mirror node verification
     if self.op.disk_template in constants.DTS_NET_MIRROR:
       if getattr(self.op, "snode", None) is None:
         raise errors.OpPrereqError("The networked disk templates need"
                                    " a mirror node")
@@ -2919,19 +3362,8 @@ class LUCreateInstance(LogicalUnit):
                                    " the primary node.")
       self.secondaries.append(snode_name)
 
-    # Required free disk space as a function of disk and swap space
-    req_size_dict = {
-      constants.DT_DISKLESS: None,
-      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
-      # 256 MB are added for drbd metadata, 128MB for each drbd device
-      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
-    }
-
-    if self.op.disk_template not in req_size_dict:
-      raise errors.ProgrammerError("Disk template '%s' size requirement"
-                                   " is unknown" % self.op.disk_template)
-
-    req_size = req_size_dict[self.op.disk_template]
+    req_size = _ComputeDiskSize(self.op.disk_template,
+                                self.op.disk_size, self.op.swap_size)
 
     # Check lv size requirements
     if req_size is not None:
@@ -2960,60 +3392,13 @@ class LUCreateInstance(LogicalUnit):
       if self.op.kernel_path == constants.VALUE_NONE:
         raise errors.OpPrereqError("Can't set instance kernel to none")
 
-    # instance verification
-    hostname1 = utils.HostInfo(self.op.instance_name)
-
-    self.op.instance_name = instance_name = hostname1.name
-    instance_list = self.cfg.GetInstanceList()
-    if instance_name in instance_list:
-      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
-                                 instance_name)
-
-    ip = getattr(self.op, "ip", None)
-    if ip is None or ip.lower() == "none":
-      inst_ip = None
-    elif ip.lower() == "auto":
-      inst_ip = hostname1.ip
-    else:
-      if not utils.IsValidIP(ip):
-        raise errors.OpPrereqError("given IP address '%s' doesn't look"
-                                   " like a valid IP" % ip)
-      inst_ip = ip
-    self.inst_ip = inst_ip
-
-    if self.op.start and not self.op.ip_check:
-      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
-                                 " adding an instance in start mode")
-
-    if self.op.ip_check:
-      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
-        raise errors.OpPrereqError("IP %s of instance %s already in use" %
-                                   (hostname1.ip, instance_name))
-
-    # MAC address verification
-    if self.op.mac != "auto":
-      if not utils.IsValidMac(self.op.mac.lower()):
-        raise errors.OpPrereqError("invalid MAC address specified: %s" %
-                                   self.op.mac)
-
-    # bridge verification
-    bridge = getattr(self.op, "bridge", None)
-    if bridge is None:
-      self.op.bridge = self.cfg.GetDefBridge()
-    else:
-      self.op.bridge = bridge
+    # bridge check on primary node
     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
       raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                  " destination node '%s'" %
                                  (self.op.bridge, pnode.name))
 
-    # boot order verification
-    if self.op.hvm_boot_order is not None:
-      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
-        raise errors.OpPrereqError("invalid boot order specified,"
-                                   " must be one or more of [acdn]")
-
     if self.op.start:
       self.instance_status = 'up'
     else:
       self.instance_status = 'down'
@@ -3041,11 +3426,25 @@ class LUCreateInstance(LogicalUnit):
     else:
       network_port = None
 
+    # this is needed because os.path.join does not accept None arguments
+    if self.op.file_storage_dir is None:
+      string_file_storage_dir = ""
+    else:
+      string_file_storage_dir = self.op.file_storage_dir
+
+    # build the full file storage dir path
+    file_storage_dir = os.path.normpath(os.path.join(
+                                        self.sstore.GetFileStorageDir(),
+                                        string_file_storage_dir, instance))
+
+
     disks = _GenerateDiskTemplate(self.cfg,
                                   self.op.disk_template,
                                   instance, pnode_name,
                                   self.secondaries, self.op.disk_size,
-                                  self.op.swap_size)
+                                  self.op.swap_size,
+                                  file_storage_dir,
+                                  self.op.file_driver)
 
     iobj = objects.Instance(name=instance, os=self.op.os_type,
                             primary_node=pnode_name,
@@ -3825,7 +4224,7 @@ class LUQueryInstanceData(NoHooksLU):
     return result
 
 
-class LUSetInstanceParms(LogicalUnit):
+class LUSetInstanceParams(LogicalUnit):
   """Modifies an instance's parameters.
 
   """
@@ -3877,9 +4276,9 @@ class LUSetInstanceParams(LogicalUnit):
     self.kernel_path = getattr(self.op, "kernel_path", None)
     self.initrd_path = getattr(self.op, "initrd_path", None)
     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
-    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
-                 self.kernel_path, self.initrd_path, self.hvm_boot_order]
-    if all_parms.count(None) == len(all_parms):
+    all_params = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
+                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
+    if all_params.count(None) == len(all_params):
       raise errors.OpPrereqError("No changes submitted")
     if self.mem is not None:
       try:
@@ -4037,7 +4436,7 @@ class LUExportInstance(LogicalUnit):
   def CheckPrereq(self):
     """Check prerequisites.
 
-    This checks that the instance name is a valid one.
+    This checks that the instance and node names are valid.
 
     """
     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
@@ -4062,10 +4461,11 @@ class LUExportInstance(LogicalUnit):
     instance = self.instance
     dst_node = self.dst_node
     src_node = instance.primary_node
-    # shutdown the instance, unless requested not to do so
     if self.op.shutdown:
-      op = opcodes.OpShutdownInstance(instance_name=instance.name)
-      self.proc.ChainOpCode(op)
+      # shutdown the instance, but not the disks
+      if not rpc.call_instance_shutdown(src_node, instance):
+        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
+                                 (instance.name, src_node))
 
     vgname = self.cfg.GetVGName()
 
@@ -4088,22 +4488,20 @@ class LUExportInstance(LogicalUnit):
           snap_disks.append(new_dev)
 
     finally:
-      if self.op.shutdown:
-        op = opcodes.OpStartupInstance(instance_name=instance.name,
-                                       force=False)
-        self.proc.ChainOpCode(op)
+      if self.op.shutdown and instance.status == "up":
+        if not rpc.call_instance_start(src_node, instance, None):
+          _ShutdownInstanceDisks(instance, self.cfg)
+          raise errors.OpExecError("Could not start instance")
 
     # TODO: check for size
 
     for dev in snap_disks:
-      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
-                                      instance):
-        logger.Error("could not export block device %s from node"
-                     " %s to node %s" %
-                     (dev.logical_id[1], src_node, dst_node.name))
+      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
+        logger.Error("could not export block device %s from node %s to node %s"
+                     % (dev.logical_id[1], src_node, dst_node.name))
       if not rpc.call_blockdev_remove(src_node, dev):
-        logger.Error("could not remove snapshot block device %s from"
-                     " node %s" % (dev.logical_id[1], src_node))
+        logger.Error("could not remove snapshot block device %s from node %s" %
+                     (dev.logical_id[1], src_node))
 
     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
       logger.Error("could not finalize export for instance %s on node %s" %
@@ -4125,6 +4523,45 @@ class LUExportInstance(LogicalUnit):
                     " on node %s" % (instance.name, node))
 
 
+class LURemoveExport(NoHooksLU):
+  """Remove exports related to the named instance.
+ + """ + _OP_REQP = ["instance_name"] + + def CheckPrereq(self): + """Check prerequisites. + """ + pass + + def Exec(self, feedback_fn): + """Remove any export. + + """ + instance_name = self.cfg.ExpandInstanceName(self.op.instance_name) + # If the instance was not found we'll try with the name that was passed in. + # This will only work if it was an FQDN, though. + fqdn_warn = False + if not instance_name: + fqdn_warn = True + instance_name = self.op.instance_name + + op = opcodes.OpQueryExports(nodes=[]) + exportlist = self.proc.ChainOpCode(op) + found = False + for node in exportlist: + if instance_name in exportlist[node]: + found = True + if not rpc.call_export_remove(node, instance_name): + logger.Error("could not remove export for instance %s" + " on node %s" % (instance_name, node)) + + if fqdn_warn and not found: + feedback_fn("Export not found. If trying to remove an export belonging" + " to a deleted instance please use its Fully Qualified" + " Domain Name.") + + class TagsLU(NoHooksLU): """Generic tags LU. @@ -4310,3 +4747,342 @@ class LUTestDelay(NoHooksLU): if not node_result: raise errors.OpExecError("Failure during rpc call to node %s," " result: %s" % (node, node_result)) + + +class IAllocator(object): + """IAllocator framework. + + An IAllocator instance has three sets of attributes: + - cfg/sstore that are needed to query the cluster + - input data (all members of the _KEYS class attribute are required) + - four buffer attributes (in|out_data|text), that represent the + input (to the external script) in text and data structure format, + and the output from it, again in two formats + - the result variables from the script (success, info, nodes) for + easy usage + + """ + _KEYS = [ + "mode", "name", + "mem_size", "disks", "disk_template", + "os", "tags", "nics", "vcpus", + ] + + def __init__(self, cfg, sstore, **kwargs): + self.cfg = cfg + self.sstore = sstore + # init buffer variables + self.in_text = self.out_text = self.in_data = self.out_data = None + # init all input fields so that pylint is happy + self.mode = self.name = None + self.mem_size = self.disks = self.disk_template = None + self.os = self.tags = self.nics = self.vcpus = None + # computed fields + self.required_nodes = None + # init result fields + self.success = self.info = self.nodes = None + for key in kwargs: + if key not in self._KEYS: + raise errors.ProgrammerError("Invalid input parameter '%s' to" + " IAllocator" % key) + setattr(self, key, kwargs[key]) + for key in self._KEYS: + if key not in kwargs: + raise errors.ProgrammerError("Missing input parameter '%s' to" + " IAllocator" % key) + self._BuildInputData() + + def _ComputeClusterData(self): + """Compute the generic allocator input data. + + This is the data that is independent of the actual operation. 
+ + """ + cfg = self.cfg + # cluster data + data = { + "version": 1, + "cluster_name": self.sstore.GetClusterName(), + "cluster_tags": list(cfg.GetClusterInfo().GetTags()), + # we don't have job IDs + } + + # node data + node_results = {} + node_list = cfg.GetNodeList() + node_data = rpc.call_node_info(node_list, cfg.GetVGName()) + for nname in node_list: + ninfo = cfg.GetNodeInfo(nname) + if nname not in node_data or not isinstance(node_data[nname], dict): + raise errors.OpExecError("Can't get data for node %s" % nname) + remote_info = node_data[nname] + for attr in ['memory_total', 'memory_free', + 'vg_size', 'vg_free']: + if attr not in remote_info: + raise errors.OpExecError("Node '%s' didn't return attribute '%s'" % + (nname, attr)) + try: + int(remote_info[attr]) + except ValueError, err: + raise errors.OpExecError("Node '%s' returned invalid value for '%s':" + " %s" % (nname, attr, str(err))) + pnr = { + "tags": list(ninfo.GetTags()), + "total_memory": utils.TryConvert(int, remote_info['memory_total']), + "free_memory": utils.TryConvert(int, remote_info['memory_free']), + "total_disk": utils.TryConvert(int, remote_info['vg_size']), + "free_disk": utils.TryConvert(int, remote_info['vg_free']), + "primary_ip": ninfo.primary_ip, + "secondary_ip": ninfo.secondary_ip, + } + node_results[nname] = pnr + data["nodes"] = node_results + + # instance data + instance_data = {} + i_list = cfg.GetInstanceList() + for iname in i_list: + iinfo = cfg.GetInstanceInfo(iname) + nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge} + for n in iinfo.nics] + pir = { + "tags": list(iinfo.GetTags()), + "should_run": iinfo.status == "up", + "vcpus": iinfo.vcpus, + "memory": iinfo.memory, + "os": iinfo.os, + "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes), + "nics": nic_data, + "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks], + "disk_template": iinfo.disk_template, + } + instance_data[iname] = pir + + data["instances"] = instance_data + + self.in_data = data + + def _AddNewInstance(self): + """Add new instance data to allocator structure. + + This in combination with _AllocatorGetClusterData will create the + correct structure needed as input for the allocator. + + The checks for the completeness of the opcode must have already been + done. + + """ + data = self.in_data + if len(self.disks) != 2: + raise errors.OpExecError("Only two-disk configurations supported") + + disk_space = _ComputeDiskSize(self.disk_template, + self.disks[0]["size"], self.disks[1]["size"]) + + if self.disk_template in constants.DTS_NET_MIRROR: + self.required_nodes = 2 + else: + self.required_nodes = 1 + request = { + "type": "allocate", + "name": self.name, + "disk_template": self.disk_template, + "tags": self.tags, + "os": self.os, + "vcpus": self.vcpus, + "memory": self.mem_size, + "disks": self.disks, + "disk_space_total": disk_space, + "nics": self.nics, + "required_nodes": self.required_nodes, + } + data["request"] = request + + def _AddRelocateInstance(self): + """Add relocate instance data to allocator structure. + + This in combination with _IAllocatorGetClusterData will create the + correct structure needed as input for the allocator. + + The checks for the completeness of the opcode must have already been + done. 
+ + """ + instance = self.cfg.GetInstanceInfo(self.name) + if instance is None: + raise errors.ProgrammerError("Unknown instance '%s' passed to" + " IAllocator" % self.name) + + if instance.disk_template not in constants.DTS_NET_MIRROR: + raise errors.OpPrereqError("Can't relocate non-mirrored instances") + + if len(instance.secondary_nodes) != 1: + raise errors.OpPrereqError("Instance has not exactly one secondary node") + + self.required_nodes = 1 + + disk_space = _ComputeDiskSize(instance.disk_template, + instance.disks[0].size, + instance.disks[1].size) + + request = { + "type": "relocate", + "name": self.name, + "disk_space_total": disk_space, + "required_nodes": self.required_nodes, + "nodes": list(instance.secondary_nodes), + } + self.in_data["request"] = request + + def _BuildInputData(self): + """Build input data structures. + + """ + self._ComputeClusterData() + + if self.mode == constants.IALLOCATOR_MODE_ALLOC: + self._AddNewInstance() + else: + self._AddRelocateInstance() + + self.in_text = serializer.Dump(self.in_data) + + def Run(self, name, validate=True): + """Run an instance allocator and return the results. + + """ + data = self.in_text + + alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH, + os.path.isfile) + if alloc_script is None: + raise errors.OpExecError("Can't find allocator '%s'" % name) + + fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.") + try: + os.write(fd, data) + os.close(fd) + result = utils.RunCmd([alloc_script, fin_name]) + if result.failed: + raise errors.OpExecError("Instance allocator call failed: %s," + " output: %s" % + (result.fail_reason, result.output)) + finally: + os.unlink(fin_name) + self.out_text = result.stdout + if validate: + self._ValidateResult() + + def _ValidateResult(self): + """Process the allocator results. + + This will process and if successful save the result in + self.out_data and the other parameters. + + """ + try: + rdict = serializer.Load(self.out_text) + except Exception, err: + raise errors.OpExecError("Can't parse iallocator results: %s" % str(err)) + + if not isinstance(rdict, dict): + raise errors.OpExecError("Can't parse iallocator results: not a dict") + + for key in "success", "info", "nodes": + if key not in rdict: + raise errors.OpExecError("Can't parse iallocator results:" + " missing key '%s'" % key) + setattr(self, key, rdict[key]) + + if not isinstance(rdict["nodes"], list): + raise errors.OpExecError("Can't parse iallocator results: 'nodes' key" + " is not a list") + self.out_data = rdict + + +class LUTestAllocator(NoHooksLU): + """Run allocator tests. + + This LU runs the allocator tests + + """ + _OP_REQP = ["direction", "mode", "name"] + + def CheckPrereq(self): + """Check prerequisites. + + This checks the opcode parameters depending on the director and mode test. 
+ + """ + if self.op.mode == constants.IALLOCATOR_MODE_ALLOC: + for attr in ["name", "mem_size", "disks", "disk_template", + "os", "tags", "nics", "vcpus"]: + if not hasattr(self.op, attr): + raise errors.OpPrereqError("Missing attribute '%s' on opcode input" % + attr) + iname = self.cfg.ExpandInstanceName(self.op.name) + if iname is not None: + raise errors.OpPrereqError("Instance '%s' already in the cluster" % + iname) + if not isinstance(self.op.nics, list): + raise errors.OpPrereqError("Invalid parameter 'nics'") + for row in self.op.nics: + if (not isinstance(row, dict) or + "mac" not in row or + "ip" not in row or + "bridge" not in row): + raise errors.OpPrereqError("Invalid contents of the" + " 'nics' parameter") + if not isinstance(self.op.disks, list): + raise errors.OpPrereqError("Invalid parameter 'disks'") + if len(self.op.disks) != 2: + raise errors.OpPrereqError("Only two-disk configurations supported") + for row in self.op.disks: + if (not isinstance(row, dict) or + "size" not in row or + not isinstance(row["size"], int) or + "mode" not in row or + row["mode"] not in ['r', 'w']): + raise errors.OpPrereqError("Invalid contents of the" + " 'disks' parameter") + elif self.op.mode == constants.IALLOCATOR_MODE_RELOC: + if not hasattr(self.op, "name"): + raise errors.OpPrereqError("Missing attribute 'name' on opcode input") + fname = self.cfg.ExpandInstanceName(self.op.name) + if fname is None: + raise errors.OpPrereqError("Instance '%s' not found for relocation" % + self.op.name) + self.op.name = fname + else: + raise errors.OpPrereqError("Invalid test allocator mode '%s'" % + self.op.mode) + + if self.op.direction == constants.IALLOCATOR_DIR_OUT: + if not hasattr(self.op, "allocator") or self.op.allocator is None: + raise errors.OpPrereqError("Missing allocator name") + elif self.op.direction != constants.IALLOCATOR_DIR_IN: + raise errors.OpPrereqError("Wrong allocator test '%s'" % + self.op.direction) + + def Exec(self, feedback_fn): + """Run the allocator test. + + """ + ial = IAllocator(self.cfg, self.sstore, + mode=self.op.mode, + name=self.op.name, + mem_size=self.op.mem_size, + disks=self.op.disks, + disk_template=self.op.disk_template, + os=self.op.os, + tags=self.op.tags, + nics=self.op.nics, + vcpus=self.op.vcpus, + ) + + if self.op.direction == constants.IALLOCATOR_DIR_IN: + result = ial.in_text + else: + ial.Run(self.op.allocator, validate=False) + result = ial.out_text + return result