X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/601908d0f5e528a3ddfc7cefb2bbfd9f99e7a39f..c1e7897de040fb86a71100684b64f46c8b08e2eb:/lib/cmdlib.py?ds=sidebyside diff --git a/lib/cmdlib.py b/lib/cmdlib.py index 39e7d06..9f8b1ff 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -33,6 +33,7 @@ import re import platform import logging import copy +import OpenSSL from ganeti import ssh from ganeti import utils @@ -43,6 +44,11 @@ from ganeti import constants from ganeti import objects from ganeti import serializer from ganeti import ssconf +from ganeti import uidpool +from ganeti import compat +from ganeti import masterd + +import ganeti.masterd.instance # pylint: disable-msg=W0611 class LogicalUnit(object): @@ -541,6 +547,75 @@ def _CheckNodeNotDrained(lu, node): errors.ECODE_INVAL) +def _CheckNodeHasOS(lu, node, os_name, force_variant): + """Ensure that a node supports a given OS. + + @param lu: the LU on behalf of which we make the check + @param node: the node to check + @param os_name: the OS to query about + @param force_variant: whether to ignore variant errors + @raise errors.OpPrereqError: if the node is not supporting the OS + + """ + result = lu.rpc.call_os_get(node, os_name) + result.Raise("OS '%s' not in supported OS list for node %s" % + (os_name, node), + prereq=True, ecode=errors.ECODE_INVAL) + if not force_variant: + _CheckOSVariant(result.payload, os_name) + + +def _RequireFileStorage(): + """Checks that file storage is enabled. + + @raise errors.OpPrereqError: when file storage is disabled + + """ + if not constants.ENABLE_FILE_STORAGE: + raise errors.OpPrereqError("File storage disabled at configure time", + errors.ECODE_INVAL) + + +def _CheckDiskTemplate(template): + """Ensure a given disk template is valid. + + """ + if template not in constants.DISK_TEMPLATES: + msg = ("Invalid disk template name '%s', valid templates are: %s" % + (template, utils.CommaJoin(constants.DISK_TEMPLATES))) + raise errors.OpPrereqError(msg, errors.ECODE_INVAL) + if template == constants.DT_FILE: + _RequireFileStorage() + + +def _CheckStorageType(storage_type): + """Ensure a given storage type is valid. + + """ + if storage_type not in constants.VALID_STORAGE_TYPES: + raise errors.OpPrereqError("Unknown storage type: %s" % storage_type, + errors.ECODE_INVAL) + if storage_type == constants.ST_FILE: + _RequireFileStorage() + + + +def _CheckInstanceDown(lu, instance, reason): + """Ensure that an instance is not running.""" + if instance.admin_up: + raise errors.OpPrereqError("Instance %s is marked to be up, %s" % + (instance.name, reason), errors.ECODE_STATE) + + pnode = instance.primary_node + ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode] + ins_l.Raise("Can't contact node %s for instance information" % pnode, + prereq=True, ecode=errors.ECODE_ENVIRON) + + if instance.name in ins_l.payload: + raise errors.OpPrereqError("Instance %s is running, %s" % + (instance.name, reason), errors.ECODE_STATE) + + def _ExpandItemName(fn, name, kind): """Expand an item name. @@ -939,6 +1014,39 @@ class LUDestroyCluster(LogicalUnit): return master +def _VerifyCertificate(filename): + """Verifies a certificate for LUVerifyCluster. 
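+
+  A minimal standalone sketch of the underlying pyOpenSSL check
+  (illustrative only; C{"cert.pem"} is a hypothetical path, and the
+  warn/error expiration windows used below are elided):
+
+    >>> import OpenSSL
+    >>> pem = utils.ReadFile("cert.pem")
+    >>> cert = OpenSSL.crypto.load_certificate(
+    ...     OpenSSL.crypto.FILETYPE_PEM, pem)
+    >>> cert.has_expired()
+    False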
+
+  @type filename: string
+  @param filename: Path to PEM file
+
+  """
+  try:
+    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                           utils.ReadFile(filename))
+  except Exception, err: # pylint: disable-msg=W0703
+    return (LUVerifyCluster.ETYPE_ERROR,
+            "Failed to load X509 certificate %s: %s" % (filename, err))
+
+  (errcode, msg) = \
+    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
+                                constants.SSL_CERT_EXPIRATION_ERROR)
+
+  if msg:
+    fnamemsg = "While verifying %s: %s" % (filename, msg)
+  else:
+    fnamemsg = None
+
+  if errcode is None:
+    return (None, fnamemsg)
+  elif errcode == utils.CERT_WARNING:
+    return (LUVerifyCluster.ETYPE_WARNING, fnamemsg)
+  elif errcode == utils.CERT_ERROR:
+    return (LUVerifyCluster.ETYPE_ERROR, fnamemsg)
+
+  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
+
+
 class LUVerifyCluster(LogicalUnit):
   """Verifies the cluster status.
 
@@ -953,6 +1061,7 @@ class LUVerifyCluster(LogicalUnit):
   TINSTANCE = "instance"
 
   ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
+  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
   EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
   EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
   EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
@@ -978,6 +1087,44 @@ class LUVerifyCluster(LogicalUnit):
   ETYPE_ERROR = "ERROR"
   ETYPE_WARNING = "WARNING"
 
+  class NodeImage(object):
+    """A class representing the logical and physical status of a node.
+
+    @ivar volumes: a structure as returned from
+        L{ganeti.backend.GetVolumeList} (runtime)
+    @ivar instances: a list of running instances (runtime)
+    @ivar pinst: list of configured primary instances (config)
+    @ivar sinst: list of configured secondary instances (config)
+    @ivar sbp: dictionary of {secondary-node: list of instances} of all peers
+        of this node (config)
+    @ivar mfree: free memory, as reported by hypervisor (runtime)
+    @ivar dfree: free disk, as reported by the node (runtime)
+    @ivar offline: the offline status (config)
+    @type rpc_fail: boolean
+    @ivar rpc_fail: whether the RPC verify call failed (overall,
+        not whether the individual keys were correct) (runtime)
+    @type lvm_fail: boolean
+    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
+    @type hyp_fail: boolean
+    @ivar hyp_fail: whether the RPC call didn't return the instance list
+    @type ghost: boolean
+    @ivar ghost: whether this is a known node or not (config)
+
+    """
+    def __init__(self, offline=False):
+      self.volumes = {}
+      self.instances = []
+      self.pinst = []
+      self.sinst = []
+      self.sbp = {}
+      self.mfree = 0
+      self.dfree = 0
+      self.offline = offline
+      self.rpc_fail = False
+      self.lvm_fail = False
+      self.hyp_fail = False
+      self.ghost = False
+
   def ExpandNames(self):
     self.needed_locks = {
       locking.LEVEL_NODE: locking.ALL_SET,
@@ -1022,8 +1169,7 @@ class LUVerifyCluster(LogicalUnit):
     if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
       self.bad = self.bad or cond
 
-  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
-                  node_result, master_files, drbd_map, vg_name):
+  def _VerifyNode(self, ninfo, nresult):
     """Run multiple tests against a node.
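
    The version handshake below is the gating step; a minimal sketch of
    the comparison it performs (payload layout as documented in the
    code, values illustrative):

      >>> local_version = constants.PROTOCOL_VERSION
      >>> remote_version = [local_version, constants.RELEASE_VERSION]
      >>> local_version != remote_version[0]  # would trigger ENODEVERSION
      False
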
    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

-    @type nodeinfo: L{objects.Node}
-    @param nodeinfo: the node to check
-    @param file_list: required list of files
-    @param local_cksum: dictionary of local files and their checksums
-    @param node_result: the results from the node
-    @param master_files: list of files that only masters should have
-    @param drbd_map: the used drbd minors for this node, in
-        form of minor: (instance, must_exist) which correspond to instances
-        and their running status
-    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the results from the node
+    @rtype: boolean
+    @return: whether overall this call was successful (and we can expect
+         reasonable values in the response)

    """
-    node = nodeinfo.name
+    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

-    # main result, node_result should be a non-empty dict
-    test = not node_result or not isinstance(node_result, dict)
+    # main result, nresult should be a non-empty dict
+    test = not nresult or not isinstance(nresult, dict)
    _ErrorIf(test, self.ENODERPC, node,
             "unable to verify node: no data returned")
    if test:
-      return
+      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
-    remote_version = node_result.get('version', None)
+    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, self.ENODERPC, node,
             "connection to node returned invalid data")
    if test:
-      return
+      return False

    test = local_version != remote_version[0]
    _ErrorIf(test, self.ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
-      return
+      return False

    # node seems compatible, we can actually try to look into its results

@@ -1082,111 +1224,122 @@ class LUVerifyCluster(LogicalUnit):
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

-    # checks vg existence and size > 20G
-    if vg_name is not None:
-      vglist = node_result.get(constants.NV_VGLIST, None)
-      test = not vglist
-      _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
-      if not test:
-        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
-                                              constants.MIN_VG_SIZE)
-        _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
+    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
+    if isinstance(hyp_result, dict):
+      for hv_name, hv_result in hyp_result.iteritems():
+        test = hv_result is not None
+        _ErrorIf(test, self.ENODEHV, node,
+                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)

-    # checks config file checksum
-    remote_cksum = node_result.get(constants.NV_FILELIST, None)
-    test = not isinstance(remote_cksum, dict)
-    _ErrorIf(test, self.ENODEFILECHECK, node,
-             "node hasn't returned file checksum data")
+    test = nresult.get(constants.NV_NODESETUP,
+                       ["Missing NODESETUP results"])
+    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
+             "; ".join(test))
+
+    return True
+
+  def _VerifyNodeTime(self, ninfo, nresult,
+                      nvinfo_starttime, nvinfo_endtime):
+    """Check the node time.
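+
+    The skew test reduces to a window comparison; a sketch with
+    hypothetical timestamps and an illustrative 150s maximum skew
+    (the real bound is C{constants.NODE_MAX_CLOCK_SKEW}):
+
+      >>> skew = 150.0
+      >>> start, end, ntime = 1000.0, 1002.0, 1210.0
+      >>> ntime < start - skew or ntime > end + skew
+      True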
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+    @param nvinfo_starttime: the start time of the RPC call
+    @param nvinfo_endtime: the end time of the RPC call
+
+    """
+    node = ninfo.name
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
+    ntime = nresult.get(constants.NV_TIME, None)
+    try:
+      ntime_merged = utils.MergeTime(ntime)
+    except (ValueError, TypeError):
+      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
+      return
+
+    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
+      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
+    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
+      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
+    else:
+      ntime_diff = None
+
+    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
+             "Node time diverges by at least %s from master node time",
+             ntime_diff)
+
+  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
+    """Check the node LVM data.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+    @param vg_name: the configured VG name
+
+    """
+    if vg_name is None:
+      return
+
+    node = ninfo.name
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
+    # checks vg existence and size > 20G
+    vglist = nresult.get(constants.NV_VGLIST, None)
+    test = not vglist
+    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
+    if not test:
+      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
+                                            constants.MIN_VG_SIZE)
+      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
+
+    # check pv names
+    pvlist = nresult.get(constants.NV_PVLIST, None)
+    test = pvlist is None
+    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
    if not test:
-      for file_name in file_list:
-        node_is_mc = nodeinfo.master_candidate
-        must_have = (file_name not in master_files) or node_is_mc
-        # missing
-        test1 = file_name not in remote_cksum
-        # invalid checksum
-        test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
-        # existing and good
-        test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
-        _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
-                 "file '%s' missing", file_name)
-        _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
-                 "file '%s' has wrong checksum", file_name)
-        # not candidate and this is not a must-have file
-        _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
-                 "file '%s' should not exist on non master"
-                 " candidates (and the file is outdated)", file_name)
-        # all good, except non-master/non-must have combination
-        _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
-                 "file '%s' should not exist"
-                 " on non master candidates", file_name)
-
-    # checks ssh to any
-
-    test = constants.NV_NODELIST not in node_result
+      # check that ':' is not present in PV names, since it's a
+      # special character for lvcreate (denotes the range of PEs to
+      # use on the PV)
+      for _, pvname, owner_vg in pvlist:
+        test = ":" in pvname
+        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
+                 " '%s' of VG '%s'", pvname, owner_vg)
+
+  def _VerifyNodeNetwork(self, ninfo, nresult):
+    """Check the node network connectivity.
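+
+    The relevant C{nresult} entries map peer node names to error
+    messages, an empty dict meaning full connectivity. An illustrative
+    payload (hypothetical node name and message):
+
+      >>> nresult = {constants.NV_NODELIST: {},
+      ...            constants.NV_NODENETTEST:
+      ...              {"node2.example.com": "connect timeout"}}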
+ + @type ninfo: L{objects.Node} + @param ninfo: the node to check + @param nresult: the remote results for the node + + """ + node = ninfo.name + _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + + test = constants.NV_NODELIST not in nresult _ErrorIf(test, self.ENODESSH, node, "node hasn't returned node ssh connectivity data") if not test: - if node_result[constants.NV_NODELIST]: - for a_node, a_msg in node_result[constants.NV_NODELIST].items(): + if nresult[constants.NV_NODELIST]: + for a_node, a_msg in nresult[constants.NV_NODELIST].items(): _ErrorIf(True, self.ENODESSH, node, "ssh communication with node '%s': %s", a_node, a_msg) - test = constants.NV_NODENETTEST not in node_result + test = constants.NV_NODENETTEST not in nresult _ErrorIf(test, self.ENODENET, node, "node hasn't returned node tcp connectivity data") if not test: - if node_result[constants.NV_NODENETTEST]: - nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys()) + if nresult[constants.NV_NODENETTEST]: + nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys()) for anode in nlist: _ErrorIf(True, self.ENODENET, node, "tcp communication with node '%s': %s", - anode, node_result[constants.NV_NODENETTEST][anode]) - - hyp_result = node_result.get(constants.NV_HYPERVISOR, None) - if isinstance(hyp_result, dict): - for hv_name, hv_result in hyp_result.iteritems(): - test = hv_result is not None - _ErrorIf(test, self.ENODEHV, node, - "hypervisor %s verify failure: '%s'", hv_name, hv_result) + anode, nresult[constants.NV_NODENETTEST][anode]) - # check used drbd list - if vg_name is not None: - used_minors = node_result.get(constants.NV_DRBDLIST, []) - test = not isinstance(used_minors, (tuple, list)) - _ErrorIf(test, self.ENODEDRBD, node, - "cannot parse drbd status file: %s", str(used_minors)) - if not test: - for minor, (iname, must_exist) in drbd_map.items(): - test = minor not in used_minors and must_exist - _ErrorIf(test, self.ENODEDRBD, node, - "drbd minor %d of instance %s is not active", - minor, iname) - for minor in used_minors: - test = minor not in drbd_map - _ErrorIf(test, self.ENODEDRBD, node, - "unallocated drbd minor %d is in use", minor) - test = node_result.get(constants.NV_NODESETUP, - ["Missing NODESETUP results"]) - _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s", - "; ".join(test)) - - # check pv names - if vg_name is not None: - pvlist = node_result.get(constants.NV_PVLIST, None) - test = pvlist is None - _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node") - if not test: - # check that ':' is not present in PV names, since it's a - # special character for lvcreate (denotes the range of PEs to - # use on the PV) - for _, pvname, owner_vg in pvlist: - test = ":" in pvname - _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV" - " '%s' of VG '%s'", pvname, owner_vg) - - def _VerifyInstance(self, instance, instanceconfig, node_vol_is, - node_instance, n_offline): + def _VerifyInstance(self, instance, instanceconfig, node_image): """Verify an instance. 
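
    A sketch of the per-node volume check performed below (hypothetical
    LV names; C{n_img.volumes} comes from the node's runtime data):

      >>> node_vol_should = {"node1": ["xenvg/disk0"]}
      >>> n_img_volumes = {"xenvg/other": None}
      >>> [v for v in node_vol_should["node1"] if v not in n_img_volumes]
      ['xenvg/disk0']
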
This function checks to see if the required block devices are @@ -1200,81 +1353,264 @@ class LUVerifyCluster(LogicalUnit): instanceconfig.MapLVsByNode(node_vol_should) for node in node_vol_should: - if node in n_offline: - # ignore missing volumes on offline nodes + n_img = node_image[node] + if n_img.offline or n_img.rpc_fail or n_img.lvm_fail: + # ignore missing volumes on offline or broken nodes continue for volume in node_vol_should[node]: - test = node not in node_vol_is or volume not in node_vol_is[node] + test = volume not in n_img.volumes _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance, "volume %s missing on node %s", volume, node) if instanceconfig.admin_up: - test = ((node_current not in node_instance or - not instance in node_instance[node_current]) and - node_current not in n_offline) + pri_img = node_image[node_current] + test = instance not in pri_img.instances and not pri_img.offline _ErrorIf(test, self.EINSTANCEDOWN, instance, "instance not running on its primary node %s", node_current) - for node in node_instance: + for node, n_img in node_image.items(): if (not node == node_current): - test = instance in node_instance[node] + test = instance in n_img.instances _ErrorIf(test, self.EINSTANCEWRONGNODE, instance, "instance should not run on node %s", node) - def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is): + def _VerifyOrphanVolumes(self, node_vol_should, node_image): """Verify if there are any unknown volumes in the cluster. The .os, .swap and backup volumes are ignored. All other volumes are reported as unknown. """ - for node in node_vol_is: - for volume in node_vol_is[node]: + for node, n_img in node_image.items(): + if n_img.offline or n_img.rpc_fail or n_img.lvm_fail: + # skip non-healthy nodes + continue + for volume in n_img.volumes: test = (node not in node_vol_should or volume not in node_vol_should[node]) self._ErrorIf(test, self.ENODEORPHANLV, node, "volume %s is unknown", volume) - def _VerifyOrphanInstances(self, instancelist, node_instance): + def _VerifyOrphanInstances(self, instancelist, node_image): """Verify the list of running instances. This checks what instances are running but unknown to the cluster. """ - for node in node_instance: - for o_inst in node_instance[node]: + for node, n_img in node_image.items(): + for o_inst in n_img.instances: test = o_inst not in instancelist self._ErrorIf(test, self.ENODEORPHANINSTANCE, node, "instance %s on node %s should not exist", o_inst, node) - def _VerifyNPlusOneMemory(self, node_info, instance_cfg): + def _VerifyNPlusOneMemory(self, node_image, instance_cfg): """Verify N+1 Memory Resilience. - Check that if one single node dies we can still start all the instances it - was primary for. + Check that if one single node dies we can still start all the + instances it was primary for. """ - for node, nodeinfo in node_info.iteritems(): - # This code checks that every node which is now listed as secondary has - # enough memory to host all instances it is supposed to should a single - # other node in the cluster fail. + for node, n_img in node_image.items(): + # This code checks that every node which is now listed as + # secondary has enough memory to host all instances it is + # supposed to should a single other node in the cluster fail. 
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
-      # WARNING: we currently take into account down instances as well as up
-      # ones, considering that even if they're down someone might want to start
-      # them even in the event of a node failure.
-      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
+      # WARNING: we currently take into account down instances as well
+      # as up ones, considering that even if they're down someone
+      # might want to start them even in the event of a node failure.
+      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
-        test = nodeinfo['mfree'] < needed_mem
+        test = n_img.mfree < needed_mem
        self._ErrorIf(test, self.ENODEN1, node,
                      "not enough memory to accommodate"
                      " failovers should peer node %s fail", prinode)

+  def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum,
+                       master_files):
+    """Verifies and computes the node required file checksums.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+    @param file_list: required list of files
+    @param local_cksum: dictionary of local files and their checksums
+    @param master_files: list of files that only masters should have
+
+    """
+    node = ninfo.name
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
+    remote_cksum = nresult.get(constants.NV_FILELIST, None)
+    test = not isinstance(remote_cksum, dict)
+    _ErrorIf(test, self.ENODEFILECHECK, node,
+             "node hasn't returned file checksum data")
+    if test:
+      return
+
+    for file_name in file_list:
+      node_is_mc = ninfo.master_candidate
+      must_have = (file_name not in master_files) or node_is_mc
+      # missing
+      test1 = file_name not in remote_cksum
+      # invalid checksum
+      test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
+      # existing and good
+      test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
+      _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
+               "file '%s' missing", file_name)
+      _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
+               "file '%s' has wrong checksum", file_name)
+      # not candidate and this is not a must-have file
+      _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
+               "file '%s' should not exist on non master"
+               " candidates (and the file is outdated)", file_name)
+      # all good, except non-master/non-must have combination
+      _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
+               "file '%s' should not exist"
+               " on non master candidates", file_name)

+  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_map):
+    """Verifies the node DRBD status.
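+
+    A sketch of the minor bookkeeping done below (hypothetical values;
+    C{node_drbd} maps minor -> (instance name, must_exist)):
+
+      >>> node_drbd = {0: ("inst1.example.com", True)}
+      >>> used_minors = [1]
+      >>> [m for m in used_minors if m not in node_drbd]  # unallocated
+      [1]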
+ + @type ninfo: L{objects.Node} + @param ninfo: the node to check + @param nresult: the remote results for the node + @param instanceinfo: the dict of instances + @param drbd_map: the DRBD map as returned by + L{ganeti.config.ConfigWriter.ComputeDRBDMap} + + """ + node = ninfo.name + _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + + # compute the DRBD minors + node_drbd = {} + for minor, instance in drbd_map[node].items(): + test = instance not in instanceinfo + _ErrorIf(test, self.ECLUSTERCFG, None, + "ghost instance '%s' in temporary DRBD map", instance) + # ghost instance should not be running, but otherwise we + # don't give double warnings (both ghost instance and + # unallocated minor in use) + if test: + node_drbd[minor] = (instance, False) + else: + instance = instanceinfo[instance] + node_drbd[minor] = (instance.name, instance.admin_up) + + # and now check them + used_minors = nresult.get(constants.NV_DRBDLIST, []) + test = not isinstance(used_minors, (tuple, list)) + _ErrorIf(test, self.ENODEDRBD, node, + "cannot parse drbd status file: %s", str(used_minors)) + if test: + # we cannot check drbd status + return + + for minor, (iname, must_exist) in node_drbd.items(): + test = minor not in used_minors and must_exist + _ErrorIf(test, self.ENODEDRBD, node, + "drbd minor %d of instance %s is not active", minor, iname) + for minor in used_minors: + test = minor not in node_drbd + _ErrorIf(test, self.ENODEDRBD, node, + "unallocated drbd minor %d is in use", minor) + + def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name): + """Verifies and updates the node volume data. + + This function will update a L{NodeImage}'s internal structures + with data from the remote call. + + @type ninfo: L{objects.Node} + @param ninfo: the node to check + @param nresult: the remote results for the node + @param nimg: the node image object + @param vg_name: the configured VG name + + """ + node = ninfo.name + _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + + nimg.lvm_fail = True + lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data") + if vg_name is None: + pass + elif isinstance(lvdata, basestring): + _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s", + utils.SafeEncode(lvdata)) + elif not isinstance(lvdata, dict): + _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)") + else: + nimg.volumes = lvdata + nimg.lvm_fail = False + + def _UpdateNodeInstances(self, ninfo, nresult, nimg): + """Verifies and updates the node instance list. + + If the listing was successful, then updates this node's instance + list. Otherwise, it marks the RPC call as failed for the instance + list key. 
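+
+    An illustrative healthy payload (hypothetical instance name):
+
+      >>> nresult = {constants.NV_INSTANCELIST: ["inst1.example.com"]}
+      >>> isinstance(nresult.get(constants.NV_INSTANCELIST, None), list)
+      True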
+ + @type ninfo: L{objects.Node} + @param ninfo: the node to check + @param nresult: the remote results for the node + @param nimg: the node image object + + """ + idata = nresult.get(constants.NV_INSTANCELIST, None) + test = not isinstance(idata, list) + self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed" + " (instancelist): %s", utils.SafeEncode(str(idata))) + if test: + nimg.hyp_fail = True + else: + nimg.instances = idata + + def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name): + """Verifies and computes a node information map + + @type ninfo: L{objects.Node} + @param ninfo: the node to check + @param nresult: the remote results for the node + @param nimg: the node image object + @param vg_name: the configured VG name + + """ + node = ninfo.name + _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + + # try to read free memory (from the hypervisor) + hv_info = nresult.get(constants.NV_HVINFO, None) + test = not isinstance(hv_info, dict) or "memory_free" not in hv_info + _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)") + if not test: + try: + nimg.mfree = int(hv_info["memory_free"]) + except (ValueError, TypeError): + _ErrorIf(True, self.ENODERPC, node, + "node returned invalid nodeinfo, check hypervisor") + + # FIXME: devise a free space model for file based instances as well + if vg_name is not None: + test = (constants.NV_VGLIST not in nresult or + vg_name not in nresult[constants.NV_VGLIST]) + _ErrorIf(test, self.ENODELVM, node, + "node didn't return data for the volume group '%s'" + " - it is either missing or broken", vg_name) + if not test: + try: + nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name]) + except (ValueError, TypeError): + _ErrorIf(True, self.ENODERPC, node, + "node returned invalid LVM info, check LVM status") + def CheckPrereq(self): """Check prerequisites. 
@@ -1315,8 +1651,14 @@ class LUVerifyCluster(LogicalUnit): for msg in self.cfg.VerifyConfig(): _ErrorIf(True, self.ECLUSTERCFG, None, msg) + # Check the cluster certificates + for cert_filename in constants.ALL_CERT_FILES: + (errcode, msg) = _VerifyCertificate(cert_filename) + _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode) + vg_name = self.cfg.GetVGName() hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors + cluster = self.cfg.GetClusterInfo() nodelist = utils.NiceSort(self.cfg.GetNodeList()) nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist] instancelist = utils.NiceSort(self.cfg.GetInstanceList()) @@ -1324,21 +1666,19 @@ class LUVerifyCluster(LogicalUnit): for iname in instancelist) i_non_redundant = [] # Non redundant instances i_non_a_balanced = [] # Non auto-balanced instances - n_offline = [] # List of offline nodes - n_drained = [] # List of nodes being drained - node_volume = {} - node_instance = {} - node_info = {} - instance_cfg = {} + n_offline = 0 # Count of offline nodes + n_drained = 0 # Count of nodes being drained + node_vol_should = {} # FIXME: verify OS list # do local checksums master_files = [constants.CLUSTER_CONF_FILE] file_names = ssconf.SimpleStore().GetFileList() - file_names.append(constants.SSL_CERT_FILE) - file_names.append(constants.RAPI_CERT_FILE) + file_names.extend(constants.ALL_CERT_FILES) file_names.extend(master_files) + if cluster.modify_etc_hosts: + file_names.append(constants.ETC_HOSTS) local_checksums = utils.FingerprintFiles(file_names) @@ -1364,6 +1704,35 @@ class LUVerifyCluster(LogicalUnit): node_verify_param[constants.NV_PVLIST] = [vg_name] node_verify_param[constants.NV_DRBDLIST] = None + # Build our expected cluster state + node_image = dict((node.name, self.NodeImage(offline=node.offline)) + for node in nodeinfo) + + for instance in instancelist: + inst_config = instanceinfo[instance] + + for nname in inst_config.all_nodes: + if nname not in node_image: + # ghost node + gnode = self.NodeImage() + gnode.ghost = True + node_image[nname] = gnode + + inst_config.MapLVsByNode(node_vol_should) + + pnode = inst_config.primary_node + node_image[pnode].pinst.append(instance) + + for snode in inst_config.secondary_nodes: + nimg = node_image[snode] + nimg.sinst.append(instance) + if pnode not in nimg.sbp: + nimg.sbp[pnode] = [] + nimg.sbp[pnode].append(instance) + + # At this point, we have the in-memory data structures complete, + # except for the runtime information, which we'll gather next + # Due to the way our RPC system works, exact response times cannot be # guaranteed (e.g. a broken node could run into a timeout). 
By keeping the # time before and after executing the request, we can at least have a time @@ -1373,18 +1742,18 @@ class LUVerifyCluster(LogicalUnit): self.cfg.GetClusterName()) nvinfo_endtime = time.time() - cluster = self.cfg.GetClusterInfo() master_node = self.cfg.GetMasterNode() all_drbd_map = self.cfg.ComputeDRBDMap() feedback_fn("* Verifying node status") for node_i in nodeinfo: node = node_i.name + nimg = node_image[node] if node_i.offline: if verbose: feedback_fn("* Skipping offline node %s" % (node,)) - n_offline.append(node) + n_offline += 1 continue if node == master_node: @@ -1393,7 +1762,7 @@ class LUVerifyCluster(LogicalUnit): ntype = "master candidate" elif node_i.drained: ntype = "drained" - n_drained.append(node) + n_drained += 1 else: ntype = "regular" if verbose: @@ -1402,128 +1771,38 @@ class LUVerifyCluster(LogicalUnit): msg = all_nvinfo[node].fail_msg _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg) if msg: + nimg.rpc_fail = True continue nresult = all_nvinfo[node].payload - node_drbd = {} - for minor, instance in all_drbd_map[node].items(): - test = instance not in instanceinfo - _ErrorIf(test, self.ECLUSTERCFG, None, - "ghost instance '%s' in temporary DRBD map", instance) - # ghost instance should not be running, but otherwise we - # don't give double warnings (both ghost instance and - # unallocated minor in use) - if test: - node_drbd[minor] = (instance, False) - else: - instance = instanceinfo[instance] - node_drbd[minor] = (instance.name, instance.admin_up) - - self._VerifyNode(node_i, file_names, local_checksums, - nresult, master_files, node_drbd, vg_name) - - lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data") - if vg_name is None: - node_volume[node] = {} - elif isinstance(lvdata, basestring): - _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s", - utils.SafeEncode(lvdata)) - node_volume[node] = {} - elif not isinstance(lvdata, dict): - _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)") - continue - else: - node_volume[node] = lvdata - - # node_instance - idata = nresult.get(constants.NV_INSTANCELIST, None) - test = not isinstance(idata, list) - _ErrorIf(test, self.ENODEHV, node, - "rpc call to node failed (instancelist)") - if test: - continue - - node_instance[node] = idata - # node_info - nodeinfo = nresult.get(constants.NV_HVINFO, None) - test = not isinstance(nodeinfo, dict) - _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)") - if test: - continue - - # Node time - ntime = nresult.get(constants.NV_TIME, None) - try: - ntime_merged = utils.MergeTime(ntime) - except (ValueError, TypeError): - _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time") - - if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW): - ntime_diff = abs(nvinfo_starttime - ntime_merged) - elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW): - ntime_diff = abs(ntime_merged - nvinfo_endtime) - else: - ntime_diff = None - - _ErrorIf(ntime_diff is not None, self.ENODETIME, node, - "Node time diverges by at least %0.1fs from master node time", - ntime_diff) - - if ntime_diff is not None: - continue - - try: - node_info[node] = { - "mfree": int(nodeinfo['memory_free']), - "pinst": [], - "sinst": [], - # dictionary holding all instances this node is secondary for, - # grouped by their primary node. Each key is a cluster node, and each - # value is a list of instances which have the key as primary and the - # current node as secondary. 
this is handy to calculate N+1 memory - # availability if you can only failover from a primary to its - # secondary. - "sinst-by-pnode": {}, - } - # FIXME: devise a free space model for file based instances as well - if vg_name is not None: - test = (constants.NV_VGLIST not in nresult or - vg_name not in nresult[constants.NV_VGLIST]) - _ErrorIf(test, self.ENODELVM, node, - "node didn't return data for the volume group '%s'" - " - it is either missing or broken", vg_name) - if test: - continue - node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name]) - except (ValueError, KeyError): - _ErrorIf(True, self.ENODERPC, node, - "node returned invalid nodeinfo, check lvm/hypervisor") - continue + nimg.call_ok = self._VerifyNode(node_i, nresult) + self._VerifyNodeNetwork(node_i, nresult) + self._VerifyNodeLVM(node_i, nresult, vg_name) + self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums, + master_files) + self._VerifyNodeDrbd(node_i, nresult, instanceinfo, all_drbd_map) + self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime) - node_vol_should = {} + self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name) + self._UpdateNodeInstances(node_i, nresult, nimg) + self._UpdateNodeInfo(node_i, nresult, nimg, vg_name) feedback_fn("* Verifying instance status") for instance in instancelist: if verbose: feedback_fn("* Verifying instance %s" % instance) inst_config = instanceinfo[instance] - self._VerifyInstance(instance, inst_config, node_volume, - node_instance, n_offline) + self._VerifyInstance(instance, inst_config, node_image) inst_nodes_offline = [] - inst_config.MapLVsByNode(node_vol_should) - - instance_cfg[instance] = inst_config - pnode = inst_config.primary_node - _ErrorIf(pnode not in node_info and pnode not in n_offline, + pnode_img = node_image[pnode] + _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline, self.ENODERPC, pnode, "instance %s, connection to" " primary node failed", instance) - if pnode in node_info: - node_info[pnode]['pinst'].append(instance) - if pnode in n_offline: + if pnode_img.offline: inst_nodes_offline.append(pnode) # If the instance is non-redundant we cannot survive losing its primary @@ -1531,44 +1810,42 @@ class LUVerifyCluster(LogicalUnit): # templates with more than one secondary so that situation is not well # supported either. 
      # FIXME: does not support file-backed instances
-      if len(inst_config.secondary_nodes) == 0:
+      if not inst_config.secondary_nodes:
        i_non_redundant.append(instance)

-      _ErrorIf(len(inst_config.secondary_nodes) > 1,
-               self.EINSTANCELAYOUT, instance,
-               "instance has multiple secondary nodes", code="WARNING")
+      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
+               instance, "instance has multiple secondary nodes: %s",
+               utils.CommaJoin(inst_config.secondary_nodes),
+               code=self.ETYPE_WARNING)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
-        _ErrorIf(snode not in node_info and snode not in n_offline,
-                 self.ENODERPC, snode,
-                 "instance %s, connection to secondary node"
-                 "failed", instance)
-
-        if snode in node_info:
-          node_info[snode]['sinst'].append(instance)
-          if pnode not in node_info[snode]['sinst-by-pnode']:
-            node_info[snode]['sinst-by-pnode'][pnode] = []
-          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
-
-        if snode in n_offline:
+        s_img = node_image[snode]
+        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
+                 "instance %s, connection to secondary node failed", instance)
+
+        if s_img.offline:
          inst_nodes_offline.append(snode)

      # warn that the instance lives on offline nodes
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
               "instance lives on offline node(s) %s",
               utils.CommaJoin(inst_nodes_offline))
+      # ... or ghost nodes
+      for node in inst_config.all_nodes:
+        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
+                 "instance lives on ghost node %s", node)

    feedback_fn("* Verifying orphan volumes")
-    self._VerifyOrphanVolumes(node_vol_should, node_volume)
+    self._VerifyOrphanVolumes(node_vol_should, node_image)

-    feedback_fn("* Verifying remaining instances")
-    self._VerifyOrphanInstances(instancelist, node_instance)
+    feedback_fn("* Verifying orphan instances")
+    self._VerifyOrphanInstances(instancelist, node_image)

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
-      self._VerifyNPlusOneMemory(node_info, instance_cfg)
+      self._VerifyNPlusOneMemory(node_image, instanceinfo)

    feedback_fn("* Other Notes")
    if i_non_redundant:
@@ -1580,10 +1857,10 @@ class LUVerifyCluster(LogicalUnit):
                  % len(i_non_a_balanced))

    if n_offline:
-      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
+      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
-      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
+      feedback_fn("  - NOTICE: %d drained node(s) found."
% n_drained) return not self.bad @@ -1950,8 +2227,11 @@ class LUSetClusterParams(LogicalUnit): """Check parameters """ - if not hasattr(self.op, "candidate_pool_size"): - self.op.candidate_pool_size = None + for attr in ["candidate_pool_size", + "uid_pool", "add_uids", "remove_uids"]: + if not hasattr(self.op, attr): + setattr(self.op, attr, None) + if self.op.candidate_pool_size is not None: try: self.op.candidate_pool_size = int(self.op.candidate_pool_size) @@ -1962,6 +2242,17 @@ class LUSetClusterParams(LogicalUnit): raise errors.OpPrereqError("At least one master candidate needed", errors.ECODE_INVAL) + _CheckBooleanOpField(self.op, "maintain_node_health") + + if self.op.uid_pool: + uidpool.CheckUidPool(self.op.uid_pool) + + if self.op.add_uids: + uidpool.CheckUidPool(self.op.add_uids) + + if self.op.remove_uids: + uidpool.CheckUidPool(self.op.remove_uids) + def ExpandNames(self): # FIXME: in the future maybe other cluster params won't require checking on # all nodes to be modified. @@ -2053,7 +2344,7 @@ class LUSetClusterParams(LogicalUnit): "\n".join(nic_errors)) # hypervisor list/parameters - self.new_hvparams = objects.FillDict(cluster.hvparams, {}) + self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {}) if self.op.hvparams: if not isinstance(self.op.hvparams, dict): raise errors.OpPrereqError("Invalid 'hvparams' parameter on input", @@ -2083,6 +2374,7 @@ class LUSetClusterParams(LogicalUnit): else: self.new_os_hvp[os_name][hv_name].update(hv_dict) + # changes to the hypervisor list if self.op.enabled_hypervisors is not None: self.hv_list = self.op.enabled_hypervisors if not self.hv_list: @@ -2095,6 +2387,16 @@ class LUSetClusterParams(LogicalUnit): " entries: %s" % utils.CommaJoin(invalid_hvs), errors.ECODE_INVAL) + for hv in self.hv_list: + # if the hypervisor doesn't already exist in the cluster + # hvparams, we initialize it to empty, and then (in both + # cases) we make sure to fill the defaults, as we might not + # have a complete defaults list if the hypervisor wasn't + # enabled before + if hv not in new_hvp: + new_hvp[hv] = {} + new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv]) + utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES) else: self.hv_list = cluster.enabled_hypervisors @@ -2110,6 +2412,20 @@ class LUSetClusterParams(LogicalUnit): hv_class.CheckParameterSyntax(hv_params) _CheckHVParams(self, node_list, hv_name, hv_params) + if self.op.os_hvp: + # no need to check any newly-enabled hypervisors, since the + # defaults have already been checked in the above code-block + for os_name, os_hvp in self.new_os_hvp.items(): + for hv_name, hv_params in os_hvp.items(): + utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES) + # we need to fill in the new os_hvp on top of the actual hv_p + cluster_defaults = self.new_hvparams.get(hv_name, {}) + new_osp = objects.FillDict(cluster_defaults, hv_params) + hv_class = hypervisor.GetHypervisor(hv_name) + hv_class.CheckParameterSyntax(new_osp) + _CheckHVParams(self, node_list, hv_name, new_osp) + + def Exec(self, feedback_fn): """Change the parameters of the cluster. 
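
    An illustrative invocation through the opcode layer (hypothetical
    values; parameter names as handled by this LU):

      >>> op = opcodes.OpSetClusterParams(candidate_pool_size=10,
      ...                                 maintain_node_health=True)
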
@@ -2128,6 +2444,7 @@ class LUSetClusterParams(LogicalUnit): if self.op.os_hvp: self.cluster.os_hvp = self.new_os_hvp if self.op.enabled_hypervisors is not None: + self.cluster.hvparams = self.new_hvparams self.cluster.enabled_hypervisors = self.op.enabled_hypervisors if self.op.beparams: self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams @@ -2139,6 +2456,18 @@ class LUSetClusterParams(LogicalUnit): # we need to update the pool size here, otherwise the save will fail _AdjustCandidatePool(self, []) + if self.op.maintain_node_health is not None: + self.cluster.maintain_node_health = self.op.maintain_node_health + + if self.op.add_uids is not None: + uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids) + + if self.op.remove_uids is not None: + uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids) + + if self.op.uid_pool is not None: + self.cluster.uid_pool = self.op.uid_pool + self.cfg.Update(self.cluster, feedback_fn) @@ -2166,7 +2495,7 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None): constants.SSH_KNOWN_HOSTS_FILE, constants.RAPI_CERT_FILE, constants.RAPI_USERS_FILE, - constants.HMAC_CLUSTER_KEY, + constants.CONFD_HMAC_KEY, ]) enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors @@ -2523,6 +2852,12 @@ class LURemoveNode(LogicalUnit): self.LogWarning("Errors encountered on the remote node while leaving" " the cluster: %s", msg) + # Remove node from our /etc/hosts + if self.cfg.GetClusterInfo().modify_etc_hosts: + # FIXME: this should be done via an rpc call to node daemon + utils.RemoveHostFromEtcHosts(node.name) + _RedistributeAncillaryFiles(self) + class LUQueryNodes(NoHooksLU): """Logical unit for querying nodes. @@ -2779,17 +3114,14 @@ class LUQueryNodeStorage(NoHooksLU): REQ_BGL = False _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE) - def ExpandNames(self): - storage_type = self.op.storage_type - - if storage_type not in constants.VALID_STORAGE_TYPES: - raise errors.OpPrereqError("Unknown storage type: %s" % storage_type, - errors.ECODE_INVAL) + def CheckArguments(self): + _CheckStorageType(self.op.storage_type) _CheckOutputFields(static=self._FIELDS_STATIC, dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS), selected=self.op.output_fields) + def ExpandNames(self): self.needed_locks = {} self.share_locks[locking.LEVEL_NODE] = 1 @@ -2878,10 +3210,7 @@ class LUModifyNodeStorage(NoHooksLU): def CheckArguments(self): self.opnode_name = _ExpandNodeName(self.cfg, self.op.node_name) - storage_type = self.op.storage_type - if storage_type not in constants.VALID_STORAGE_TYPES: - raise errors.OpPrereqError("Unknown storage type: %s" % storage_type, - errors.ECODE_INVAL) + _CheckStorageType(self.op.storage_type) def ExpandNames(self): self.needed_locks = { @@ -2982,15 +3311,19 @@ class LUAddNode(LogicalUnit): raise errors.OpPrereqError("Node %s is not in the configuration" % node, errors.ECODE_NOENT) + self.changed_primary_ip = False + for existing_node_name in node_list: existing_node = cfg.GetNodeInfo(existing_node_name) if self.op.readd and node == existing_node_name: - if (existing_node.primary_ip != primary_ip or - existing_node.secondary_ip != secondary_ip): + if existing_node.secondary_ip != secondary_ip: raise errors.OpPrereqError("Readded node doesn't have the same IP" " address configuration as before", errors.ECODE_INVAL) + if existing_node.primary_ip != primary_ip: + self.changed_primary_ip = True + continue if (existing_node.primary_ip == primary_ip or @@ -3062,6 +3395,8 @@ class LUAddNode(LogicalUnit): 
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      new_node.master_candidate = self.master_candidate
+      if self.changed_primary_ip:
+        new_node.primary_ip = self.op.primary_ip

    # notify the user about any possible mc promotion
    if new_node.master_candidate:
@@ -3097,6 +3432,7 @@ class LUAddNode(LogicalUnit):

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
+      # FIXME: this should be done via an rpc call to node daemon
      utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
@@ -3225,7 +3561,7 @@ class LUSetNodeParams(LogicalUnit):
        # candidates
        (mc_remaining, mc_should, _) = \
            self.cfg.GetMasterCandidateStats(exceptions=[node.name])
-        if mc_remaining != mc_should:
+        if mc_remaining < mc_should:
          raise errors.OpPrereqError("Not enough master candidates, please"
                                     " pass auto_promote to allow promotion",
                                     errors.ECODE_INVAL)
@@ -3398,10 +3734,12 @@ class LUQueryClusterInfo(NoHooksLU):
      "master_netdev": cluster.master_netdev,
      "volume_group_name": cluster.volume_group_name,
      "file_storage_dir": cluster.file_storage_dir,
+      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
+      "uid_pool": cluster.uid_pool,
      }

    return result
@@ -3632,14 +3970,7 @@ def _SafeShutdownInstanceDisks(lu, instance):
  _ShutdownInstanceDisks.

  """
-  pnode = instance.primary_node
-  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
-  ins_l.Raise("Can't contact node %s" % pnode)
-
-  if instance.name in ins_l.payload:
-    raise errors.OpExecError("Instance is running, can't shutdown"
-                             " block devices.")
-
+  _CheckInstanceDown(lu, instance, "cannot shutdown disks")
  _ShutdownInstanceDisks(lu, instance)


@@ -3703,14 +4034,50 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
                               errors.ECODE_NORES)


-class LUStartupInstance(LogicalUnit):
-  """Starts an instance.
+def _CheckNodesFreeDisk(lu, nodenames, requested):
+  """Checks if nodes have enough free disk space in the default VG.

-  """
-  HPATH = "instance-start"
-  HTYPE = constants.HTYPE_INSTANCE
-  _OP_REQP = ["instance_name", "force"]
-  REQ_BGL = False
+  This function checks if all given nodes have the needed amount of
+  free disk. In case any node has less disk or we cannot get the
+  information from the node, this function raises an OpPrereqError
+  exception.
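+
+  An illustrative call from within an LU (hypothetical node name;
+  C{lu} is the calling LogicalUnit, 2048 is MiB):
+
+    >>> _CheckNodesFreeDisk(lu, ["node1.example.com"], 2048)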
+ + @type lu: C{LogicalUnit} + @param lu: a logical unit from which we get configuration data + @type nodenames: C{list} + @param nodenames: the list of node names to check + @type requested: C{int} + @param requested: the amount of disk in MiB to check for + @raise errors.OpPrereqError: if the node doesn't have enough disk, or + we cannot check the node + + """ + nodeinfo = lu.rpc.call_node_info(nodenames, lu.cfg.GetVGName(), + lu.cfg.GetHypervisorType()) + for node in nodenames: + info = nodeinfo[node] + info.Raise("Cannot get current information from node %s" % node, + prereq=True, ecode=errors.ECODE_ENVIRON) + vg_free = info.payload.get("vg_free", None) + if not isinstance(vg_free, int): + raise errors.OpPrereqError("Can't compute free disk space on node %s," + " result was '%s'" % (node, vg_free), + errors.ECODE_ENVIRON) + if requested > vg_free: + raise errors.OpPrereqError("Not enough disk space on target node %s:" + " required %d MiB, available %d MiB" % + (node, requested, vg_free), + errors.ECODE_NORES) + + +class LUStartupInstance(LogicalUnit): + """Starts an instance. + + """ + HPATH = "instance-start" + HTYPE = constants.HTYPE_INSTANCE + _OP_REQP = ["instance_name", "force"] + REQ_BGL = False def ExpandNames(self): self._ExpandAndLockInstance() @@ -3989,32 +4356,14 @@ class LUReinstallInstance(LogicalUnit): raise errors.OpPrereqError("Instance '%s' has no disks" % self.op.instance_name, errors.ECODE_INVAL) - if instance.admin_up: - raise errors.OpPrereqError("Instance '%s' is marked to be up" % - self.op.instance_name, - errors.ECODE_STATE) - remote_info = self.rpc.call_instance_info(instance.primary_node, - instance.name, - instance.hypervisor) - remote_info.Raise("Error checking node %s" % instance.primary_node, - prereq=True, ecode=errors.ECODE_ENVIRON) - if remote_info.payload: - raise errors.OpPrereqError("Instance '%s' is running on the node %s" % - (self.op.instance_name, - instance.primary_node), - errors.ECODE_STATE) + _CheckInstanceDown(self, instance, "cannot reinstall") self.op.os_type = getattr(self.op, "os_type", None) self.op.force_variant = getattr(self.op, "force_variant", False) if self.op.os_type is not None: # OS verification pnode = _ExpandNodeName(self.cfg, instance.primary_node) - result = self.rpc.call_os_get(pnode, self.op.os_type) - result.Raise("OS '%s' not in supported OS list for primary node %s" % - (self.op.os_type, pnode), - prereq=True, ecode=errors.ECODE_INVAL) - if not self.op.force_variant: - _CheckOSVariant(result.payload, self.op.os_type) + _CheckNodeHasOS(self, pnode, self.op.os_type, self.op.force_variant) self.instance = instance @@ -4089,18 +4438,7 @@ class LURecreateInstanceDisks(LogicalUnit): if instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Instance '%s' has no disks" % self.op.instance_name, errors.ECODE_INVAL) - if instance.admin_up: - raise errors.OpPrereqError("Instance '%s' is marked to be up" % - self.op.instance_name, errors.ECODE_STATE) - remote_info = self.rpc.call_instance_info(instance.primary_node, - instance.name, - instance.hypervisor) - remote_info.Raise("Error checking node %s" % instance.primary_node, - prereq=True, ecode=errors.ECODE_ENVIRON) - if remote_info.payload: - raise errors.OpPrereqError("Instance '%s' is running on the node %s" % - (self.op.instance_name, - instance.primary_node), errors.ECODE_STATE) + _CheckInstanceDown(self, instance, "cannot recreate disks") if not self.op.disks: self.op.disks = range(len(instance.disks)) @@ -4155,19 +4493,7 @@ class 
LURenameInstance(LogicalUnit): instance = self.cfg.GetInstanceInfo(self.op.instance_name) assert instance is not None _CheckNodeOnline(self, instance.primary_node) - - if instance.admin_up: - raise errors.OpPrereqError("Instance '%s' is marked to be up" % - self.op.instance_name, errors.ECODE_STATE) - remote_info = self.rpc.call_instance_info(instance.primary_node, - instance.name, - instance.hypervisor) - remote_info.Raise("Error checking node %s" % instance.primary_node, - prereq=True, ecode=errors.ECODE_ENVIRON) - if remote_info.payload: - raise errors.OpPrereqError("Instance '%s' is running on the node %s" % - (self.op.instance_name, - instance.primary_node), errors.ECODE_STATE) + _CheckInstanceDown(self, instance, "cannot rename") self.instance = instance # new name verification @@ -4294,18 +4620,29 @@ class LURemoveInstance(LogicalUnit): " node %s: %s" % (instance.name, instance.primary_node, msg)) - logging.info("Removing block devices for instance %s", instance.name) + _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures) - if not _RemoveDisks(self, instance): - if self.op.ignore_failures: - feedback_fn("Warning: can't remove instance's disks") - else: - raise errors.OpExecError("Can't remove instance's disks") - logging.info("Removing instance %s out of cluster config", instance.name) +def _RemoveInstance(lu, feedback_fn, instance, ignore_failures): + """Utility function to remove an instance. + + """ + logging.info("Removing block devices for instance %s", instance.name) + + if not _RemoveDisks(lu, instance): + if not ignore_failures: + raise errors.OpExecError("Can't remove instance's disks") + feedback_fn("Warning: can't remove instance's disks") + + logging.info("Removing instance %s out of cluster config", instance.name) - self.cfg.RemoveInstance(instance.name) - self.remove_locks[locking.LEVEL_INSTANCE] = instance.name + lu.cfg.RemoveInstance(instance.name) + + assert not lu.remove_locks.get(locking.LEVEL_INSTANCE), \ + "Instance lock removal conflict" + + # Remove lock for the instance + lu.remove_locks[locking.LEVEL_INSTANCE] = instance.name class LUQueryInstances(NoHooksLU): @@ -5519,6 +5856,8 @@ def _GenerateDiskTemplate(lu, template_name, if len(secondary_nodes) != 0: raise errors.ProgrammerError("Wrong template configuration") + _RequireFileStorage() + for idx, disk in enumerate(disk_info): disk_index = idx + base_index disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"], @@ -5687,7 +6026,7 @@ class LUCreateInstance(LogicalUnit): """ HPATH = "instance-add" HTYPE = constants.HTYPE_INSTANCE - _OP_REQP = ["instance_name", "disks", "disk_template", + _OP_REQP = ["instance_name", "disks", "mode", "start", "wait_for_sync", "ip_check", "nics", "hvparams", "beparams"] @@ -5697,35 +6036,50 @@ class LUCreateInstance(LogicalUnit): """Check arguments. 
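
    Illustrative disk specifications (hypothetical values; the adopt
    form is only accepted for the 'plain' template, as checked below):

      >>> new_disk = {"size": 1024, "mode": constants.DISK_RDWR}
      >>> adopted_disk = {"adopt": "existing-lv"}
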
""" + # set optional parameters to none if they don't exist + for attr in ["pnode", "snode", "iallocator", "hypervisor", + "disk_template", "identify_defaults"]: + if not hasattr(self.op, attr): + setattr(self.op, attr, None) + # do not require name_check to ease forward/backward compatibility # for tools if not hasattr(self.op, "name_check"): self.op.name_check = True + if not hasattr(self.op, "no_install"): + self.op.no_install = False + if self.op.no_install and self.op.start: + self.LogInfo("No-installation mode selected, disabling startup") + self.op.start = False # validate/normalize the instance name self.op.instance_name = utils.HostInfo.NormalizeName(self.op.instance_name) if self.op.ip_check and not self.op.name_check: # TODO: make the ip check more flexible and not depend on the name check raise errors.OpPrereqError("Cannot do ip checks without a name check", errors.ECODE_INVAL) - if (self.op.disk_template == constants.DT_FILE and - not constants.ENABLE_FILE_STORAGE): - raise errors.OpPrereqError("File storage disabled at configure time", + # check disk information: either all adopt, or no adopt + has_adopt = has_no_adopt = False + for disk in self.op.disks: + if "adopt" in disk: + has_adopt = True + else: + has_no_adopt = True + if has_adopt and has_no_adopt: + raise errors.OpPrereqError("Either all disks are adopted or none is", errors.ECODE_INVAL) + if has_adopt: + if self.op.disk_template != constants.DT_PLAIN: + raise errors.OpPrereqError("Disk adoption is only supported for the" + " 'plain' disk template", + errors.ECODE_INVAL) + if self.op.iallocator is not None: + raise errors.OpPrereqError("Disk adoption not allowed with an" + " iallocator script", errors.ECODE_INVAL) + if self.op.mode == constants.INSTANCE_IMPORT: + raise errors.OpPrereqError("Disk adoption not allowed for" + " instance import", errors.ECODE_INVAL) - def ExpandNames(self): - """ExpandNames for CreateInstance. - - Figure out the right locks for instance creation. 
- - """ - self.needed_locks = {} - - # set optional parameters to none if they don't exist - for attr in ["pnode", "snode", "iallocator", "hypervisor"]: - if not hasattr(self.op, attr): - setattr(self.op, attr, None) - - # cheap checks, mostly valid constants given + self.adopt_disks = has_adopt # verify creation mode if self.op.mode not in (constants.INSTANCE_CREATE, @@ -5733,145 +6087,15 @@ class LUCreateInstance(LogicalUnit): raise errors.OpPrereqError("Invalid instance creation mode '%s'" % self.op.mode, errors.ECODE_INVAL) - # disk template and mirror node verification - if self.op.disk_template not in constants.DISK_TEMPLATES: - raise errors.OpPrereqError("Invalid disk template name", - errors.ECODE_INVAL) - - if self.op.hypervisor is None: - self.op.hypervisor = self.cfg.GetHypervisorType() - - cluster = self.cfg.GetClusterInfo() - enabled_hvs = cluster.enabled_hypervisors - if self.op.hypervisor not in enabled_hvs: - raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the" - " cluster (%s)" % (self.op.hypervisor, - ",".join(enabled_hvs)), - errors.ECODE_STATE) - - # check hypervisor parameter syntax (locally) - utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES) - filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor], - self.op.hvparams) - hv_type = hypervisor.GetHypervisor(self.op.hypervisor) - hv_type.CheckParameterSyntax(filled_hvp) - self.hv_full = filled_hvp - # check that we don't specify global parameters on an instance - _CheckGlobalHvParams(self.op.hvparams) - - # fill and remember the beparams dict - utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) - self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT], - self.op.beparams) - - #### instance parameters check - # instance name verification if self.op.name_check: - hostname1 = utils.GetHostInfo(self.op.instance_name) - self.op.instance_name = instance_name = hostname1.name + self.hostname1 = utils.GetHostInfo(self.op.instance_name) + self.op.instance_name = self.hostname1.name # used in CheckPrereq for ip ping check - self.check_ip = hostname1.ip + self.check_ip = self.hostname1.ip else: - instance_name = self.op.instance_name self.check_ip = None - # this is just a preventive check, but someone might still add this - # instance in the meantime, and creation will fail at lock-add time - if instance_name in self.cfg.GetInstanceList(): - raise errors.OpPrereqError("Instance '%s' is already in the cluster" % - instance_name, errors.ECODE_EXISTS) - - self.add_locks[locking.LEVEL_INSTANCE] = instance_name - - # NIC buildup - self.nics = [] - for idx, nic in enumerate(self.op.nics): - nic_mode_req = nic.get("mode", None) - nic_mode = nic_mode_req - if nic_mode is None: - nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE] - - # in routed mode, for the first nic, the default ip is 'auto' - if nic_mode == constants.NIC_MODE_ROUTED and idx == 0: - default_ip_mode = constants.VALUE_AUTO - else: - default_ip_mode = constants.VALUE_NONE - - # ip validity checks - ip = nic.get("ip", default_ip_mode) - if ip is None or ip.lower() == constants.VALUE_NONE: - nic_ip = None - elif ip.lower() == constants.VALUE_AUTO: - if not self.op.name_check: - raise errors.OpPrereqError("IP address set to auto but name checks" - " have been skipped. 
Aborting.", - errors.ECODE_INVAL) - nic_ip = hostname1.ip - else: - if not utils.IsValidIP(ip): - raise errors.OpPrereqError("Given IP address '%s' doesn't look" - " like a valid IP" % ip, - errors.ECODE_INVAL) - nic_ip = ip - - # TODO: check the ip address for uniqueness - if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip: - raise errors.OpPrereqError("Routed nic mode requires an ip address", - errors.ECODE_INVAL) - - # MAC address verification - mac = nic.get("mac", constants.VALUE_AUTO) - if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE): - mac = utils.NormalizeAndValidateMac(mac) - - try: - self.cfg.ReserveMAC(mac, self.proc.GetECId()) - except errors.ReservationError: - raise errors.OpPrereqError("MAC address %s already in use" - " in cluster" % mac, - errors.ECODE_NOTUNIQUE) - - # bridge verification - bridge = nic.get("bridge", None) - link = nic.get("link", None) - if bridge and link: - raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'" - " at the same time", errors.ECODE_INVAL) - elif bridge and nic_mode == constants.NIC_MODE_ROUTED: - raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic", - errors.ECODE_INVAL) - elif bridge: - link = bridge - - nicparams = {} - if nic_mode_req: - nicparams[constants.NIC_MODE] = nic_mode_req - if link: - nicparams[constants.NIC_LINK] = link - - check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT], - nicparams) - objects.NIC.CheckParameterSyntax(check_params) - self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams)) - - # disk checks/pre-build - self.disks = [] - for disk in self.op.disks: - mode = disk.get("mode", constants.DISK_RDWR) - if mode not in constants.DISK_ACCESS_SET: - raise errors.OpPrereqError("Invalid disk access mode '%s'" % - mode, errors.ECODE_INVAL) - size = disk.get("size", None) - if size is None: - raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL) - try: - size = int(size) - except (TypeError, ValueError): - raise errors.OpPrereqError("Invalid disk size '%s'" % size, - errors.ECODE_INVAL) - self.disks.append({"size": size, "mode": mode}) - # file storage checks if (self.op.file_driver and not self.op.file_driver in constants.FILE_DRIVER): @@ -5888,6 +6112,41 @@ class LUCreateInstance(LogicalUnit): " node must be given", errors.ECODE_INVAL) + if self.op.mode == constants.INSTANCE_IMPORT: + # On import force_variant must be True, because if we forced it at + # initial install, our only chance when importing it back is that it + # works again! + self.op.force_variant = True + + if self.op.no_install: + self.LogInfo("No-installation mode has no effect during import") + + else: # INSTANCE_CREATE + if getattr(self.op, "os_type", None) is None: + raise errors.OpPrereqError("No guest OS specified", + errors.ECODE_INVAL) + self.op.force_variant = getattr(self.op, "force_variant", False) + if self.op.disk_template is None: + raise errors.OpPrereqError("No disk template specified", + errors.ECODE_INVAL) + + def ExpandNames(self): + """ExpandNames for CreateInstance. + + Figure out the right locks for instance creation. 
+ + """ + self.needed_locks = {} + + instance_name = self.op.instance_name + # this is just a preventive check, but someone might still add this + # instance in the meantime, and creation will fail at lock-add time + if instance_name in self.cfg.GetInstanceList(): + raise errors.OpPrereqError("Instance '%s' is already in the cluster" % + instance_name, errors.ECODE_EXISTS) + + self.add_locks[locking.LEVEL_INSTANCE] = instance_name + if self.op.iallocator: self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET else: @@ -5921,17 +6180,6 @@ class LUCreateInstance(LogicalUnit): self.op.src_path = src_path = \ utils.PathJoin(constants.EXPORT_DIR, src_path) - # On import force_variant must be True, because if we forced it at - # initial install, our only chance when importing it back is that it - # works again! - self.op.force_variant = True - - else: # INSTANCE_CREATE - if getattr(self.op, "os_type", None) is None: - raise errors.OpPrereqError("No guest OS specified", - errors.ECODE_INVAL) - self.op.force_variant = getattr(self.op, "force_variant", False) - def _RunAllocator(self): """Run the allocator based on input opcode. @@ -5972,82 +6220,309 @@ class LUCreateInstance(LogicalUnit): def BuildHooksEnv(self): """Build hooks env. - This runs on master, primary and secondary nodes of the instance. + This runs on master, primary and secondary nodes of the instance. + + """ + env = { + "ADD_MODE": self.op.mode, + } + if self.op.mode == constants.INSTANCE_IMPORT: + env["SRC_NODE"] = self.op.src_node + env["SRC_PATH"] = self.op.src_path + env["SRC_IMAGES"] = self.src_images + + env.update(_BuildInstanceHookEnv( + name=self.op.instance_name, + primary_node=self.op.pnode, + secondary_nodes=self.secondaries, + status=self.op.start, + os_type=self.op.os_type, + memory=self.be_full[constants.BE_MEMORY], + vcpus=self.be_full[constants.BE_VCPUS], + nics=_NICListToTuple(self, self.nics), + disk_template=self.op.disk_template, + disks=[(d["size"], d["mode"]) for d in self.disks], + bep=self.be_full, + hvp=self.hv_full, + hypervisor_name=self.op.hypervisor, + )) + + nl = ([self.cfg.GetMasterNode(), self.op.pnode] + + self.secondaries) + return env, nl, nl + + def _ReadExportInfo(self): + """Reads the export information from disk. + + It will override the opcode source node and path with the actual + information, if these two were not specified before. 
+ + @return: the export information + + """ + assert self.op.mode == constants.INSTANCE_IMPORT + + src_node = self.op.src_node + src_path = self.op.src_path + + if src_node is None: + locked_nodes = self.acquired_locks[locking.LEVEL_NODE] + exp_list = self.rpc.call_export_list(locked_nodes) + found = False + for node in exp_list: + if exp_list[node].fail_msg: + continue + if src_path in exp_list[node].payload: + found = True + self.op.src_node = src_node = node + self.op.src_path = src_path = utils.PathJoin(constants.EXPORT_DIR, + src_path) + break + if not found: + raise errors.OpPrereqError("No export found for relative path %s" % + src_path, errors.ECODE_INVAL) + + _CheckNodeOnline(self, src_node) + result = self.rpc.call_export_info(src_node, src_path) + result.Raise("No export or invalid export found in dir %s" % src_path) + + export_info = objects.SerializableConfigParser.Loads(str(result.payload)) + if not export_info.has_section(constants.INISECT_EXP): + raise errors.ProgrammerError("Corrupted export config", + errors.ECODE_ENVIRON) + + ei_version = export_info.get(constants.INISECT_EXP, "version") + if (int(ei_version) != constants.EXPORT_VERSION): + raise errors.OpPrereqError("Wrong export version %s (wanted %d)" % + (ei_version, constants.EXPORT_VERSION), + errors.ECODE_ENVIRON) + return export_info + + def _ReadExportParams(self, einfo): + """Use export parameters as defaults. + + In case the opcode doesn't specify (as in override) some instance + parameters, then try to use them from the export information, if + that declares them. + + """ + self.op.os_type = einfo.get(constants.INISECT_EXP, "os") + + if self.op.disk_template is None: + if einfo.has_option(constants.INISECT_INS, "disk_template"): + self.op.disk_template = einfo.get(constants.INISECT_INS, + "disk_template") + else: + raise errors.OpPrereqError("No disk template specified and the export" + " is missing the disk_template information", + errors.ECODE_INVAL) + + if not self.op.disks: + if einfo.has_option(constants.INISECT_INS, "disk_count"): + disks = [] + # TODO: import the disk iv_name too + for idx in range(einfo.getint(constants.INISECT_INS, "disk_count")): + disk_sz = einfo.getint(constants.INISECT_INS, "disk%d_size" % idx) + disks.append({"size": disk_sz}) + self.op.disks = disks + else: + raise errors.OpPrereqError("No disk info specified and the export" + " is missing the disk information", + errors.ECODE_INVAL) + + if (not self.op.nics and + einfo.has_option(constants.INISECT_INS, "nic_count")): + nics = [] + for idx in range(einfo.getint(constants.INISECT_INS, "nic_count")): + ndict = {} + for name in list(constants.NICS_PARAMETERS) + ["ip", "mac"]: + v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name)) + ndict[name] = v + nics.append(ndict) + self.op.nics = nics + + if (self.op.hypervisor is None and + einfo.has_option(constants.INISECT_INS, "hypervisor")): + self.op.hypervisor = einfo.get(constants.INISECT_INS, "hypervisor") + if einfo.has_section(constants.INISECT_HYP): + # use the export parameters but do not override the ones + # specified by the user + for name, value in einfo.items(constants.INISECT_HYP): + if name not in self.op.hvparams: + self.op.hvparams[name] = value + + if einfo.has_section(constants.INISECT_BEP): + # use the parameters, without overriding + for name, value in einfo.items(constants.INISECT_BEP): + if name not in self.op.beparams: + self.op.beparams[name] = value + else: + # try to read the parameters old style, from the main section + for name in 
constants.BES_PARAMETERS: + if (name not in self.op.beparams and + einfo.has_option(constants.INISECT_INS, name)): + self.op.beparams[name] = einfo.get(constants.INISECT_INS, name) + + def _RevertToDefaults(self, cluster): + """Revert the instance parameters to the default values. + + """ + # hvparams + hv_defs = cluster.GetHVDefaults(self.op.hypervisor, self.op.os_type) + for name in self.op.hvparams.keys(): + if name in hv_defs and hv_defs[name] == self.op.hvparams[name]: + del self.op.hvparams[name] + # beparams + be_defs = cluster.beparams.get(constants.PP_DEFAULT, {}) + for name in self.op.beparams.keys(): + if name in be_defs and be_defs[name] == self.op.beparams[name]: + del self.op.beparams[name] + # nic params + nic_defs = cluster.nicparams.get(constants.PP_DEFAULT, {}) + for nic in self.op.nics: + for name in constants.NICS_PARAMETERS: + if name in nic and name in nic_defs and nic[name] == nic_defs[name]: + del nic[name] + + def CheckPrereq(self): + """Check prerequisites. + + """ + if self.op.mode == constants.INSTANCE_IMPORT: + export_info = self._ReadExportInfo() + self._ReadExportParams(export_info) + + _CheckDiskTemplate(self.op.disk_template) + + if (not self.cfg.GetVGName() and + self.op.disk_template not in constants.DTS_NOT_LVM): + raise errors.OpPrereqError("Cluster does not support lvm-based" + " instances", errors.ECODE_STATE) + + if self.op.hypervisor is None: + self.op.hypervisor = self.cfg.GetHypervisorType() + + cluster = self.cfg.GetClusterInfo() + enabled_hvs = cluster.enabled_hypervisors + if self.op.hypervisor not in enabled_hvs: + raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the" + " cluster (%s)" % (self.op.hypervisor, + ",".join(enabled_hvs)), + errors.ECODE_STATE) + + # check hypervisor parameter syntax (locally) + utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES) + filled_hvp = objects.FillDict(cluster.GetHVDefaults(self.op.hypervisor, + self.op.os_type), + self.op.hvparams) + hv_type = hypervisor.GetHypervisor(self.op.hypervisor) + hv_type.CheckParameterSyntax(filled_hvp) + self.hv_full = filled_hvp + # check that we don't specify global parameters on an instance + _CheckGlobalHvParams(self.op.hvparams) + + # fill and remember the beparams dict + utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) + self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT], + self.op.beparams) + + # now that hvp/bep are in final format, let's reset to defaults, + # if told to do so + if self.op.identify_defaults: + self._RevertToDefaults(cluster) + + # NIC buildup + self.nics = [] + for idx, nic in enumerate(self.op.nics): + nic_mode_req = nic.get("mode", None) + nic_mode = nic_mode_req + if nic_mode is None: + nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE] + + # in routed mode, for the first nic, the default ip is 'auto' + if nic_mode == constants.NIC_MODE_ROUTED and idx == 0: + default_ip_mode = constants.VALUE_AUTO + else: + default_ip_mode = constants.VALUE_NONE + + # ip validity checks + ip = nic.get("ip", default_ip_mode) + if ip is None or ip.lower() == constants.VALUE_NONE: + nic_ip = None + elif ip.lower() == constants.VALUE_AUTO: + if not self.op.name_check: + raise errors.OpPrereqError("IP address set to auto but name checks" + " have been skipped. 
Aborting.", + errors.ECODE_INVAL) + nic_ip = self.hostname1.ip + else: + if not utils.IsValidIP(ip): + raise errors.OpPrereqError("Given IP address '%s' doesn't look" + " like a valid IP" % ip, + errors.ECODE_INVAL) + nic_ip = ip + + # TODO: check the ip address for uniqueness + if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip: + raise errors.OpPrereqError("Routed nic mode requires an ip address", + errors.ECODE_INVAL) - """ - env = { - "ADD_MODE": self.op.mode, - } - if self.op.mode == constants.INSTANCE_IMPORT: - env["SRC_NODE"] = self.op.src_node - env["SRC_PATH"] = self.op.src_path - env["SRC_IMAGES"] = self.src_images + # MAC address verification + mac = nic.get("mac", constants.VALUE_AUTO) + if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE): + mac = utils.NormalizeAndValidateMac(mac) - env.update(_BuildInstanceHookEnv( - name=self.op.instance_name, - primary_node=self.op.pnode, - secondary_nodes=self.secondaries, - status=self.op.start, - os_type=self.op.os_type, - memory=self.be_full[constants.BE_MEMORY], - vcpus=self.be_full[constants.BE_VCPUS], - nics=_NICListToTuple(self, self.nics), - disk_template=self.op.disk_template, - disks=[(d["size"], d["mode"]) for d in self.disks], - bep=self.be_full, - hvp=self.hv_full, - hypervisor_name=self.op.hypervisor, - )) + try: + self.cfg.ReserveMAC(mac, self.proc.GetECId()) + except errors.ReservationError: + raise errors.OpPrereqError("MAC address %s already in use" + " in cluster" % mac, + errors.ECODE_NOTUNIQUE) - nl = ([self.cfg.GetMasterNode(), self.op.pnode] + - self.secondaries) - return env, nl, nl + # bridge verification + bridge = nic.get("bridge", None) + link = nic.get("link", None) + if bridge and link: + raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'" + " at the same time", errors.ECODE_INVAL) + elif bridge and nic_mode == constants.NIC_MODE_ROUTED: + raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic", + errors.ECODE_INVAL) + elif bridge: + link = bridge + nicparams = {} + if nic_mode_req: + nicparams[constants.NIC_MODE] = nic_mode_req + if link: + nicparams[constants.NIC_LINK] = link - def CheckPrereq(self): - """Check prerequisites. 
+ check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT], + nicparams) + objects.NIC.CheckParameterSyntax(check_params) + self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams)) - """ - if (not self.cfg.GetVGName() and - self.op.disk_template not in constants.DTS_NOT_LVM): - raise errors.OpPrereqError("Cluster does not support lvm-based" - " instances", errors.ECODE_STATE) + # disk checks/pre-build + self.disks = [] + for disk in self.op.disks: + mode = disk.get("mode", constants.DISK_RDWR) + if mode not in constants.DISK_ACCESS_SET: + raise errors.OpPrereqError("Invalid disk access mode '%s'" % + mode, errors.ECODE_INVAL) + size = disk.get("size", None) + if size is None: + raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL) + try: + size = int(size) + except (TypeError, ValueError): + raise errors.OpPrereqError("Invalid disk size '%s'" % size, + errors.ECODE_INVAL) + new_disk = {"size": size, "mode": mode} + if "adopt" in disk: + new_disk["adopt"] = disk["adopt"] + self.disks.append(new_disk) if self.op.mode == constants.INSTANCE_IMPORT: - src_node = self.op.src_node - src_path = self.op.src_path - - if src_node is None: - locked_nodes = self.acquired_locks[locking.LEVEL_NODE] - exp_list = self.rpc.call_export_list(locked_nodes) - found = False - for node in exp_list: - if exp_list[node].fail_msg: - continue - if src_path in exp_list[node].payload: - found = True - self.op.src_node = src_node = node - self.op.src_path = src_path = utils.PathJoin(constants.EXPORT_DIR, - src_path) - break - if not found: - raise errors.OpPrereqError("No export found for relative path %s" % - src_path, errors.ECODE_INVAL) - - _CheckNodeOnline(self, src_node) - result = self.rpc.call_export_info(src_node, src_path) - result.Raise("No export or invalid export found in dir %s" % src_path) - - export_info = objects.SerializableConfigParser.Loads(str(result.payload)) - if not export_info.has_section(constants.INISECT_EXP): - raise errors.ProgrammerError("Corrupted export config", - errors.ECODE_ENVIRON) - - ei_version = export_info.get(constants.INISECT_EXP, 'version') - if (int(ei_version) != constants.EXPORT_VERSION): - raise errors.OpPrereqError("Wrong export version %s (wanted %d)" % - (ei_version, constants.EXPORT_VERSION), - errors.ECODE_ENVIRON) # Check that the new instance doesn't have less disks than the export instance_disks = len(self.disks) @@ -6058,14 +6533,13 @@ class LUCreateInstance(LogicalUnit): (instance_disks, export_disks), errors.ECODE_INVAL) - self.op.os_type = export_info.get(constants.INISECT_EXP, 'os') disk_images = [] for idx in range(export_disks): option = 'disk%d_dump' % idx if export_info.has_option(constants.INISECT_INS, option): # FIXME: are the old os-es, disk sizes, etc. useful? 
export_name = export_info.get(constants.INISECT_INS, option) - image = utils.PathJoin(src_path, export_name) + image = utils.PathJoin(self.op.src_path, export_name) disk_images.append(image) else: disk_images.append(False) @@ -6073,8 +6547,12 @@ class LUCreateInstance(LogicalUnit): self.src_images = disk_images old_name = export_info.get(constants.INISECT_INS, 'name') - # FIXME: int() here could throw a ValueError on broken exports - exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count')) + try: + exp_nic_count = export_info.getint(constants.INISECT_INS, 'nic_count') + except (TypeError, ValueError), err: + raise errors.OpPrereqError("Invalid export file, nic_count is not" + " an integer: %s" % str(err), + errors.ECODE_STATE) if self.op.instance_name == old_name: for idx, nic in enumerate(self.nics): if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx: @@ -6139,33 +6617,43 @@ class LUCreateInstance(LogicalUnit): req_size = _ComputeDiskSize(self.op.disk_template, self.disks) - # Check lv size requirements - if req_size is not None: - nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(), - self.op.hypervisor) - for node in nodenames: - info = nodeinfo[node] - info.Raise("Cannot get current information from node %s" % node) - info = info.payload - vg_free = info.get('vg_free', None) - if not isinstance(vg_free, int): - raise errors.OpPrereqError("Can't compute free disk space on" - " node %s" % node, errors.ECODE_ENVIRON) - if req_size > vg_free: - raise errors.OpPrereqError("Not enough disk space on target node %s." - " %d MB available, %d MB required" % - (node, vg_free, req_size), - errors.ECODE_NORES) + # Check lv size requirements, if not adopting + if req_size is not None and not self.adopt_disks: + _CheckNodesFreeDisk(self, nodenames, req_size) + + if self.adopt_disks: # instead, we must check the adoption data + all_lvs = set([i["adopt"] for i in self.disks]) + if len(all_lvs) != len(self.disks): + raise errors.OpPrereqError("Duplicate volume names given for adoption", + errors.ECODE_INVAL) + for lv_name in all_lvs: + try: + self.cfg.ReserveLV(lv_name, self.proc.GetECId()) + except errors.ReservationError: + raise errors.OpPrereqError("LV named %s used by another instance" % + lv_name, errors.ECODE_NOTUNIQUE) + + node_lvs = self.rpc.call_lv_list([pnode.name], + self.cfg.GetVGName())[pnode.name] + node_lvs.Raise("Cannot get LV information from node %s" % pnode.name) + node_lvs = node_lvs.payload + delta = all_lvs.difference(node_lvs.keys()) + if delta: + raise errors.OpPrereqError("Missing logical volume(s): %s" % + utils.CommaJoin(delta), + errors.ECODE_INVAL) + online_lvs = [lv for lv in all_lvs if node_lvs[lv][2]] + if online_lvs: + raise errors.OpPrereqError("Online logical volumes found, cannot" + " adopt: %s" % utils.CommaJoin(online_lvs), + errors.ECODE_STATE) + # update the size of disk based on what is found + for dsk in self.disks: + dsk["size"] = int(float(node_lvs[dsk["adopt"]][0])) _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams) - # os verification - result = self.rpc.call_os_get(pnode.name, self.op.os_type) - result.Raise("OS '%s' not in supported os list for primary node %s" % - (self.op.os_type, pnode.name), - prereq=True, ecode=errors.ECODE_INVAL) - if not self.op.force_variant: - _CheckOSVariant(result.payload, self.op.os_type) + _CheckNodeHasOS(self, pnode.name, self.op.os_type, self.op.force_variant) _CheckNicsBridgesExist(self, self.nics, self.pnode.name) @@ -6191,19 +6679,18 @@ class 
LUCreateInstance(LogicalUnit):
     else:
       network_port = None
 
-    ##if self.op.vnc_bind_address is None:
-    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
+    if constants.ENABLE_FILE_STORAGE:
+      # this is needed because os.path.join does not accept None arguments
+      if self.op.file_storage_dir is None:
+        string_file_storage_dir = ""
+      else:
+        string_file_storage_dir = self.op.file_storage_dir
 
-    # this is needed because os.path.join does not accept None arguments
-    if self.op.file_storage_dir is None:
-      string_file_storage_dir = ""
+      # build the full file storage dir path
+      file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
+                                        string_file_storage_dir, instance)
     else:
-      string_file_storage_dir = self.op.file_storage_dir
-
-    # build the full file storage dir path
-    file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
-                                      string_file_storage_dir, instance)
-
+      file_storage_dir = ""
 
     disks = _GenerateDiskTemplate(self,
                                   self.op.disk_template,
@@ -6225,16 +6712,29 @@ class LUCreateInstance(LogicalUnit):
                             hypervisor=self.op.hypervisor,
                             )
 
-    feedback_fn("* creating instance disks...")
-    try:
-      _CreateDisks(self, iobj)
-    except errors.OpExecError:
-      self.LogWarning("Device creation failed, reverting...")
+    if self.adopt_disks:
+      # rename LVs to the newly-generated names; we need to construct
+      # 'fake' LV disks with the old data, plus the new unique_id
+      tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks]
+      rename_to = []
+      for t_dsk, a_dsk in zip(tmp_disks, self.disks):
+        rename_to.append(t_dsk.logical_id)
+        t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk["adopt"])
+        self.cfg.SetDiskID(t_dsk, pnode_name)
+      result = self.rpc.call_blockdev_rename(pnode_name,
+                                             zip(tmp_disks, rename_to))
+      result.Raise("Failed to rename adopted LVs")
+    else:
+      feedback_fn("* creating instance disks...")
       try:
-        _RemoveDisks(self, iobj)
-      finally:
-        self.cfg.ReleaseDRBDMinors(instance)
-        raise
+        _CreateDisks(self, iobj)
+      except errors.OpExecError:
+        self.LogWarning("Device creation failed, reverting...")
+        try:
+          _RemoveDisks(self, iobj)
+        finally:
+          self.cfg.ReleaseDRBDMinors(instance)
+          raise
 
     feedback_fn("adding instance %s to cluster config" % instance)
 
@@ -6272,32 +6772,42 @@ class LUCreateInstance(LogicalUnit):
       raise errors.OpExecError("There are some degraded disks for"
                                " this instance")
 
-    feedback_fn("creating os for instance %s on node %s" %
-                (instance, pnode_name))
-
-    if iobj.disk_template != constants.DT_DISKLESS:
+    if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks:
       if self.op.mode == constants.INSTANCE_CREATE:
-        feedback_fn("* running the instance OS create scripts...")
-        # FIXME: pass debug option from opcode to backend
-        result = self.rpc.call_instance_os_add(pnode_name, iobj, False,
-                                               self.op.debug_level)
-        result.Raise("Could not add os for instance %s"
-                     " on node %s" % (instance, pnode_name))
+        if not self.op.no_install:
+          feedback_fn("* running the instance OS create scripts...")
+          # FIXME: pass debug option from opcode to backend
+          result = self.rpc.call_instance_os_add(pnode_name, iobj, False,
+                                                 self.op.debug_level)
+          result.Raise("Could not add os for instance %s"
+                       " on node %s" % (instance, pnode_name))
       elif self.op.mode == constants.INSTANCE_IMPORT:
         feedback_fn("* running the instance OS import scripts...")
-        src_node = self.op.src_node
-        src_images = self.src_images
-        cluster_name = self.cfg.GetClusterName()
-        # FIXME: pass debug option from opcode to backend
-        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
-
src_node, src_images, - cluster_name, - self.op.debug_level) - msg = import_result.fail_msg - if msg: - self.LogWarning("Error while importing the disk images for instance" - " %s on node %s: %s" % (instance, pnode_name, msg)) + + transfers = [] + + for idx, image in enumerate(self.src_images): + if not image: + continue + + # FIXME: pass debug option from opcode to backend + dt = masterd.instance.DiskTransfer("disk/%s" % idx, + constants.IEIO_FILE, (image, ), + constants.IEIO_SCRIPT, + (iobj.disks[idx], idx), + None) + transfers.append(dt) + + import_result = \ + masterd.instance.TransferInstanceData(self, feedback_fn, + self.op.src_node, pnode_name, + self.pnode.secondary_ip, + iobj, transfers) + if not compat.all(import_result): + self.LogWarning("Some disks for instance %s on node %s were not" + " imported successfully" % (instance, pnode_name)) + else: # also checked in the prereq part raise errors.ProgrammerError("Unknown OS initialization mode '%s'" @@ -7211,6 +7721,8 @@ class LURepairNodeStorage(NoHooksLU): def CheckArguments(self): self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) + _CheckStorageType(self.op.storage_type) + def ExpandNames(self): self.needed_locks = { locking.LEVEL_NODE: [self.op.node_name], @@ -7366,26 +7878,16 @@ class LUGrowDisk(LogicalUnit): self.instance = instance - if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8): + if instance.disk_template not in constants.DTS_GROWABLE: raise errors.OpPrereqError("Instance's disk layout does not support" " growing.", errors.ECODE_INVAL) self.disk = instance.FindDisk(self.op.disk) - nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(), - instance.hypervisor) - for node in nodenames: - info = nodeinfo[node] - info.Raise("Cannot get current information from node %s" % node) - vg_free = info.payload.get('vg_free', None) - if not isinstance(vg_free, int): - raise errors.OpPrereqError("Can't compute free disk space on" - " node %s" % node, errors.ECODE_ENVIRON) - if self.op.amount > vg_free: - raise errors.OpPrereqError("Not enough disk space on target node %s:" - " %d MiB available, %d MiB required" % - (node, vg_free, self.op.amount), - errors.ECODE_NORES) + if instance.disk_template != constants.DT_FILE: + # TODO: check the free disk space for file, when that feature will be + # supported + _CheckNodesFreeDisk(self, nodenames, self.op.amount) def Exec(self, feedback_fn): """Execute disk grow. 
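Note: _CheckNodesFreeDisk, which the new LUGrowDisk and LUCreateInstance code above calls, is defined in an earlier part of this patch and not shown in this excerpt. A minimal sketch, reconstructed from the inline checks it replaces here (node info RPC, vg_free sanity check, space comparison); the real helper may differ in detail:

def _CheckNodesFreeDisk(lu, nodenames, requested):
  """Checks that all given nodes have at least requested MiB of free VG space.

  Sketch only, mirroring the inline checks removed above.

  """
  nodeinfo = lu.rpc.call_node_info(nodenames, lu.cfg.GetVGName(),
                                   lu.cfg.GetHypervisorType())
  for node in nodenames:
    info = nodeinfo[node]
    info.Raise("Cannot get current information from node %s" % node,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    vg_free = info.payload.get("vg_free", None)
    if not isinstance(vg_free, int):
      raise errors.OpPrereqError("Can't compute free disk space on"
                                 " node %s" % node, errors.ECODE_ENVIRON)
    if requested > vg_free:
      raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                 " %d MiB available, %d MiB required" %
                                 (node, vg_free, requested),
                                 errors.ECODE_NORES)

Factoring this out lets instance creation, disk growing and (below) disk template conversion share one error message and error code.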
@@ -7589,9 +8091,17 @@ class LUSetInstanceParams(LogicalUnit): self.op.beparams = {} if not hasattr(self.op, 'hvparams'): self.op.hvparams = {} + if not hasattr(self.op, "disk_template"): + self.op.disk_template = None + if not hasattr(self.op, "remote_node"): + self.op.remote_node = None + if not hasattr(self.op, "os_name"): + self.op.os_name = None + if not hasattr(self.op, "force_variant"): + self.op.force_variant = False self.op.force = getattr(self.op, "force", False) - if not (self.op.nics or self.op.disks or - self.op.hvparams or self.op.beparams): + if not (self.op.nics or self.op.disks or self.op.disk_template or + self.op.hvparams or self.op.beparams or self.op.os_name): raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL) if self.op.hvparams: @@ -7637,6 +8147,19 @@ class LUSetInstanceParams(LogicalUnit): raise errors.OpPrereqError("Only one disk add or remove operation" " supported at a time", errors.ECODE_INVAL) + if self.op.disks and self.op.disk_template is not None: + raise errors.OpPrereqError("Disk template conversion and other disk" + " changes not supported at the same time", + errors.ECODE_INVAL) + + if self.op.disk_template: + _CheckDiskTemplate(self.op.disk_template) + if (self.op.disk_template in constants.DTS_NET_MIRROR and + self.op.remote_node is None): + raise errors.OpPrereqError("Changing the disk template to a mirrored" + " one requires specifying a secondary node", + errors.ECODE_INVAL) + # NIC validation nic_addremove = 0 for nic_op, nic_dict in self.op.nics: @@ -7699,6 +8222,9 @@ class LUSetInstanceParams(LogicalUnit): def DeclareLocks(self, level): if level == locking.LEVEL_NODE: self._LockInstancesNodes() + if self.op.disk_template and self.op.remote_node: + self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node) + self.needed_locks[locking.LEVEL_NODE].append(self.op.remote_node) def BuildHooksEnv(self): """Build hooks env. 
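The hasattr()/setattr() pattern at the top of CheckArguments above is what keeps opcodes submitted by older clients working when new optional fields such as disk_template and os_name are introduced. A generic form of the idiom, as a hypothetical helper that is not part of this patch:

def _SetOpDefaults(op, defaults):
  """Fill in missing optional opcode attributes (sketch only)."""
  for attr, default in defaults.items():
    if not hasattr(op, attr):
      setattr(op, attr, default)

# would be equivalent to the explicit checks above:
# _SetOpDefaults(self.op, {"disk_template": None, "remote_node": None,
#                          "os_name": None, "force_variant": False})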
@@ -7748,6 +8274,8 @@ class LUSetInstanceParams(LogicalUnit):
       del args['nics'][-1]
 
     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
+    if self.op.disk_template:
+      env["NEW_DISK_TEMPLATE"] = self.op.disk_template
     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
@@ -7801,6 +8329,25 @@ class LUSetInstanceParams(LogicalUnit):
     pnode = instance.primary_node
     nodelist = list(instance.all_nodes)
 
+    if self.op.disk_template:
+      if instance.disk_template == self.op.disk_template:
+        raise errors.OpPrereqError("Instance already has disk template %s" %
+                                   instance.disk_template, errors.ECODE_INVAL)
+
+      if (instance.disk_template,
+          self.op.disk_template) not in self._DISK_CONVERSIONS:
+        raise errors.OpPrereqError("Unsupported disk template conversion from"
+                                   " %s to %s" % (instance.disk_template,
+                                                  self.op.disk_template),
+                                   errors.ECODE_INVAL)
+      if self.op.disk_template in constants.DTS_NET_MIRROR:
+        _CheckNodeOnline(self, self.op.remote_node)
+        _CheckNodeNotDrained(self, self.op.remote_node)
+        disks = [{"size": d.size} for d in instance.disks]
+        required = _ComputeDiskSize(self.op.disk_template, disks)
+        _CheckNodesFreeDisk(self, [self.op.remote_node], required)
+        _CheckInstanceDown(self, instance, "cannot change disk template")
+
     # hvparams processing
     if self.op.hvparams:
       i_hvdict, hv_new = self._GetUpdatedParams(
@@ -7966,17 +8513,8 @@ class LUSetInstanceParams(LogicalUnit):
       if disk_op == constants.DDM_REMOVE:
         if len(instance.disks) == 1:
           raise errors.OpPrereqError("Cannot remove the last disk of"
-                                     " an instance",
-                                     errors.ECODE_INVAL)
-        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
-        ins_l = ins_l[pnode]
-        msg = ins_l.fail_msg
-        if msg:
-          raise errors.OpPrereqError("Can't contact node %s: %s" %
-                                     (pnode, msg), errors.ECODE_ENVIRON)
-        if instance.name in ins_l.payload:
-          raise errors.OpPrereqError("Instance is running, can't remove"
-                                     " disks.", errors.ECODE_STATE)
+                                     " an instance", errors.ECODE_INVAL)
+        _CheckInstanceDown(self, instance, "cannot remove disks")
 
       if (disk_op == constants.DDM_ADD and
           len(instance.nics) >= constants.MAX_DISKS):
@@ -7991,8 +8529,103 @@ class LUSetInstanceParams(LogicalUnit):
                                    (disk_op, len(instance.disks)),
                                    errors.ECODE_INVAL)
 
+    # OS change
+    if self.op.os_name and not self.op.force:
+      _CheckNodeHasOS(self, instance.primary_node, self.op.os_name,
+                      self.op.force_variant)
+
     return
 
+  def _ConvertPlainToDrbd(self, feedback_fn):
+    """Converts an instance from plain to drbd.
+
+    """
+    feedback_fn("Converting template to drbd")
+    instance = self.instance
+    pnode = instance.primary_node
+    snode = self.op.remote_node
+
+    # create a fake disk info for _GenerateDiskTemplate
+    disk_info = [{"size": d.size, "mode": d.mode} for d in instance.disks]
+    new_disks = _GenerateDiskTemplate(self, self.op.disk_template,
+                                      instance.name, pnode, [snode],
+                                      disk_info, None, None, 0)
+    info = _GetInstanceInfoText(instance)
+    feedback_fn("Creating additional volumes...")
+    # first, create the missing data and meta devices
+    for disk in new_disks:
+      # unfortunately this is...
not too nice + _CreateSingleBlockDev(self, pnode, instance, disk.children[1], + info, True) + for child in disk.children: + _CreateSingleBlockDev(self, snode, instance, child, info, True) + # at this stage, all new LVs have been created, we can rename the + # old ones + feedback_fn("Renaming original volumes...") + rename_list = [(o, n.children[0].logical_id) + for (o, n) in zip(instance.disks, new_disks)] + result = self.rpc.call_blockdev_rename(pnode, rename_list) + result.Raise("Failed to rename original LVs") + + feedback_fn("Initializing DRBD devices...") + # all child devices are in place, we can now create the DRBD devices + for disk in new_disks: + for node in [pnode, snode]: + f_create = node == pnode + _CreateSingleBlockDev(self, node, instance, disk, info, f_create) + + # at this point, the instance has been modified + instance.disk_template = constants.DT_DRBD8 + instance.disks = new_disks + self.cfg.Update(instance, feedback_fn) + + # disks are created, waiting for sync + disk_abort = not _WaitForSync(self, instance) + if disk_abort: + raise errors.OpExecError("There are some degraded disks for" + " this instance, please cleanup manually") + + def _ConvertDrbdToPlain(self, feedback_fn): + """Converts an instance from drbd to plain. + + """ + instance = self.instance + assert len(instance.secondary_nodes) == 1 + pnode = instance.primary_node + snode = instance.secondary_nodes[0] + feedback_fn("Converting template to plain") + + old_disks = instance.disks + new_disks = [d.children[0] for d in old_disks] + + # copy over size and mode + for parent, child in zip(old_disks, new_disks): + child.size = parent.size + child.mode = parent.mode + + # update instance structure + instance.disks = new_disks + instance.disk_template = constants.DT_PLAIN + self.cfg.Update(instance, feedback_fn) + + feedback_fn("Removing volumes on the secondary node...") + for disk in old_disks: + self.cfg.SetDiskID(disk, snode) + msg = self.rpc.call_blockdev_remove(snode, disk).fail_msg + if msg: + self.LogWarning("Could not remove block device %s on node %s," + " continuing anyway: %s", disk.iv_name, snode, msg) + + feedback_fn("Removing unneeded volumes on the primary node...") + for idx, disk in enumerate(old_disks): + meta = disk.children[1] + self.cfg.SetDiskID(meta, pnode) + msg = self.rpc.call_blockdev_remove(pnode, meta).fail_msg + if msg: + self.LogWarning("Could not remove metadata for disk %d on node %s," + " continuing anyway: %s", idx, pnode, msg) + + def Exec(self, feedback_fn): """Modifies an instance. 
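Both converters above rely on the invariant that a DRBD8 disk object carries exactly two children, the data LV first and the metadata LV second. A rough illustration using made-up LV names and sizes (the DRBD logical_id, which holds node, port and minor data, is omitted here):

data_lv = objects.Disk(dev_type=constants.LD_LV, size=10240,
                       logical_id=("xenvg", "uuid.disk0_data"))
meta_lv = objects.Disk(dev_type=constants.LD_LV, size=128,
                       logical_id=("xenvg", "uuid.disk0_meta"))
drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=10240,
                        children=[data_lv, meta_lv], iv_name="disk/0")

# _ConvertDrbdToPlain keeps children[0] and removes the metadata LV;
# _ConvertPlainToDrbd goes the other way: it renames the existing data
# LV into place as children[0] and creates the missing meta volume.
assert drbd_dev.children[0] is data_lv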
@@ -8057,6 +8690,20 @@ class LUSetInstanceParams(LogicalUnit):
         # change a given disk
         instance.disks[disk_op].mode = disk_dict['mode']
         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
+
+    if self.op.disk_template:
+      r_shut = _ShutdownInstanceDisks(self, instance)
+      if not r_shut:
+        raise errors.OpExecError("Cannot shutdown instance disks, unable to"
+                                 " proceed with disk template conversion")
+      mode = (instance.disk_template, self.op.disk_template)
+      try:
+        self._DISK_CONVERSIONS[mode](self, feedback_fn)
+      except:
+        self.cfg.ReleaseDRBDMinors(instance.name)
+        raise
+      result.append(("disk_template", self.op.disk_template))
+
     # NIC changes
     for nic_op, nic_dict in self.op.nics:
       if nic_op == constants.DDM_REMOVE:
@@ -8097,10 +8744,18 @@ class LUSetInstanceParams(LogicalUnit):
       for key, val in self.op.beparams.iteritems():
         result.append(("be/%s" % key, val))
 
+    # OS change
+    if self.op.os_name:
+      instance.os = self.op.os_name
+
     self.cfg.Update(instance, feedback_fn)
 
     return result
 
+  _DISK_CONVERSIONS = {
+    (constants.DT_PLAIN, constants.DT_DRBD8): _ConvertPlainToDrbd,
+    (constants.DT_DRBD8, constants.DT_PLAIN): _ConvertDrbdToPlain,
+    }
 
 class LUQueryExports(NoHooksLU):
   """Query the exports list
@@ -8157,11 +8812,22 @@ class LUExportInstance(LogicalUnit):
     """Check the arguments.
 
    """
+    _CheckBooleanOpField(self.op, "remove_instance")
+    _CheckBooleanOpField(self.op, "ignore_remove_failures")
+
    self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
                                    constants.DEFAULT_SHUTDOWN_TIMEOUT)
+    self.remove_instance = getattr(self.op, "remove_instance", False)
+    self.ignore_remove_failures = getattr(self.op, "ignore_remove_failures",
+                                          False)
+
+    if self.remove_instance and not self.op.shutdown:
+      raise errors.OpPrereqError("Cannot remove instance without shutting it"
+                                 " down first", errors.ECODE_INVAL)
 
   def ExpandNames(self):
     self._ExpandAndLockInstance()
+
     # FIXME: lock only instance primary and destination node
     #
     # Sad but true, for now we have do lock all nodes, as we don't know where
@@ -8186,6 +8852,8 @@ class LUExportInstance(LogicalUnit):
       "EXPORT_NODE": self.op.target_node,
       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
       "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
+      # TODO: Generic function for boolean env variables
+      "REMOVE_INSTANCE": str(bool(self.remove_instance)),
       }
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
@@ -8212,11 +8880,100 @@ class LUExportInstance(LogicalUnit):
     _CheckNodeNotDrained(self, self.dst_node.name)
 
     # instance disk type verification
+    # TODO: Implement export support for file-based disks
     for disk in self.instance.disks:
       if disk.dev_type == constants.LD_FILE:
         raise errors.OpPrereqError("Export not supported for instances with"
                                    " file-based disks", errors.ECODE_INVAL)
 
+  def _CreateSnapshots(self, feedback_fn):
+    """Creates an LVM snapshot for every disk of the instance.
+ + @return: List of snapshots as L{objects.Disk} instances + + """ + instance = self.instance + src_node = instance.primary_node + + vgname = self.cfg.GetVGName() + + snap_disks = [] + + for idx, disk in enumerate(instance.disks): + feedback_fn("Creating a snapshot of disk/%s on node %s" % + (idx, src_node)) + + # result.payload will be a snapshot of an lvm leaf of the one we + # passed + result = self.rpc.call_blockdev_snapshot(src_node, disk) + msg = result.fail_msg + if msg: + self.LogWarning("Could not snapshot disk/%s on node %s: %s", + idx, src_node, msg) + snap_disks.append(False) + else: + disk_id = (vgname, result.payload) + new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size, + logical_id=disk_id, physical_id=disk_id, + iv_name=disk.iv_name) + snap_disks.append(new_dev) + + return snap_disks + + def _RemoveSnapshot(self, feedback_fn, snap_disks, disk_index): + """Removes an LVM snapshot. + + @type snap_disks: list + @param snap_disks: The list of all snapshots as returned by + L{_CreateSnapshots} + @type disk_index: number + @param disk_index: Index of the snapshot to be removed + @rtype: bool + @return: Whether removal was successful or not + + """ + disk = snap_disks[disk_index] + if disk: + src_node = self.instance.primary_node + + feedback_fn("Removing snapshot of disk/%s on node %s" % + (disk_index, src_node)) + + result = self.rpc.call_blockdev_remove(src_node, disk) + if not result.fail_msg: + return True + + self.LogWarning("Could not remove snapshot for disk/%d from node" + " %s: %s", disk_index, src_node, result.fail_msg) + + return False + + def _CleanupExports(self, feedback_fn): + """Removes exports of current instance from all other nodes. + + If an instance in a cluster with nodes A..D was exported to node C, its + exports will be removed from the nodes A, B and D. + + """ + nodelist = self.cfg.GetNodeList() + nodelist.remove(self.dst_node.name) + + # on one-node clusters nodelist will be empty after the removal + # if we proceed the backup would be removed because OpQueryExports + # substitutes an empty list with the full cluster node list. + iname = self.instance.name + if nodelist: + feedback_fn("Removing old exports for instance %s" % iname) + exportlist = self.rpc.call_export_list(nodelist) + for node in exportlist: + if exportlist[node].fail_msg: + continue + if iname in exportlist[node].payload: + msg = self.rpc.call_export_remove(node, iname).fail_msg + if msg: + self.LogWarning("Could not remove older export for instance %s" + " on node %s: %s", iname, node, msg) + def Exec(self, feedback_fn): """Export an instance to an image in the cluster. 
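The Exec method below, like the import path earlier in this patch, moves disk data through the new masterd.instance transfer framework instead of per-disk RPC calls. As far as the two call sites show, a transfer is declared with a name, a source and a destination endpoint (each an IEIO_* kind plus its parameters) and an optional completion callback; for one exported snapshot this looks roughly like the following, where snap_dev, path and finished_fn stand in for the values computed in Exec:

# read snapshot disk 0 via the export script, write it to a file on
# the destination node, then let the callback release the snapshot
dt = masterd.instance.DiskTransfer("snapshot/0",
                                   constants.IEIO_SCRIPT, (snap_dev, 0),
                                   constants.IEIO_FILE, (path, ),
                                   finished_fn)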
@@ -8230,13 +8987,10 @@ class LUExportInstance(LogicalUnit): feedback_fn("Shutting down instance %s" % instance.name) result = self.rpc.call_instance_shutdown(src_node, instance, self.shutdown_timeout) + # TODO: Maybe ignore failures if ignore_remove_failures is set result.Raise("Could not shutdown instance %s on" " node %s" % (instance.name, src_node)) - vgname = self.cfg.GetVGName() - - snap_disks = [] - # set the disks ID correctly since call_instance_start needs the # correct drbd minor to create the symlinks for disk in instance.disks: @@ -8251,94 +9005,93 @@ class LUExportInstance(LogicalUnit): try: # per-disk results - dresults = [] + removed_snaps = [False] * len(instance.disks) + + snap_disks = None try: - for idx, disk in enumerate(instance.disks): - feedback_fn("Creating a snapshot of disk/%s on node %s" % - (idx, src_node)) - - # result.payload will be a snapshot of an lvm leaf of the one we - # passed - result = self.rpc.call_blockdev_snapshot(src_node, disk) - msg = result.fail_msg - if msg: - self.LogWarning("Could not snapshot disk/%s on node %s: %s", - idx, src_node, msg) - snap_disks.append(False) - else: - disk_id = (vgname, result.payload) - new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size, - logical_id=disk_id, physical_id=disk_id, - iv_name=disk.iv_name) - snap_disks.append(new_dev) + try: + snap_disks = self._CreateSnapshots(feedback_fn) + finally: + if (self.op.shutdown and instance.admin_up and + not self.remove_instance): + feedback_fn("Starting instance %s" % instance.name) + result = self.rpc.call_instance_start(src_node, instance, + None, None) + msg = result.fail_msg + if msg: + _ShutdownInstanceDisks(self, instance) + raise errors.OpExecError("Could not start instance: %s" % msg) + + assert len(snap_disks) == len(instance.disks) + assert len(removed_snaps) == len(instance.disks) + + # TODO: check for size + + def _TransferFinished(idx): + logging.debug("Transfer %s finished", idx) + if self._RemoveSnapshot(feedback_fn, snap_disks, idx): + removed_snaps[idx] = True + + transfers = [] + + for idx, dev in enumerate(snap_disks): + if not dev: + transfers.append(None) + continue - finally: - if self.op.shutdown and instance.admin_up: - feedback_fn("Starting instance %s" % instance.name) - result = self.rpc.call_instance_start(src_node, instance, None, None) - msg = result.fail_msg - if msg: - _ShutdownInstanceDisks(self, instance) - raise errors.OpExecError("Could not start instance: %s" % msg) - - # TODO: check for size - - cluster_name = self.cfg.GetClusterName() - for idx, dev in enumerate(snap_disks): - feedback_fn("Exporting snapshot %s from %s to %s" % - (idx, src_node, dst_node.name)) - if dev: - # FIXME: pass debug from opcode to backend - result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name, - instance, cluster_name, - idx, self.op.debug_level) - msg = result.fail_msg - if msg: - self.LogWarning("Could not export disk/%s from node %s to" - " node %s: %s", idx, src_node, dst_node.name, msg) - dresults.append(False) - else: - dresults.append(True) - msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg - if msg: - self.LogWarning("Could not remove snapshot for disk/%d from node" - " %s: %s", idx, src_node, msg) - else: - dresults.append(False) + path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name, + dev.physical_id[1]) - feedback_fn("Finalizing export on %s" % dst_node.name) - result = self.rpc.call_finalize_export(dst_node.name, instance, - snap_disks) - fin_resu = True - msg = result.fail_msg - if msg: 
- self.LogWarning("Could not finalize export for instance %s" - " on node %s: %s", instance.name, dst_node.name, msg) - fin_resu = False + finished_fn = compat.partial(_TransferFinished, idx) + + # FIXME: pass debug option from opcode to backend + dt = masterd.instance.DiskTransfer("snapshot/%s" % idx, + constants.IEIO_SCRIPT, (dev, idx), + constants.IEIO_FILE, (path, ), + finished_fn) + transfers.append(dt) + + # Actually export data + dresults = \ + masterd.instance.TransferInstanceData(self, feedback_fn, + src_node, dst_node.name, + dst_node.secondary_ip, + instance, transfers) + + assert len(dresults) == len(instance.disks) + + # Check for backwards compatibility + assert compat.all(isinstance(i, bool) for i in dresults), \ + "Not all results are boolean: %r" % dresults + + feedback_fn("Finalizing export on %s" % dst_node.name) + result = self.rpc.call_finalize_export(dst_node.name, instance, + snap_disks) + msg = result.fail_msg + fin_resu = not msg + if msg: + self.LogWarning("Could not finalize export for instance %s" + " on node %s: %s", instance.name, dst_node.name, msg) + + finally: + # Remove all snapshots + assert len(removed_snaps) == len(instance.disks) + for idx, removed in enumerate(removed_snaps): + if not removed: + self._RemoveSnapshot(feedback_fn, snap_disks, idx) finally: if activate_disks: feedback_fn("Deactivating disks for %s" % instance.name) _ShutdownInstanceDisks(self, instance) - nodelist = self.cfg.GetNodeList() - nodelist.remove(dst_node.name) + # Remove instance if requested + if self.remove_instance: + feedback_fn("Removing instance %s" % instance.name) + _RemoveInstance(self, feedback_fn, instance, self.ignore_remove_failures) + + self._CleanupExports(feedback_fn) - # on one-node clusters nodelist will be empty after the removal - # if we proceed the backup would be removed because OpQueryExports - # substitutes an empty list with the full cluster node list. - iname = instance.name - if nodelist: - feedback_fn("Removing old exports for instance %s" % iname) - exportlist = self.rpc.call_export_list(nodelist) - for node in exportlist: - if exportlist[node].fail_msg: - continue - if iname in exportlist[node].payload: - msg = self.rpc.call_export_remove(node, iname).fail_msg - if msg: - self.LogWarning("Could not remove older export for instance %s" - " on node %s: %s", iname, node, msg) return fin_resu, dresults
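Taken together, the new remove_instance and ignore_remove_failures fields allow a one-step "export and remove" workflow from the client side. A hedged usage sketch; the opcode class name and the exact field spellings are assumptions based on the getattr() calls in CheckArguments above:

# hypothetical client-side usage; verify opcode/field names against
# the opcodes module of the running Ganeti version
op = opcodes.OpExportInstance(instance_name="instance1.example.com",
                              target_node="node3.example.com",
                              shutdown=True,
                              remove_instance=True,
                              ignore_remove_failures=False)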