X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/b91a34a5c61794ad18acb36325ef68afde17b80e..38d7239a7a7182b919b3b8dbeac809b08075ce87:/lib/cmdlib.py

diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index b919a35..c9474b0 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -30,6 +30,7 @@ import time
 import tempfile
 import re
 import platform
+import logging
 
 from ganeti import rpc
 from ganeti import ssh
@@ -80,9 +81,12 @@ class LogicalUnit(object):
     self.cfg = context.cfg
     self.sstore = sstore
     self.context = context
+    # Dicts used to declare locking needs to mcpu
     self.needed_locks = None
     self.acquired_locks = {}
     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
+    self.add_locks = {}
+    self.remove_locks = {}
     # Used to force good behavior when calling helper functions
     self.recalculate_locks = {}
     self.__ssh = None
@@ -126,9 +130,7 @@ class LogicalUnit(object):
       - Use an empty dict if you don't need any lock
       - If you don't need any lock at a particular level omit that level
       - Don't put anything for the BGL level
-      - If you want all locks at a level use None as a value
-        (this reflects what LockSet does, and will be replaced before
-        CheckPrereq with the full list of nodes that have been locked)
+      - If you want all locks at a level use locking.ALL_SET as a value
 
     If you need to share locks (rather than acquire them exclusively) at one
     level you can modify self.share_locks, setting a true value (usually 1) for
@@ -137,8 +139,8 @@ class LogicalUnit(object):
     Examples:
     # Acquire all nodes and one instance
     self.needed_locks = {
-      locking.LEVEL_NODE: None,
-      locking.LEVEL_INSTANCES: ['instance1.example.tld'],
+      locking.LEVEL_NODE: locking.ALL_SET,
+      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
    }
     # Acquire just two nodes
     self.needed_locks = {
@@ -263,7 +265,7 @@ class LogicalUnit(object):
       self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
       self.op.instance_name = expanded_name
 
-  def _LockInstancesNodes(self):
+  def _LockInstancesNodes(self, primary_only=False):
     """Helper function to declare instances' nodes for locking.
This function should be called after locking one or more instances to lock @@ -282,6 +284,9 @@ class LogicalUnit(object): if level == locking.LEVEL_NODE: self._LockInstancesNodes() + @type primary_only: boolean + @param primary_only: only lock primary nodes of locked instances + """ assert locking.LEVEL_NODE in self.recalculate_locks, \ "_LockInstancesNodes helper function called with no nodes to recalculate" @@ -295,8 +300,13 @@ class LogicalUnit(object): for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]: instance = self.context.cfg.GetInstanceInfo(instance_name) wanted_nodes.append(instance.primary_node) - wanted_nodes.extend(instance.secondary_nodes) - self.needed_locks[locking.LEVEL_NODE] = wanted_nodes + if not primary_only: + wanted_nodes.extend(instance.secondary_nodes) + + if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE: + self.needed_locks[locking.LEVEL_NODE] = wanted_nodes + elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND: + self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes) del self.recalculate_locks[locking.LEVEL_NODE] @@ -322,17 +332,17 @@ def _GetWantedNodes(lu, nodes): if not isinstance(nodes, list): raise errors.OpPrereqError("Invalid argument type 'nodes'") - if nodes: - wanted = [] + if not nodes: + raise errors.ProgrammerError("_GetWantedNodes should only be called with a" + " non-empty list of nodes whose name is to be expanded.") - for name in nodes: - node = lu.cfg.ExpandNodeName(name) - if node is None: - raise errors.OpPrereqError("No such node name '%s'" % name) - wanted.append(node) + wanted = [] + for name in nodes: + node = lu.cfg.ExpandNodeName(name) + if node is None: + raise errors.OpPrereqError("No such node name '%s'" % name) + wanted.append(node) - else: - wanted = lu.cfg.GetNodeList() return utils.NiceSort(wanted) @@ -492,6 +502,14 @@ class LUVerifyCluster(LogicalUnit): HPATH = "cluster-verify" HTYPE = constants.HTYPE_CLUSTER _OP_REQP = ["skip_checks"] + REQ_BGL = False + + def ExpandNames(self): + self.needed_locks = { + locking.LEVEL_NODE: locking.ALL_SET, + locking.LEVEL_INSTANCE: locking.ALL_SET, + } + self.share_locks = dict(((i, 1) for i in locking.LEVELS)) def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result, remote_version, feedback_fn): @@ -906,6 +924,14 @@ class LUVerifyDisks(NoHooksLU): """ _OP_REQP = [] + REQ_BGL = False + + def ExpandNames(self): + self.needed_locks = { + locking.LEVEL_NODE: locking.ALL_SET, + locking.LEVEL_INSTANCE: locking.ALL_SET, + } + self.share_locks = dict(((i, 1) for i in locking.LEVELS)) def CheckPrereq(self): """Check prerequisites. @@ -1076,6 +1102,15 @@ class LUSetClusterParams(LogicalUnit): HPATH = "cluster-modify" HTYPE = constants.HTYPE_CLUSTER _OP_REQP = [] + REQ_BGL = False + + def ExpandNames(self): + # FIXME: in the future maybe other cluster params won't require checking on + # all nodes to be modified. + self.needed_locks = { + locking.LEVEL_NODE: locking.ALL_SET, + } + self.share_locks[locking.LEVEL_NODE] = 1 def BuildHooksEnv(self): """Build hooks env. @@ -1095,9 +1130,10 @@ class LUSetClusterParams(LogicalUnit): if the given volume group is valid. """ + # FIXME: This only works because there is only one parameter that can be + # changed or removed. 
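# Illustrative sketch (hypothetical LU, not part of the original patch) of the
# declare-then-recalculate idiom built on the _LockInstancesNodes helper above.
# A node named in the opcode is locked up front and the instance's own nodes
# are appended later, once the instance lock is held; constants.LOCKS_APPEND
# keeps the pre-declared node, while constants.LOCKS_REPLACE would discard it.
# LUReplaceDisks below uses the same pattern for its remote_node.
class LUExampleMoveInstance(NoHooksLU):
  _OP_REQP = ["instance_name", "target_node"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    target_node = self.cfg.ExpandNodeName(self.op.target_node)
    if target_node is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.target_node)
    self.op.target_node = target_node
    self.needed_locks[locking.LEVEL_NODE] = [target_node]
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # adds the locked instance's primary and secondary nodes to the list
      self._LockInstancesNodes()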
if not self.op.vg_name: - instances = [self.cfg.GetInstanceInfo(name) - for name in self.cfg.GetInstanceList()] + instances = self.cfg.GetAllInstancesInfo().values() for inst in instances: for disk in inst.disks: if _RecursiveCheckIfLVMBased(disk): @@ -1106,7 +1142,7 @@ class LUSetClusterParams(LogicalUnit): # if vg_name not None, checks given volume group on all nodes if self.op.vg_name: - node_list = self.cfg.GetNodeList() + node_list = self.acquired_locks[locking.LEVEL_NODE] vglist = rpc.call_vg_list(node_list) for node in node_list: vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name, @@ -1218,13 +1254,9 @@ class LUDiagnoseOS(NoHooksLU): """ _OP_REQP = ["output_fields", "names"] + REQ_BGL = False - def CheckPrereq(self): - """Check prerequisites. - - This always succeeds, since this is a pure query LU. - - """ + def ExpandNames(self): if self.op.names: raise errors.OpPrereqError("Selective OS query not supported") @@ -1233,6 +1265,16 @@ class LUDiagnoseOS(NoHooksLU): dynamic=self.dynamic_fields, selected=self.op.output_fields) + # Lock all nodes, in shared mode + self.needed_locks = {} + self.share_locks[locking.LEVEL_NODE] = 1 + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + + def CheckPrereq(self): + """Check prerequisites. + + """ + @staticmethod def _DiagnoseByOS(node_list, rlist): """Remaps a per-node return list into an a per-os per-node dictionary @@ -1268,7 +1310,7 @@ class LUDiagnoseOS(NoHooksLU): """Compute the list of OSes. """ - node_list = self.cfg.GetNodeList() + node_list = self.acquired_locks[locking.LEVEL_NODE] node_data = rpc.call_os_diagnose(node_list) if node_data == False: raise errors.OpExecError("Can't gather the list of OSes") @@ -1377,36 +1419,55 @@ class LUQueryNodes(NoHooksLU): "ctotal", ]) - _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt", - "pinst_list", "sinst_list", - "pip", "sip", "tags"], + self.static_fields = frozenset([ + "name", "pinst_cnt", "sinst_cnt", + "pinst_list", "sinst_list", + "pip", "sip", "tags", + "serial_no", + ]) + + _CheckOutputFields(static=self.static_fields, dynamic=self.dynamic_fields, selected=self.op.output_fields) self.needed_locks = {} self.share_locks[locking.LEVEL_NODE] = 1 - # TODO: we could lock nodes only if the user asked for dynamic fields. For - # that we need atomic ways to get info for a group of nodes from the - # config, though. - if not self.op.names: - self.needed_locks[locking.LEVEL_NODE] = None + + if self.op.names: + self.wanted = _GetWantedNodes(self, self.op.names) else: - self.needed_locks[locking.LEVEL_NODE] = \ - _GetWantedNodes(self, self.op.names) + self.wanted = locking.ALL_SET + + self.do_locking = not self.static_fields.issuperset(self.op.output_fields) + if self.do_locking: + # if we don't request only static fields, we need to lock the nodes + self.needed_locks[locking.LEVEL_NODE] = self.wanted + def CheckPrereq(self): """Check prerequisites. """ - # This of course is valid only if we locked the nodes - self.wanted = self.acquired_locks[locking.LEVEL_NODE] + # The validation of the node list is done in the _GetWantedNodes, + # if non empty, and if empty, there's no validation to do + pass def Exec(self, feedback_fn): """Computes the list of nodes and their attributes. 
""" - nodenames = self.wanted - nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames] + all_info = self.cfg.GetAllNodesInfo() + if self.do_locking: + nodenames = self.acquired_locks[locking.LEVEL_NODE] + elif self.wanted != locking.ALL_SET: + nodenames = self.wanted + missing = set(nodenames).difference(all_info.keys()) + if missing: + raise self.OpExecError( + "Some nodes were removed before retrieving their data: %s" % missing) + else: + nodenames = all_info.keys() + nodelist = [all_info[name] for name in nodenames] # begin data gathering @@ -1468,6 +1529,8 @@ class LUQueryNodes(NoHooksLU): val = node.secondary_ip elif field == "tags": val = list(node.GetTags()) + elif field == "serial_no": + val = node.serial_no elif field in self.dynamic_fields: val = live_data[node.name].get(field, None) else: @@ -1483,6 +1546,20 @@ class LUQueryNodeVolumes(NoHooksLU): """ _OP_REQP = ["nodes", "output_fields"] + REQ_BGL = False + + def ExpandNames(self): + _CheckOutputFields(static=["node"], + dynamic=["phys", "vg", "name", "size", "instance"], + selected=self.op.output_fields) + + self.needed_locks = {} + self.share_locks[locking.LEVEL_NODE] = 1 + if not self.op.nodes: + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + else: + self.needed_locks[locking.LEVEL_NODE] = \ + _GetWantedNodes(self, self.op.nodes) def CheckPrereq(self): """Check prerequisites. @@ -1490,12 +1567,7 @@ class LUQueryNodeVolumes(NoHooksLU): This checks that the fields required are valid output fields. """ - self.nodes = _GetWantedNodes(self, self.op.nodes) - - _CheckOutputFields(static=["node"], - dynamic=["phys", "vg", "name", "size", "instance"], - selected=self.op.output_fields) - + self.nodes = self.acquired_locks[locking.LEVEL_NODE] def Exec(self, feedback_fn): """Computes the list of nodes and their attributes. @@ -1815,6 +1887,16 @@ class LUActivateInstanceDisks(NoHooksLU): """ _OP_REQP = ["instance_name"] + REQ_BGL = False + + def ExpandNames(self): + self._ExpandAndLockInstance() + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes() def CheckPrereq(self): """Check prerequisites. @@ -1822,13 +1904,9 @@ class LUActivateInstanceDisks(NoHooksLU): This checks that the instance is in the cluster. """ - instance = self.cfg.GetInstanceInfo( - self.cfg.ExpandInstanceName(self.op.instance_name)) - if instance is None: - raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) - self.instance = instance - + self.instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert self.instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name def Exec(self, feedback_fn): """Activate the disks. @@ -1922,6 +2000,16 @@ class LUDeactivateInstanceDisks(NoHooksLU): """ _OP_REQP = ["instance_name"] + REQ_BGL = False + + def ExpandNames(self): + self._ExpandAndLockInstance() + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes() def CheckPrereq(self): """Check prerequisites. @@ -1929,29 +2017,36 @@ class LUDeactivateInstanceDisks(NoHooksLU): This checks that the instance is in the cluster. 
""" - instance = self.cfg.GetInstanceInfo( - self.cfg.ExpandInstanceName(self.op.instance_name)) - if instance is None: - raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) - self.instance = instance + self.instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert self.instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name def Exec(self, feedback_fn): """Deactivate the disks """ instance = self.instance - ins_l = rpc.call_instance_list([instance.primary_node]) - ins_l = ins_l[instance.primary_node] - if not type(ins_l) is list: - raise errors.OpExecError("Can't contact node '%s'" % - instance.primary_node) + _SafeShutdownInstanceDisks(instance, self.cfg) - if self.instance.name in ins_l: - raise errors.OpExecError("Instance is running, can't shutdown" - " block devices.") - _ShutdownInstanceDisks(instance, self.cfg) +def _SafeShutdownInstanceDisks(instance, cfg): + """Shutdown block devices of an instance. + + This function checks if an instance is running, before calling + _ShutdownInstanceDisks. + + """ + ins_l = rpc.call_instance_list([instance.primary_node]) + ins_l = ins_l[instance.primary_node] + if not type(ins_l) is list: + raise errors.OpExecError("Can't contact node '%s'" % + instance.primary_node) + + if instance.name in ins_l: + raise errors.OpExecError("Instance is running, can't shutdown" + " block devices.") + + _ShutdownInstanceDisks(instance, cfg) def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False): @@ -2017,7 +2112,7 @@ class LUStartupInstance(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() self.needed_locks[locking.LEVEL_NODE] = [] - self.recalculate_locks[locking.LEVEL_NODE] = 'replace' + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE def DeclareLocks(self, level): if level == locking.LEVEL_NODE: @@ -2092,12 +2187,12 @@ class LURebootInstance(LogicalUnit): constants.INSTANCE_REBOOT_FULL)) self._ExpandAndLockInstance() self.needed_locks[locking.LEVEL_NODE] = [] - self.recalculate_locks[locking.LEVEL_NODE] = 'replace' + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE def DeclareLocks(self, level): if level == locking.LEVEL_NODE: - # FIXME: lock only primary on (not constants.INSTANCE_REBOOT_FULL) - self._LockInstancesNodes() + primary_only = not constants.INSTANCE_REBOOT_FULL + self._LockInstancesNodes(primary_only=primary_only) def BuildHooksEnv(self): """Build hooks env. @@ -2166,7 +2261,7 @@ class LUShutdownInstance(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() self.needed_locks[locking.LEVEL_NODE] = [] - self.recalculate_locks[locking.LEVEL_NODE] = 'replace' + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE def DeclareLocks(self, level): if level == locking.LEVEL_NODE: @@ -2218,7 +2313,7 @@ class LUReinstallInstance(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() self.needed_locks[locking.LEVEL_NODE] = [] - self.recalculate_locks[locking.LEVEL_NODE] = 'replace' + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE def DeclareLocks(self, level): if level == locking.LEVEL_NODE: @@ -2281,7 +2376,7 @@ class LUReinstallInstance(LogicalUnit): if self.op.os_type is not None: feedback_fn("Changing OS to '%s'..." 
% self.op.os_type) inst.os = self.op.os_type - self.cfg.AddInstance(inst) + self.cfg.Update(inst) _StartInstanceDisks(self.cfg, inst, None) try: @@ -2406,6 +2501,16 @@ class LURemoveInstance(LogicalUnit): HPATH = "instance-remove" HTYPE = constants.HTYPE_INSTANCE _OP_REQP = ["instance_name", "ignore_failures"] + REQ_BGL = False + + def ExpandNames(self): + self._ExpandAndLockInstance() + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes() def BuildHooksEnv(self): """Build hooks env. @@ -2423,12 +2528,9 @@ class LURemoveInstance(LogicalUnit): This checks that the instance is in the cluster. """ - instance = self.cfg.GetInstanceInfo( - self.cfg.ExpandInstanceName(self.op.instance_name)) - if instance is None: - raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) - self.instance = instance + self.instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert self.instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name def Exec(self, feedback_fn): """Remove the instance. @@ -2456,8 +2558,7 @@ class LURemoveInstance(LogicalUnit): logger.Info("removing instance %s out of cluster config" % instance.name) self.cfg.RemoveInstance(instance.name) - # Remove the new instance from the Ganeti Lock Manager - self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name) + self.remove_locks[locking.LEVEL_INSTANCE] = instance.name class LUQueryInstances(NoHooksLU): @@ -2469,15 +2570,18 @@ class LUQueryInstances(NoHooksLU): def ExpandNames(self): self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"]) - _CheckOutputFields(static=["name", "os", "pnode", "snodes", - "admin_state", "admin_ram", - "disk_template", "ip", "mac", "bridge", - "sda_size", "sdb_size", "vcpus", "tags", - "auto_balance", - "network_port", "kernel_path", "initrd_path", - "hvm_boot_order", "hvm_acpi", "hvm_pae", - "hvm_cdrom_image_path", "hvm_nic_type", - "hvm_disk_type", "vnc_bind_address"], + self.static_fields = frozenset([ + "name", "os", "pnode", "snodes", + "admin_state", "admin_ram", + "disk_template", "ip", "mac", "bridge", + "sda_size", "sdb_size", "vcpus", "tags", + "network_port", "kernel_path", "initrd_path", + "hvm_boot_order", "hvm_acpi", "hvm_pae", + "hvm_cdrom_image_path", "hvm_nic_type", + "hvm_disk_type", "vnc_bind_address", + "serial_no", + ]) + _CheckOutputFields(static=self.static_fields, dynamic=self.dynamic_fields, selected=self.op.output_fields) @@ -2485,37 +2589,44 @@ class LUQueryInstances(NoHooksLU): self.share_locks[locking.LEVEL_INSTANCE] = 1 self.share_locks[locking.LEVEL_NODE] = 1 - # TODO: we could lock instances (and nodes) only if the user asked for - # dynamic fields. For that we need atomic ways to get info for a group of - # instances from the config, though. 
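# Minimal sketch (hypothetical LU) of the new lock hand-off used just above:
# instead of calling self.context.glm.remove() itself, an LU records the lock
# to drop in self.remove_locks and leaves the actual lock-manager update to
# mcpu (these dicts are "used to declare locking needs to mcpu", per the
# LogicalUnit constructor comment).  LUCreateInstance below uses the add_locks
# counterpart for the freshly created instance.
class LUExampleRetireInstance(NoHooksLU):
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    self.cfg.RemoveInstance(self.instance.name)
    # the instance is gone from the config, so its lock should go too
    self.remove_locks[locking.LEVEL_INSTANCE] = self.instance.name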
- if not self.op.names: - self.needed_locks[locking.LEVEL_INSTANCE] = None # Acquire all + if self.op.names: + self.wanted = _GetWantedInstances(self, self.op.names) else: - self.needed_locks[locking.LEVEL_INSTANCE] = \ - _GetWantedInstances(self, self.op.names) + self.wanted = locking.ALL_SET - self.needed_locks[locking.LEVEL_NODE] = [] - self.recalculate_locks[locking.LEVEL_NODE] = 'replace' + self.do_locking = not self.static_fields.issuperset(self.op.output_fields) + if self.do_locking: + self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE def DeclareLocks(self, level): - # TODO: locking of nodes could be avoided when not querying them - if level == locking.LEVEL_NODE: + if level == locking.LEVEL_NODE and self.do_locking: self._LockInstancesNodes() def CheckPrereq(self): """Check prerequisites. """ - # This of course is valid only if we locked the instances - self.wanted = self.acquired_locks[locking.LEVEL_INSTANCE] + pass def Exec(self, feedback_fn): """Computes the list of nodes and their attributes. """ - instance_names = self.wanted - instance_list = [self.cfg.GetInstanceInfo(iname) for iname - in instance_names] + all_info = self.cfg.GetAllInstancesInfo() + if self.do_locking: + instance_names = self.acquired_locks[locking.LEVEL_INSTANCE] + elif self.wanted != locking.ALL_SET: + instance_names = self.wanted + missing = set(instance_names).difference(all_info.keys()) + if missing: + raise self.OpExecError( + "Some instances were removed before retrieving their data: %s" + % missing) + else: + instance_names = all_info.keys() + instance_list = [all_info[iname] for iname in instance_names] # begin data gathering @@ -2598,6 +2709,8 @@ class LUQueryInstances(NoHooksLU): val = instance.vcpus elif field == "tags": val = list(instance.GetTags()) + elif field == "serial_no": + val = instance.serial_no elif field in ("network_port", "kernel_path", "initrd_path", "hvm_boot_order", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path", "hvm_nic_type", @@ -2630,7 +2743,7 @@ class LUFailoverInstance(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() self.needed_locks[locking.LEVEL_NODE] = [] - self.recalculate_locks[locking.LEVEL_NODE] = 'replace' + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE def DeclareLocks(self, level): if level == locking.LEVEL_NODE: @@ -2803,7 +2916,8 @@ def _GenerateUniqueNames(cfg, exts): return results -def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name): +def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name, + p_minor, s_minor): """Generate a drbd8 device complete with its children. 
""" @@ -2814,8 +2928,9 @@ def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name): dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128, logical_id=(vgname, names[1])) drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size, - logical_id = (primary, secondary, port), - children = [dev_data, dev_meta], + logical_id=(primary, secondary, port, + p_minor, s_minor), + children=[dev_data, dev_meta], iv_name=iv_name) return drbd_dev @@ -2848,12 +2963,18 @@ def _GenerateDiskTemplate(cfg, template_name, if len(secondary_nodes) != 1: raise errors.ProgrammerError("Wrong template configuration") remote_node = secondary_nodes[0] + (minor_pa, minor_pb, + minor_sa, minor_sb) = cfg.AllocateDRBDMinor( + [primary_node, primary_node, remote_node, remote_node], instance_name) + names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta", ".sdb_data", ".sdb_meta"]) drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node, - disk_sz, names[0:2], "sda") + disk_sz, names[0:2], "sda", + minor_pa, minor_sa) drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node, - swap_sz, names[2:4], "sdb") + swap_sz, names[2:4], "sdb", + minor_pb, minor_sb) disks = [drbd_sda_dev, drbd_sdb_dev] elif template_name == constants.DT_FILE: if len(secondary_nodes) != 0: @@ -2993,6 +3114,125 @@ class LUCreateInstance(LogicalUnit): _OP_REQP = ["instance_name", "mem_size", "disk_size", "disk_template", "swap_size", "mode", "start", "vcpus", "wait_for_sync", "ip_check", "mac"] + REQ_BGL = False + + def _ExpandNode(self, node): + """Expands and checks one node name. + + """ + node_full = self.cfg.ExpandNodeName(node) + if node_full is None: + raise errors.OpPrereqError("Unknown node %s" % node) + return node_full + + def ExpandNames(self): + """ExpandNames for CreateInstance. + + Figure out the right locks for instance creation. 
+ + """ + self.needed_locks = {} + + # set optional parameters to none if they don't exist + for attr in ["kernel_path", "initrd_path", "pnode", "snode", + "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae", + "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type", + "vnc_bind_address"]: + if not hasattr(self.op, attr): + setattr(self.op, attr, None) + + # verify creation mode + if self.op.mode not in (constants.INSTANCE_CREATE, + constants.INSTANCE_IMPORT): + raise errors.OpPrereqError("Invalid instance creation mode '%s'" % + self.op.mode) + # disk template and mirror node verification + if self.op.disk_template not in constants.DISK_TEMPLATES: + raise errors.OpPrereqError("Invalid disk template name") + + #### instance parameters check + + # instance name verification + hostname1 = utils.HostInfo(self.op.instance_name) + self.op.instance_name = instance_name = hostname1.name + + # this is just a preventive check, but someone might still add this + # instance in the meantime, and creation will fail at lock-add time + if instance_name in self.cfg.GetInstanceList(): + raise errors.OpPrereqError("Instance '%s' is already in the cluster" % + instance_name) + + self.add_locks[locking.LEVEL_INSTANCE] = instance_name + + # ip validity checks + ip = getattr(self.op, "ip", None) + if ip is None or ip.lower() == "none": + inst_ip = None + elif ip.lower() == "auto": + inst_ip = hostname1.ip + else: + if not utils.IsValidIP(ip): + raise errors.OpPrereqError("given IP address '%s' doesn't look" + " like a valid IP" % ip) + inst_ip = ip + self.inst_ip = self.op.ip = inst_ip + # used in CheckPrereq for ip ping check + self.check_ip = hostname1.ip + + # MAC address verification + if self.op.mac != "auto": + if not utils.IsValidMac(self.op.mac.lower()): + raise errors.OpPrereqError("invalid MAC address specified: %s" % + self.op.mac) + + # boot order verification + if self.op.hvm_boot_order is not None: + if len(self.op.hvm_boot_order.strip("acdn")) != 0: + raise errors.OpPrereqError("invalid boot order specified," + " must be one or more of [acdn]") + # file storage checks + if (self.op.file_driver and + not self.op.file_driver in constants.FILE_DRIVER): + raise errors.OpPrereqError("Invalid file driver name '%s'" % + self.op.file_driver) + + if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir): + raise errors.OpPrereqError("File storage directory path not absolute") + + ### Node/iallocator related checks + if [self.op.iallocator, self.op.pnode].count(None) != 1: + raise errors.OpPrereqError("One and only one of iallocator and primary" + " node must be given") + + if self.op.iallocator: + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + else: + self.op.pnode = self._ExpandNode(self.op.pnode) + nodelist = [self.op.pnode] + if self.op.snode is not None: + self.op.snode = self._ExpandNode(self.op.snode) + nodelist.append(self.op.snode) + self.needed_locks[locking.LEVEL_NODE] = nodelist + + # in case of import lock the source node too + if self.op.mode == constants.INSTANCE_IMPORT: + src_node = getattr(self.op, "src_node", None) + src_path = getattr(self.op, "src_path", None) + + if src_node is None or src_path is None: + raise errors.OpPrereqError("Importing an instance requires source" + " node and path options") + + if not os.path.isabs(src_path): + raise errors.OpPrereqError("The source path must be absolute") + + self.op.src_node = src_node = self._ExpandNode(src_node) + if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET: + 
self.needed_locks[locking.LEVEL_NODE].append(src_node) + + else: # INSTANCE_CREATE + if getattr(self.op, "os_type", None) is None: + raise errors.OpPrereqError("No guest OS specified") def _RunAllocator(self): """Run the allocator based on input opcode. @@ -3023,7 +3263,8 @@ class LUCreateInstance(LogicalUnit): if len(ial.nodes) != ial.required_nodes: raise errors.OpPrereqError("iallocator '%s' returned invalid number" " of nodes (%s), required %s" % - (len(ial.nodes), ial.required_nodes)) + (self.op.iallocator, len(ial.nodes), + ial.required_nodes)) self.op.pnode = ial.nodes[0] logger.ToStdout("Selected nodes for the instance: %s" % (", ".join(ial.nodes),)) @@ -3068,36 +3309,14 @@ class LUCreateInstance(LogicalUnit): """Check prerequisites. """ - # set optional parameters to none if they don't exist - for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode", - "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path", - "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]: - if not hasattr(self.op, attr): - setattr(self.op, attr, None) - - if self.op.mode not in (constants.INSTANCE_CREATE, - constants.INSTANCE_IMPORT): - raise errors.OpPrereqError("Invalid instance creation mode '%s'" % - self.op.mode) - if (not self.cfg.GetVGName() and self.op.disk_template not in constants.DTS_NOT_LVM): raise errors.OpPrereqError("Cluster does not support lvm-based" " instances") if self.op.mode == constants.INSTANCE_IMPORT: - src_node = getattr(self.op, "src_node", None) - src_path = getattr(self.op, "src_path", None) - if src_node is None or src_path is None: - raise errors.OpPrereqError("Importing an instance requires source" - " node and path options") - src_node_full = self.cfg.ExpandNodeName(src_node) - if src_node_full is None: - raise errors.OpPrereqError("Unknown source node '%s'" % src_node) - self.op.src_node = src_node = src_node_full - - if not os.path.isabs(src_path): - raise errors.OpPrereqError("The source path must be absolute") + src_node = self.op.src_node + src_path = self.op.src_path export_info = rpc.call_export_info(src_node, src_path) @@ -3121,52 +3340,17 @@ class LUCreateInstance(LogicalUnit): diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS, 'disk0_dump')) self.src_image = diskimage - else: # INSTANCE_CREATE - if getattr(self.op, "os_type", None) is None: - raise errors.OpPrereqError("No guest OS specified") - - #### instance parameters check - - # disk template and mirror node verification - if self.op.disk_template not in constants.DISK_TEMPLATES: - raise errors.OpPrereqError("Invalid disk template name") - - # instance name verification - hostname1 = utils.HostInfo(self.op.instance_name) - self.op.instance_name = instance_name = hostname1.name - instance_list = self.cfg.GetInstanceList() - if instance_name in instance_list: - raise errors.OpPrereqError("Instance '%s' is already in the cluster" % - instance_name) - - # ip validity checks - ip = getattr(self.op, "ip", None) - if ip is None or ip.lower() == "none": - inst_ip = None - elif ip.lower() == "auto": - inst_ip = hostname1.ip - else: - if not utils.IsValidIP(ip): - raise errors.OpPrereqError("given IP address '%s' doesn't look" - " like a valid IP" % ip) - inst_ip = ip - self.inst_ip = self.op.ip = inst_ip + # ip ping checks (we use the same ip that was resolved in ExpandNames) if self.op.start and not self.op.ip_check: raise errors.OpPrereqError("Cannot ignore IP address conflicts when" " adding an instance in start mode") if self.op.ip_check: - if utils.TcpPing(hostname1.ip, 
constants.DEFAULT_NODED_PORT): + if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT): raise errors.OpPrereqError("IP %s of instance %s already in use" % - (hostname1.ip, instance_name)) - - # MAC address verification - if self.op.mac != "auto": - if not utils.IsValidMac(self.op.mac.lower()): - raise errors.OpPrereqError("invalid MAC address specified: %s" % - self.op.mac) + (self.check_ip, instance_name)) # bridge verification bridge = getattr(self.op, "bridge", None) @@ -3175,54 +3359,28 @@ class LUCreateInstance(LogicalUnit): else: self.op.bridge = bridge - # boot order verification - if self.op.hvm_boot_order is not None: - if len(self.op.hvm_boot_order.strip("acdn")) != 0: - raise errors.OpPrereqError("invalid boot order specified," - " must be one or more of [acdn]") - # file storage checks - if (self.op.file_driver and - not self.op.file_driver in constants.FILE_DRIVER): - raise errors.OpPrereqError("Invalid file driver name '%s'" % - self.op.file_driver) - - if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir): - raise errors.OpPrereqError("File storage directory not a relative" - " path") #### allocator run - if [self.op.iallocator, self.op.pnode].count(None) != 1: - raise errors.OpPrereqError("One and only one of iallocator and primary" - " node must be given") - if self.op.iallocator is not None: self._RunAllocator() #### node related checks # check primary node - pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode)) - if pnode is None: - raise errors.OpPrereqError("Primary node '%s' is unknown" % - self.op.pnode) - self.op.pnode = pnode.name - self.pnode = pnode + self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode) + assert self.pnode is not None, \ + "Cannot retrieve locked node %s" % self.op.pnode self.secondaries = [] # mirror node verification if self.op.disk_template in constants.DTS_NET_MIRROR: - if getattr(self.op, "snode", None) is None: + if self.op.snode is None: raise errors.OpPrereqError("The networked disk templates need" " a mirror node") - - snode_name = self.cfg.ExpandNodeName(self.op.snode) - if snode_name is None: - raise errors.OpPrereqError("Unknown secondary node '%s'" % - self.op.snode) - elif snode_name == pnode.name: + if self.op.snode == pnode.name: raise errors.OpPrereqError("The secondary node cannot be" " the primary node.") - self.secondaries.append(snode_name) + self.secondaries.append(self.op.snode) req_size = _ComputeDiskSize(self.op.disk_template, self.op.disk_size, self.op.swap_size) @@ -3254,7 +3412,6 @@ class LUCreateInstance(LogicalUnit): if self.op.kernel_path == constants.VALUE_NONE: raise errors.OpPrereqError("Can't set instance kernel to none") - # bridge check on primary node if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]): raise errors.OpPrereqError("target bridge '%s' does not exist on" @@ -3269,6 +3426,7 @@ class LUCreateInstance(LogicalUnit): # hvm_cdrom_image_path verification if self.op.hvm_cdrom_image_path is not None: + # FIXME (als): shouldn't these checks happen on the destination node? 
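# Sketch (hypothetical LU and opcode fields) of the validation split that
# LUCreateInstance now follows: purely syntactic checks run in ExpandNames,
# before any lock is taken, while checks that need cluster state or the
# network stay in CheckPrereq, where the relevant locks are already held.
class LUExampleAddNic(NoHooksLU):
  _OP_REQP = ["instance_name", "mac", "bridge"]
  REQ_BGL = False

  def ExpandNames(self):
    # cheap, lock-free validation: fail fast on malformed input
    if self.op.mac != "auto" and not utils.IsValidMac(self.op.mac.lower()):
      raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                 self.op.mac)
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    # state-dependent validation: needs the instance lock and an RPC
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if not rpc.call_bridges_exist(self.instance.primary_node,
                                  [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " node %s" %
                                 (self.op.bridge, self.instance.primary_node))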
if not os.path.isabs(self.op.hvm_cdrom_image_path): raise errors.OpPrereqError("The path to the HVM CDROM image must" " be an absolute path or None, not %s" % @@ -3367,13 +3525,17 @@ class LUCreateInstance(LogicalUnit): feedback_fn("* creating instance disks...") if not _CreateDisks(self.cfg, iobj): _RemoveDisks(iobj, self.cfg) + self.cfg.ReleaseDRBDMinors(instance) raise errors.OpExecError("Device creation failed, reverting...") feedback_fn("adding instance %s to cluster config" % instance) self.cfg.AddInstance(iobj) - # Add the new instance to the Ganeti Lock Manager - self.context.glm.add(locking.LEVEL_INSTANCE, instance) + # Declare that we don't want to remove the instance lock anymore, as we've + # added the instance to the config + del self.remove_locks[locking.LEVEL_INSTANCE] + # Remove the temp. assignements for the instance's drbds + self.cfg.ReleaseDRBDMinors(instance) if self.op.wait_for_sync: disk_abort = not _WaitForSync(self.cfg, iobj, self.proc) @@ -3388,8 +3550,8 @@ class LUCreateInstance(LogicalUnit): if disk_abort: _RemoveDisks(iobj, self.cfg) self.cfg.RemoveInstance(iobj.name) - # Remove the new instance from the Ganeti Lock Manager - self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name) + # Make sure the instance lock gets removed + self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name raise errors.OpExecError("There are some degraded disks for" " this instance") @@ -3479,6 +3641,38 @@ class LUReplaceDisks(LogicalUnit): HPATH = "mirrors-replace" HTYPE = constants.HTYPE_INSTANCE _OP_REQP = ["instance_name", "mode", "disks"] + REQ_BGL = False + + def ExpandNames(self): + self._ExpandAndLockInstance() + + if not hasattr(self.op, "remote_node"): + self.op.remote_node = None + + ia_name = getattr(self.op, "iallocator", None) + if ia_name is not None: + if self.op.remote_node is not None: + raise errors.OpPrereqError("Give either the iallocator or the new" + " secondary, not both") + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + elif self.op.remote_node is not None: + remote_node = self.cfg.ExpandNodeName(self.op.remote_node) + if remote_node is None: + raise errors.OpPrereqError("Node '%s' not known" % + self.op.remote_node) + self.op.remote_node = remote_node + self.needed_locks[locking.LEVEL_NODE] = [remote_node] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + else: + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + def DeclareLocks(self, level): + # If we're not already locking all nodes in the set we have to declare the + # instance's primary/secondary nodes. + if (level == locking.LEVEL_NODE and + self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET): + self._LockInstancesNodes() def _RunAllocator(self): """Compute a new secondary node using an IAllocator. @@ -3529,16 +3723,10 @@ class LUReplaceDisks(LogicalUnit): This checks that the instance is in the cluster. 
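# Sketch (hypothetical helper) of the DRBD minor bookkeeping introduced here:
# minors are reserved per (node, instance) pair with cfg.AllocateDRBDMinor,
# used to build the new five-element logical_id, and the temporary reservation
# is dropped again with cfg.ReleaseDRBDMinors on every failure path, as well
# as once the instance has been committed to the config (see the calls above).
def _ExampleAllocateDrbdPair(cfg, instance_name, primary, secondary,
                             size, names):
  p_minor, s_minor = cfg.AllocateDRBDMinor([primary, secondary], instance_name)
  try:
    return _GenerateDRBD8Branch(cfg, primary, secondary, size,
                                names[0:2], "sda", p_minor, s_minor)
  except Exception:
    # roll back the reservation if the device can't be generated
    cfg.ReleaseDRBDMinors(instance_name)
    raise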
""" - if not hasattr(self.op, "remote_node"): - self.op.remote_node = None - - instance = self.cfg.GetInstanceInfo( - self.cfg.ExpandInstanceName(self.op.instance_name)) - if instance is None: - raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) + instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name self.instance = instance - self.op.instance_name = instance.name if instance.disk_template not in constants.DTS_NET_MIRROR: raise errors.OpPrereqError("Instance's disk layout is not" @@ -3553,18 +3741,13 @@ class LUReplaceDisks(LogicalUnit): ia_name = getattr(self.op, "iallocator", None) if ia_name is not None: - if self.op.remote_node is not None: - raise errors.OpPrereqError("Give either the iallocator or the new" - " secondary, not both") - self.op.remote_node = self._RunAllocator() + self._RunAllocator() remote_node = self.op.remote_node if remote_node is not None: - remote_node = self.cfg.ExpandNodeName(remote_node) - if remote_node is None: - raise errors.OpPrereqError("Node '%s' not known" % - self.op.remote_node) self.remote_node_info = self.cfg.GetNodeInfo(remote_node) + assert self.remote_node_info is not None, \ + "Cannot retrieve locked node %s" % remote_node else: self.remote_node_info = None if remote_node == instance.primary_node: @@ -3605,7 +3788,6 @@ class LUReplaceDisks(LogicalUnit): if instance.FindDisk(name) is None: raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" % (name, instance.name)) - self.op.remote_node = remote_node def _ExecD8DiskOnly(self, feedback_fn): """Replace a disk on the primary or secondary for dbrd8. @@ -3854,20 +4036,34 @@ class LUReplaceDisks(LogicalUnit): " node '%s'" % (new_lv.logical_id[1], new_node)) - iv_names[dev.iv_name] = (dev, dev.children) + # Step 4: dbrd minors and drbd setups changes + # after this, we must manually remove the drbd minors on both the + # error and the success paths + minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks], + instance.name) + logging.debug("Allocated minors %s" % (minors,)) self.proc.LogStep(4, steps_total, "changing drbd configuration") - for dev in instance.disks: + for dev, new_minor in zip(instance.disks, minors): size = dev.size info("activating a new drbd on %s for %s" % (new_node, dev.iv_name)) # create new devices on new_node + if pri_node == dev.logical_id[0]: + new_logical_id = (pri_node, new_node, + dev.logical_id[2], dev.logical_id[3], new_minor) + else: + new_logical_id = (new_node, pri_node, + dev.logical_id[2], new_minor, dev.logical_id[4]) + iv_names[dev.iv_name] = (dev, dev.children, new_logical_id) + logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor, + new_logical_id) new_drbd = objects.Disk(dev_type=constants.LD_DRBD8, - logical_id=(pri_node, new_node, - dev.logical_id[2]), + logical_id=new_logical_id, children=dev.children) if not _CreateBlockDevOnSecondary(cfg, new_node, instance, new_drbd, False, _GetInstanceInfoText(instance)): + self.cfg.ReleaseDRBDMinors(instance.name) raise errors.OpExecError("Failed to create new DRBD on" " node '%s'" % new_node) @@ -3885,7 +4081,7 @@ class LUReplaceDisks(LogicalUnit): cfg.SetDiskID(dev, pri_node) # set the physical (unique in bdev terms) id to None, meaning # detach from network - dev.physical_id = (None,) * len(dev.physical_id) + dev.physical_id = (None, None, None, None, dev.physical_id[4]) # and 'find' the device, which will 'fix' it to match the # standalone state 
if rpc.call_blockdev_find(pri_node, dev): @@ -3896,15 +4092,19 @@ class LUReplaceDisks(LogicalUnit): if not done: # no detaches succeeded (very unlikely) + self.cfg.ReleaseDRBDMinors(instance.name) raise errors.OpExecError("Can't detach at least one DRBD from old node") # if we managed to detach at least one, we update all the disks of # the instance to point to the new secondary info("updating instance configuration") - for dev in instance.disks: - dev.logical_id = (pri_node, new_node) + dev.logical_id[2:] + for dev, _, new_logical_id in iv_names.itervalues(): + dev.logical_id = new_logical_id cfg.SetDiskID(dev, pri_node) cfg.Update(instance) + # we can remove now the temp minors as now the new values are + # written to the config file (and therefore stable) + self.cfg.ReleaseDRBDMinors(instance.name) # and now perform the drbd attach info("attaching primary drbds to new secondary (standalone => connected)") @@ -3915,6 +4115,7 @@ class LUReplaceDisks(LogicalUnit): # it will automatically activate the network, if the physical_id # is correct cfg.SetDiskID(dev, pri_node) + logging.debug("Disk to attach: %s", dev) if not rpc.call_blockdev_find(pri_node, dev): warning("can't attach drbd %s to new secondary!" % dev.iv_name, "please do a gnt-instance info to see the status of disks") @@ -3926,14 +4127,14 @@ class LUReplaceDisks(LogicalUnit): _WaitForSync(cfg, instance, self.proc, unlock=True) # so check manually all the devices - for name, (dev, old_lvs) in iv_names.iteritems(): + for name, (dev, old_lvs, _) in iv_names.iteritems(): cfg.SetDiskID(dev, pri_node) is_degr = rpc.call_blockdev_find(pri_node, dev)[5] if is_degr: raise errors.OpExecError("DRBD device %s is degraded!" % name) self.proc.LogStep(6, steps_total, "removing old storage") - for name, (dev, old_lvs) in iv_names.iteritems(): + for name, (dev, old_lvs, _) in iv_names.iteritems(): info("remove logical volumes for %s" % name) for lv in old_lvs: cfg.SetDiskID(lv, old_node) @@ -3951,8 +4152,7 @@ class LUReplaceDisks(LogicalUnit): # Activate the instance disks if we're replacing them on a down instance if instance.status == "down": - op = opcodes.OpActivateInstanceDisks(instance_name=instance.name) - self.proc.ChainOpCode(op) + _StartInstanceDisks(self.cfg, instance, True) if instance.disk_template == constants.DT_DRBD8: if self.op.remote_node is None: @@ -3966,8 +4166,7 @@ class LUReplaceDisks(LogicalUnit): # Deactivate the instance disks if we're replacing them on a down instance if instance.status == "down": - op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name) - self.proc.ChainOpCode(op) + _SafeShutdownInstanceDisks(instance, self.cfg) return ret @@ -3979,6 +4178,16 @@ class LUGrowDisk(LogicalUnit): HPATH = "disk-grow" HTYPE = constants.HTYPE_INSTANCE _OP_REQP = ["instance_name", "disk", "amount"] + REQ_BGL = False + + def ExpandNames(self): + self._ExpandAndLockInstance() + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes() def BuildHooksEnv(self): """Build hooks env. @@ -4003,13 +4212,11 @@ class LUGrowDisk(LogicalUnit): This checks that the instance is in the cluster. 
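# Small sketch (hypothetical helper) of the loosened result check used above
# for call_blockdev_grow and, near the end of the file, for the iallocator
# runner: a result that has crossed the RPC layer may come back as a list
# instead of a tuple, so both are accepted before the (status, message) pair
# is examined.
def _ExampleCheckGrowResult(node, result):
  if not result or not isinstance(result, (list, tuple)) or len(result) != 2:
    raise errors.OpExecError("grow request failed to node %s" % node)
  if not result[0]:
    raise errors.OpExecError("grow request failed to node %s: %s" %
                             (node, result[1]))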
""" - instance = self.cfg.GetInstanceInfo( - self.cfg.ExpandInstanceName(self.op.instance_name)) - if instance is None: - raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) + instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name + self.instance = instance - self.op.instance_name = instance.name if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8): raise errors.OpPrereqError("Instance's disk layout does not support" @@ -4044,7 +4251,7 @@ class LUGrowDisk(LogicalUnit): for node in (instance.secondary_nodes + (instance.primary_node,)): self.cfg.SetDiskID(disk, node) result = rpc.call_blockdev_grow(node, disk, self.op.amount) - if not result or not isinstance(result, tuple) or len(result) != 2: + if not result or not isinstance(result, (list, tuple)) or len(result) != 2: raise errors.OpExecError("grow request failed to node %s" % node) elif not result[0]: raise errors.OpExecError("grow request failed to node %s: %s" % @@ -4059,6 +4266,33 @@ class LUQueryInstanceData(NoHooksLU): """ _OP_REQP = ["instances"] + REQ_BGL = False + def ExpandNames(self): + self.needed_locks = {} + self.share_locks = dict(((i, 1) for i in locking.LEVELS)) + + if not isinstance(self.op.instances, list): + raise errors.OpPrereqError("Invalid argument type 'instances'") + + if self.op.instances: + self.wanted_names = [] + for name in self.op.instances: + full_name = self.cfg.ExpandInstanceName(name) + if full_name is None: + raise errors.OpPrereqError("Instance '%s' not known" % + self.op.instance_name) + self.wanted_names.append(full_name) + self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names + else: + self.wanted_names = None + self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET + + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes() def CheckPrereq(self): """Check prerequisites. @@ -4066,21 +4300,12 @@ class LUQueryInstanceData(NoHooksLU): This only checks the optional instance list against the existing names. """ - if not isinstance(self.op.instances, list): - raise errors.OpPrereqError("Invalid argument type 'instances'") - if self.op.instances: - self.wanted_instances = [] - names = self.op.instances - for name in names: - instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name)) - if instance is None: - raise errors.OpPrereqError("No such instance name '%s'" % name) - self.wanted_instances.append(instance) - else: - self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name - in self.cfg.GetInstanceList()] - return + if self.wanted_names is None: + self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE] + self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name + in self.wanted_names] + return def _ComputeDiskStatus(self, instance, snode, dev): """Compute block device status. 
@@ -4457,13 +4682,23 @@ class LUQueryExports(NoHooksLU): """Query the exports list """ - _OP_REQP = [] + _OP_REQP = ['nodes'] + REQ_BGL = False + + def ExpandNames(self): + self.needed_locks = {} + self.share_locks[locking.LEVEL_NODE] = 1 + if not self.op.nodes: + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + else: + self.needed_locks[locking.LEVEL_NODE] = \ + _GetWantedNodes(self, self.op.nodes) def CheckPrereq(self): - """Check that the nodelist contains only existing nodes. + """Check prerequisites. """ - self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None)) + self.nodes = self.acquired_locks[locking.LEVEL_NODE] def Exec(self, feedback_fn): """Compute the list of all the exported system images. @@ -4484,6 +4719,23 @@ class LUExportInstance(LogicalUnit): HPATH = "instance-export" HTYPE = constants.HTYPE_INSTANCE _OP_REQP = ["instance_name", "target_node", "shutdown"] + REQ_BGL = False + + def ExpandNames(self): + self._ExpandAndLockInstance() + # FIXME: lock only instance primary and destination node + # + # Sad but true, for now we have do lock all nodes, as we don't know where + # the previous export might be, and and in this LU we search for it and + # remove it from its current node. In the future we could fix this by: + # - making a tasklet to search (share-lock all), then create the new one, + # then one to remove, after + # - removing the removal operation altoghether + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + + def DeclareLocks(self, level): + """Last minute lock declaration.""" + # All nodes are locked anyway, so nothing to do here. def BuildHooksEnv(self): """Build hooks env. @@ -4506,20 +4758,16 @@ class LUExportInstance(LogicalUnit): This checks that the instance and node names are valid. """ - instance_name = self.cfg.ExpandInstanceName(self.op.instance_name) + instance_name = self.op.instance_name self.instance = self.cfg.GetInstanceInfo(instance_name) - if self.instance is None: - raise errors.OpPrereqError("Instance '%s' not found" % - self.op.instance_name) + assert self.instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name - # node verification - dst_node_short = self.cfg.ExpandNodeName(self.op.target_node) - self.dst_node = self.cfg.GetNodeInfo(dst_node_short) + self.dst_node = self.cfg.GetNodeInfo( + self.cfg.ExpandNodeName(self.op.target_node)) - if self.dst_node is None: - raise errors.OpPrereqError("Destination node '%s' is unknown." % - self.op.target_node) - self.op.target_node = self.dst_node.name + assert self.dst_node is not None, \ + "Cannot retrieve locked node %s" % self.op.target_node # instance disk type verification for disk in self.instance.disks: @@ -4587,8 +4835,7 @@ class LUExportInstance(LogicalUnit): # if we proceed the backup would be removed because OpQueryExports # substitutes an empty list with the full cluster node list. 
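# Sketch of the stale-export cleanup now done with a direct RPC instead of
# chaining an OpQueryExports opcode (whose empty node list means "all nodes",
# hence the guard discussed in the comment above).  The helper name is
# hypothetical; nodelist is the set of locked nodes to scan and instance_name
# the export to drop.
def _ExampleRemoveStaleExports(nodelist, instance_name):
  exportlist = rpc.call_export_list(nodelist)
  for node in exportlist:
    if instance_name in exportlist[node]:
      if not rpc.call_export_remove(node, instance_name):
        logging.error("could not remove older export for instance %s"
                      " on node %s", instance_name, node)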
if nodelist: - op = opcodes.OpQueryExports(nodes=nodelist) - exportlist = self.proc.ChainOpCode(op) + exportlist = rpc.call_export_list(nodelist) for node in exportlist: if instance.name in exportlist[node]: if not rpc.call_export_remove(node, instance.name): @@ -4601,6 +4848,14 @@ class LURemoveExport(NoHooksLU): """ _OP_REQP = ["instance_name"] + REQ_BGL = False + + def ExpandNames(self): + self.needed_locks = {} + # We need all nodes to be locked in order for RemoveExport to work, but we + # don't need to lock the instance itself, as nothing will happen to it (and + # we can remove exports also for a removed instance) + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET def CheckPrereq(self): """Check prerequisites. @@ -4619,8 +4874,7 @@ class LURemoveExport(NoHooksLU): fqdn_warn = True instance_name = self.op.instance_name - op = opcodes.OpQueryExports(nodes=[]) - exportlist = self.proc.ChainOpCode(op) + exportlist = rpc.call_export_list(self.acquired_locks[locking.LEVEL_NODE]) found = False for node in exportlist: if instance_name in exportlist[node]: @@ -4641,26 +4895,34 @@ class TagsLU(NoHooksLU): This is an abstract class which is the parent of all the other tags LUs. """ - def CheckPrereq(self): - """Check prerequisites. - """ - if self.op.kind == constants.TAG_CLUSTER: - self.target = self.cfg.GetClusterInfo() - elif self.op.kind == constants.TAG_NODE: + def ExpandNames(self): + self.needed_locks = {} + if self.op.kind == constants.TAG_NODE: name = self.cfg.ExpandNodeName(self.op.name) if name is None: raise errors.OpPrereqError("Invalid node name (%s)" % (self.op.name,)) self.op.name = name - self.target = self.cfg.GetNodeInfo(name) + self.needed_locks[locking.LEVEL_NODE] = name elif self.op.kind == constants.TAG_INSTANCE: name = self.cfg.ExpandInstanceName(self.op.name) if name is None: raise errors.OpPrereqError("Invalid instance name (%s)" % (self.op.name,)) self.op.name = name - self.target = self.cfg.GetInstanceInfo(name) + self.needed_locks[locking.LEVEL_INSTANCE] = name + + def CheckPrereq(self): + """Check prerequisites. + + """ + if self.op.kind == constants.TAG_CLUSTER: + self.target = self.cfg.GetClusterInfo() + elif self.op.kind == constants.TAG_NODE: + self.target = self.cfg.GetNodeInfo(self.op.name) + elif self.op.kind == constants.TAG_INSTANCE: + self.target = self.cfg.GetInstanceInfo(self.op.name) else: raise errors.OpPrereqError("Wrong tag type requested (%s)" % str(self.op.kind)) @@ -4671,6 +4933,7 @@ class LUGetTags(TagsLU): """ _OP_REQP = ["kind", "name"] + REQ_BGL = False def Exec(self, feedback_fn): """Returns the tag list. @@ -4684,6 +4947,10 @@ class LUSearchTags(NoHooksLU): """ _OP_REQP = ["pattern"] + REQ_BGL = False + + def ExpandNames(self): + self.needed_locks = {} def CheckPrereq(self): """Check prerequisites. @@ -4703,9 +4970,9 @@ class LUSearchTags(NoHooksLU): """ cfg = self.cfg tgts = [("/cluster", cfg.GetClusterInfo())] - ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()] + ilist = cfg.GetAllInstancesInfo().values() tgts.extend([("/instances/%s" % i.name, i) for i in ilist]) - nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()] + nlist = cfg.GetAllNodesInfo().values() tgts.extend([("/nodes/%s" % n.name, n) for n in nlist]) results = [] for path, target in tgts: @@ -4720,6 +4987,7 @@ class LUAddTags(TagsLU): """ _OP_REQP = ["kind", "name", "tags"] + REQ_BGL = False def CheckPrereq(self): """Check prerequisites. 
@@ -4753,6 +5021,7 @@ class LUDelTags(TagsLU): """ _OP_REQP = ["kind", "name", "tags"] + REQ_BGL = False def CheckPrereq(self): """Check prerequisites. @@ -5063,7 +5332,7 @@ class IAllocator(object): result = call_fn(self.sstore.GetMasterNode(), name, self.in_text) - if not isinstance(result, tuple) or len(result) != 4: + if not isinstance(result, (list, tuple)) or len(result) != 4: raise errors.OpExecError("Invalid result from master iallocator runner") rcode, stdout, stderr, fail = result
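# Sketch of the config-snapshot idiom this patch moves to throughout
# (LUSetClusterParams, LUQueryNodes, LUQueryInstances, LUSearchTags): instead
# of iterating GetInstanceList() and calling GetInstanceInfo() once per name,
# the whole dict is fetched in one GetAllInstancesInfo()/GetAllNodesInfo()
# call, so the data comes from a single consistent view of the config.  The
# helper name is hypothetical.
def _ExampleCollectLvmInstances(cfg):
  lvm_instances = []
  for inst in cfg.GetAllInstancesInfo().values():
    for disk in inst.disks:
      if _RecursiveCheckIfLVMBased(disk):
        lvm_instances.append(inst.name)
        break
  return lvm_instances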