X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/539210af45bafb300ab3c7b0b76b65552b7af09e..313bceadb9cec8b3a0b627d8f274054a495eb671:/lib/cmdlib.py diff --git a/lib/cmdlib.py b/lib/cmdlib.py index c337cf0..93f9e4f 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -47,8 +47,8 @@ class LogicalUnit(object): Subclasses must follow these rules: - implement ExpandNames - - implement CheckPrereq - - implement Exec + - implement CheckPrereq (except when tasklets are used) + - implement Exec (except when tasklets are used) - implement BuildHooksEnv - redefine HPATH and HTYPE - optionally redefine their run requirements: @@ -89,14 +89,19 @@ class LogicalUnit(object): # logging self.LogWarning = processor.LogWarning self.LogInfo = processor.LogInfo + self.LogStep = processor.LogStep # support for dry-run self.dry_run_result = None + # Tasklets + self.tasklets = None + for attr_name in self._OP_REQP: attr_val = getattr(op, attr_name, None) if attr_val is None: raise errors.OpPrereqError("Required parameter '%s' missing" % attr_name) + self.CheckArguments() def __GetSSH(self): @@ -148,6 +153,10 @@ class LogicalUnit(object): level you can modify self.share_locks, setting a true value (usually 1) for that level. By default locks are not shared. + This function can also define a list of tasklets, which then will be + executed in order instead of the usual LU-level CheckPrereq and Exec + functions, if those are not defined by the LU. + Examples:: # Acquire all nodes and one instance @@ -204,7 +213,13 @@ class LogicalUnit(object): their canonical form if it hasn't been done by ExpandNames before. """ - raise NotImplementedError + if self.tasklets is not None: + for (idx, tl) in enumerate(self.tasklets): + logging.debug("Checking prerequisites for tasklet %s/%s", + idx + 1, len(self.tasklets)) + tl.CheckPrereq() + else: + raise NotImplementedError def Exec(self, feedback_fn): """Execute the LU. @@ -214,7 +229,12 @@ class LogicalUnit(object): code, or expected. """ - raise NotImplementedError + if self.tasklets is not None: + for (idx, tl) in enumerate(self.tasklets): + logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets)) + tl.Exec(feedback_fn) + else: + raise NotImplementedError def BuildHooksEnv(self): """Build hooks environment for this LU. @@ -338,6 +358,52 @@ class NoHooksLU(LogicalUnit): HTYPE = None +class Tasklet: + """Tasklet base class. + + Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or + they can mix legacy code with tasklets. Locking needs to be done in the LU, + tasklets know nothing about locks. + + Subclasses must follow these rules: + - Implement CheckPrereq + - Implement Exec + + """ + def __init__(self, lu): + self.lu = lu + + # Shortcuts + self.cfg = lu.cfg + self.rpc = lu.rpc + + def CheckPrereq(self): + """Check prerequisites for this tasklets. + + This method should check whether the prerequisites for the execution of + this tasklet are fulfilled. It can do internode communication, but it + should be idempotent - no cluster or system changes are allowed. + + The method should raise errors.OpPrereqError in case something is not + fulfilled. Its return value is ignored. + + This method should also update all parameters to their canonical form if it + hasn't been done before. + + """ + raise NotImplementedError + + def Exec(self, feedback_fn): + """Execute the tasklet. + + This method should implement the actual work. It should raise + errors.OpExecError for failures that are somewhat dealt with in code, or + expected. + + """ + raise NotImplementedError + + def _GetWantedNodes(lu, nodes): """Returns list of checked and expanded node names. @@ -541,6 +607,7 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, return env + def _NICListToTuple(lu, nics): """Build a list of nic information tuples. @@ -564,6 +631,7 @@ def _NICListToTuple(lu, nics): hooks_nics.append((ip, mac, mode, link)) return hooks_nics + def _BuildInstanceHookEnvByObject(lu, instance, override=None): """Builds instance related env variables for hooks from an object. @@ -595,7 +663,7 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None): 'disks': [(disk.size, disk.mode) for disk in instance.disks], 'bep': bep, 'hvp': hvp, - 'hypervisor': instance.hypervisor, + 'hypervisor_name': instance.hypervisor, } if override: args.update(override) @@ -643,6 +711,92 @@ def _CheckInstanceBridgesExist(lu, instance, node=None): _CheckNicsBridgesExist(lu, instance.nics, node) +def _GetNodeInstancesInner(cfg, fn): + return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)] + + +def _GetNodeInstances(cfg, node_name): + """Returns a list of all primary and secondary instances on a node. + + """ + + return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes) + + +def _GetNodePrimaryInstances(cfg, node_name): + """Returns primary instances on a node. + + """ + return _GetNodeInstancesInner(cfg, + lambda inst: node_name == inst.primary_node) + + +def _GetNodeSecondaryInstances(cfg, node_name): + """Returns secondary instances on a node. + + """ + return _GetNodeInstancesInner(cfg, + lambda inst: node_name in inst.secondary_nodes) + + +def _GetStorageTypeArgs(cfg, storage_type): + """Returns the arguments for a storage type. + + """ + # Special case for file storage + if storage_type == constants.ST_FILE: + # storage.FileStorage wants a list of storage directories + return [[cfg.GetFileStorageDir()]] + + return [] + + +def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq): + faulty = [] + + for dev in instance.disks: + cfg.SetDiskID(dev, node_name) + + result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks) + result.Raise("Failed to get disk status from node %s" % node_name, + prereq=prereq) + + for idx, bdev_status in enumerate(result.payload): + if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY: + faulty.append(idx) + + return faulty + + +class LUPostInitCluster(LogicalUnit): + """Logical unit for running hooks after cluster initialization. + + """ + HPATH = "cluster-init" + HTYPE = constants.HTYPE_CLUSTER + _OP_REQP = [] + + def BuildHooksEnv(self): + """Build hooks env. + + """ + env = {"OP_TARGET": self.cfg.GetClusterName()} + mn = self.cfg.GetMasterNode() + return env, [], [mn] + + def CheckPrereq(self): + """No prerequisites to check. + + """ + return True + + def Exec(self, feedback_fn): + """Nothing to do. + + """ + return True + + class LUDestroyCluster(NoHooksLU): """Logical unit for destroying the cluster. @@ -1365,6 +1519,100 @@ class LUVerifyDisks(NoHooksLU): return result +class LURepairDiskSizes(NoHooksLU): + """Verifies the cluster disks sizes. + + """ + _OP_REQP = ["instances"] + REQ_BGL = False + + def ExpandNames(self): + + if not isinstance(self.op.instances, list): + raise errors.OpPrereqError("Invalid argument type 'instances'") + + if self.op.instances: + self.wanted_names = [] + for name in self.op.instances: + full_name = self.cfg.ExpandInstanceName(name) + if full_name is None: + raise errors.OpPrereqError("Instance '%s' not known" % name) + self.wanted_names.append(full_name) + self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names + self.needed_locks = { + locking.LEVEL_NODE: [], + locking.LEVEL_INSTANCE: self.wanted_names, + } + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + else: + self.wanted_names = None + self.needed_locks = { + locking.LEVEL_NODE: locking.ALL_SET, + locking.LEVEL_INSTANCE: locking.ALL_SET, + } + self.share_locks = dict(((i, 1) for i in locking.LEVELS)) + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE and self.wanted_names is not None: + self._LockInstancesNodes(primary_only=True) + + def CheckPrereq(self): + """Check prerequisites. + + This only checks the optional instance list against the existing names. + + """ + if self.wanted_names is None: + self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE] + + self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name + in self.wanted_names] + + def Exec(self, feedback_fn): + """Verify the size of cluster disks. + + """ + # TODO: check child disks too + # TODO: check differences in size between primary/secondary nodes + per_node_disks = {} + for instance in self.wanted_instances: + pnode = instance.primary_node + if pnode not in per_node_disks: + per_node_disks[pnode] = [] + for idx, disk in enumerate(instance.disks): + per_node_disks[pnode].append((instance, idx, disk)) + + changed = [] + for node, dskl in per_node_disks.items(): + result = self.rpc.call_blockdev_getsizes(node, [v[2] for v in dskl]) + if result.failed: + self.LogWarning("Failure in blockdev_getsizes call to node" + " %s, ignoring", node) + continue + if len(result.data) != len(dskl): + self.LogWarning("Invalid result from node %s, ignoring node results", + node) + continue + for ((instance, idx, disk), size) in zip(dskl, result.data): + if size is None: + self.LogWarning("Disk %d of instance %s did not return size" + " information, ignoring", idx, instance.name) + continue + if not isinstance(size, (int, long)): + self.LogWarning("Disk %d of instance %s did not return valid" + " size information, ignoring", idx, instance.name) + continue + size = size >> 20 + if size != disk.size: + self.LogInfo("Disk %d of instance %s has mismatched size," + " correcting: recorded %d, actual %d", idx, + instance.name, disk.size, size) + disk.size = size + self.cfg.Update(instance) + changed.append((instance.name, idx, size)) + return changed + + class LURenameCluster(LogicalUnit): """Rename the cluster. @@ -1641,6 +1889,7 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None): constants.SSH_KNOWN_HOSTS_FILE, constants.RAPI_CERT_FILE, constants.RAPI_USERS_FILE, + constants.HMAC_CLUSTER_KEY, ]) enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors @@ -1726,18 +1975,18 @@ def _WaitForSync(lu, instance, oneshot=False, unlock=False): lu.LogWarning("Can't compute data for node %s/%s", node, instance.disks[i].iv_name) continue - # we ignore the ldisk parameter - perc_done, est_time, is_degraded, _ = mstat - cumul_degraded = cumul_degraded or (is_degraded and perc_done is None) - if perc_done is not None: + + cumul_degraded = (cumul_degraded or + (mstat.is_degraded and mstat.sync_percent is None)) + if mstat.sync_percent is not None: done = False - if est_time is not None: - rem_time = "%d estimated seconds remaining" % est_time - max_time = est_time + if mstat.estimated_time is not None: + rem_time = "%d estimated seconds remaining" % mstat.estimated_time + max_time = mstat.estimated_time else: rem_time = "no time estimate" lu.proc.LogInfo("- device %s: %5.2f%% done, %s" % - (instance.disks[i].iv_name, perc_done, rem_time)) + (instance.disks[i].iv_name, mstat.sync_percent, rem_time)) # if we're done but degraded, let's do a few small retries, to # make sure we see a stable and not transient situation; therefore @@ -1767,12 +2016,9 @@ def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False): """ lu.cfg.SetDiskID(dev, node) - if ldisk: - idx = 6 - else: - idx = 5 result = True + if on_primary or dev.AssembleOnSecondary(): rstats = lu.rpc.call_blockdev_find(node, dev) msg = rstats.fail_msg @@ -1783,7 +2029,11 @@ def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False): lu.LogWarning("Can't find disk on node %s", node) result = False else: - result = result and (not rstats.payload[idx]) + if ldisk: + result = result and rstats.payload.ldisk_status == constants.LDS_OKAY + else: + result = result and not rstats.payload.is_degraded + if dev.children: for child in dev.children: result = result and _CheckDiskConsistency(lu, child, node, on_primary) @@ -1974,7 +2224,7 @@ class LUQueryNodes(NoHooksLU): "name", "pinst_cnt", "sinst_cnt", "pinst_list", "sinst_list", "pip", "sip", "tags", - "serial_no", + "serial_no", "ctime", "mtime", "master_candidate", "master", "offline", @@ -2098,6 +2348,10 @@ class LUQueryNodes(NoHooksLU): val = list(node.GetTags()) elif field == "serial_no": val = node.serial_no + elif field == "ctime": + val = node.ctime + elif field == "mtime": + val = node.mtime elif field == "master_candidate": val = node.master_candidate elif field == "master": @@ -2213,6 +2467,154 @@ class LUQueryNodeVolumes(NoHooksLU): return output +class LUQueryNodeStorage(NoHooksLU): + """Logical unit for getting information on storage units on node(s). + + """ + _OP_REQP = ["nodes", "storage_type", "output_fields"] + REQ_BGL = False + _FIELDS_STATIC = utils.FieldSet("node") + + def ExpandNames(self): + storage_type = self.op.storage_type + + if storage_type not in constants.VALID_STORAGE_FIELDS: + raise errors.OpPrereqError("Unknown storage type: %s" % storage_type) + + dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type] + + _CheckOutputFields(static=self._FIELDS_STATIC, + dynamic=utils.FieldSet(*dynamic_fields), + selected=self.op.output_fields) + + self.needed_locks = {} + self.share_locks[locking.LEVEL_NODE] = 1 + + if self.op.nodes: + self.needed_locks[locking.LEVEL_NODE] = \ + _GetWantedNodes(self, self.op.nodes) + else: + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + + def CheckPrereq(self): + """Check prerequisites. + + This checks that the fields required are valid output fields. + + """ + self.op.name = getattr(self.op, "name", None) + + self.nodes = self.acquired_locks[locking.LEVEL_NODE] + + def Exec(self, feedback_fn): + """Computes the list of nodes and their attributes. + + """ + # Always get name to sort by + if constants.SF_NAME in self.op.output_fields: + fields = self.op.output_fields[:] + else: + fields = [constants.SF_NAME] + self.op.output_fields + + # Never ask for node as it's only known to the LU + while "node" in fields: + fields.remove("node") + + field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)]) + name_idx = field_idx[constants.SF_NAME] + + st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) + data = self.rpc.call_storage_list(self.nodes, + self.op.storage_type, st_args, + self.op.name, fields) + + result = [] + + for node in utils.NiceSort(self.nodes): + nresult = data[node] + if nresult.offline: + continue + + msg = nresult.fail_msg + if msg: + self.LogWarning("Can't get storage data from node %s: %s", node, msg) + continue + + rows = dict([(row[name_idx], row) for row in nresult.payload]) + + for name in utils.NiceSort(rows.keys()): + row = rows[name] + + out = [] + + for field in self.op.output_fields: + if field == "node": + val = node + elif field in field_idx: + val = row[field_idx[field]] + else: + raise errors.ParameterError(field) + + out.append(val) + + result.append(out) + + return result + + +class LUModifyNodeStorage(NoHooksLU): + """Logical unit for modifying a storage volume on a node. + + """ + _OP_REQP = ["node_name", "storage_type", "name", "changes"] + REQ_BGL = False + + def CheckArguments(self): + node_name = self.cfg.ExpandNodeName(self.op.node_name) + if node_name is None: + raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name) + + self.op.node_name = node_name + + storage_type = self.op.storage_type + if storage_type not in constants.VALID_STORAGE_FIELDS: + raise errors.OpPrereqError("Unknown storage type: %s" % storage_type) + + def ExpandNames(self): + self.needed_locks = { + locking.LEVEL_NODE: self.op.node_name, + } + + def CheckPrereq(self): + """Check prerequisites. + + """ + storage_type = self.op.storage_type + + try: + modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type] + except KeyError: + raise errors.OpPrereqError("Storage units of type '%s' can not be" + " modified" % storage_type) + + diff = set(self.op.changes.keys()) - modifiable + if diff: + raise errors.OpPrereqError("The following fields can not be modified for" + " storage units of type '%s': %r" % + (storage_type, list(diff))) + + def Exec(self, feedback_fn): + """Computes the list of nodes and their attributes. + + """ + st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) + result = self.rpc.call_storage_modify(self.op.node_name, + self.op.storage_type, st_args, + self.op.name, self.op.changes) + result.Raise("Failed to modify storage unit '%s' on %s" % + (self.op.name, self.op.node_name)) + + class LUAddNode(LogicalUnit): """Logical unit for adding node to the cluster. @@ -2580,7 +2982,7 @@ class LUPowercycleNode(NoHooksLU): def ExpandNames(self): """Locking for PowercycleNode. - This is a last-resource option and shouldn't block on other + This is a last-resort option and shouldn't block on other jobs. Therefore, we grab no locks. """ @@ -2644,6 +3046,8 @@ class LUQueryClusterInfo(NoHooksLU): "master_netdev": cluster.master_netdev, "volume_group_name": cluster.volume_group_name, "file_storage_dir": cluster.file_storage_dir, + "ctime": cluster.ctime, + "mtime": cluster.mtime, } return result @@ -2715,19 +3119,24 @@ class LUActivateInstanceDisks(NoHooksLU): assert self.instance is not None, \ "Cannot retrieve locked instance %s" % self.op.instance_name _CheckNodeOnline(self, self.instance.primary_node) + if not hasattr(self.op, "ignore_size"): + self.op.ignore_size = False def Exec(self, feedback_fn): """Activate the disks. """ - disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance) + disks_ok, disks_info = \ + _AssembleInstanceDisks(self, self.instance, + ignore_size=self.op.ignore_size) if not disks_ok: raise errors.OpExecError("Cannot activate block devices") return disks_info -def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False): +def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False, + ignore_size=False): """Prepare the block devices for an instance. This sets up the block devices on all nodes. @@ -2739,6 +3148,10 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False): @type ignore_secondaries: boolean @param ignore_secondaries: if true, errors on secondary nodes won't result in an error return from the function + @type ignore_size: boolean + @param ignore_size: if true, the current known size of the disk + will not be used during the disk activation, useful for cases + when the size is wrong @return: False if the operation failed, otherwise a list of (host, instance_visible_name, node_visible_name) with the mapping from node devices to instance devices @@ -2759,6 +3172,9 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False): # 1st pass, assemble on all nodes in secondary mode for inst_disk in instance.disks: for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node): + if ignore_size: + node_disk = node_disk.Copy() + node_disk.UnsetSize() lu.cfg.SetDiskID(node_disk, node) result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False) msg = result.fail_msg @@ -2776,6 +3192,9 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False): for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node): if node != instance.primary_node: continue + if ignore_size: + node_disk = node_disk.Copy() + node_disk.UnsetSize() lu.cfg.SetDiskID(node_disk, node) result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True) msg = result.fail_msg @@ -3232,13 +3651,29 @@ class LUReinstallInstance(LogicalUnit): _ShutdownInstanceDisks(self, inst) -class LURenameInstance(LogicalUnit): - """Rename an instance. +class LURecreateInstanceDisks(LogicalUnit): + """Recreate an instance's missing disks. """ - HPATH = "instance-rename" + HPATH = "instance-recreate-disks" HTYPE = constants.HTYPE_INSTANCE - _OP_REQP = ["instance_name", "new_name"] + _OP_REQP = ["instance_name", "disks"] + REQ_BGL = False + + def CheckArguments(self): + """Check the arguments. + + """ + if not isinstance(self.op.disks, list): + raise errors.OpPrereqError("Invalid disks parameter") + for item in self.op.disks: + if (not isinstance(item, int) or + item < 0): + raise errors.OpPrereqError("Invalid disk specification '%s'" % + str(item)) + + def ExpandNames(self): + self._ExpandAndLockInstance() def BuildHooksEnv(self): """Build hooks env. @@ -3247,7 +3682,6 @@ class LURenameInstance(LogicalUnit): """ env = _BuildInstanceHookEnvByObject(self, self.instance) - env["INSTANCE_NEW_NAME"] = self.op.new_name nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) return env, nl, nl @@ -3257,11 +3691,79 @@ class LURenameInstance(LogicalUnit): This checks that the instance is in the cluster and is not running. """ - instance = self.cfg.GetInstanceInfo( - self.cfg.ExpandInstanceName(self.op.instance_name)) - if instance is None: - raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) + instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name + _CheckNodeOnline(self, instance.primary_node) + + if instance.disk_template == constants.DT_DISKLESS: + raise errors.OpPrereqError("Instance '%s' has no disks" % + self.op.instance_name) + if instance.admin_up: + raise errors.OpPrereqError("Instance '%s' is marked to be up" % + self.op.instance_name) + remote_info = self.rpc.call_instance_info(instance.primary_node, + instance.name, + instance.hypervisor) + remote_info.Raise("Error checking node %s" % instance.primary_node, + prereq=True) + if remote_info.payload: + raise errors.OpPrereqError("Instance '%s' is running on the node %s" % + (self.op.instance_name, + instance.primary_node)) + + if not self.op.disks: + self.op.disks = range(len(instance.disks)) + else: + for idx in self.op.disks: + if idx >= len(instance.disks): + raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx) + + self.instance = instance + + def Exec(self, feedback_fn): + """Recreate the disks. + + """ + to_skip = [] + for idx, disk in enumerate(self.instance.disks): + if idx not in self.op.disks: # disk idx has not been passed in + to_skip.append(idx) + continue + + _CreateDisks(self, self.instance, to_skip=to_skip) + + +class LURenameInstance(LogicalUnit): + """Rename an instance. + + """ + HPATH = "instance-rename" + HTYPE = constants.HTYPE_INSTANCE + _OP_REQP = ["instance_name", "new_name"] + + def BuildHooksEnv(self): + """Build hooks env. + + This runs on master, primary and secondary nodes of the instance. + + """ + env = _BuildInstanceHookEnvByObject(self, self.instance) + env["INSTANCE_NEW_NAME"] = self.op.new_name + nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) + return env, nl, nl + + def CheckPrereq(self): + """Check prerequisites. + + This checks that the instance is in the cluster and is not running. + + """ + instance = self.cfg.GetInstanceInfo( + self.cfg.ExpandInstanceName(self.op.instance_name)) + if instance is None: + raise errors.OpPrereqError("Instance '%s' not known" % + self.op.instance_name) _CheckNodeOnline(self, instance.primary_node) if instance.admin_up: @@ -3423,7 +3925,9 @@ class LUQueryInstances(NoHooksLU): r"(nic)\.(bridge)/([0-9]+)", r"(nic)\.(macs|ips|modes|links|bridges)", r"(disk|nic)\.(count)", - "serial_no", "hypervisor", "hvparams",] + + "serial_no", "hypervisor", "hvparams", + "ctime", "mtime", + ] + ["hv/%s" % name for name in constants.HVS_PARAMETERS] + ["be/%s" % name @@ -3608,6 +4112,10 @@ class LUQueryInstances(NoHooksLU): val = list(instance.GetTags()) elif field == "serial_no": val = instance.serial_no + elif field == "ctime": + val = instance.ctime + elif field == "mtime": + val = instance.mtime elif field == "network_port": val = instance.network_port elif field == "hypervisor": @@ -3835,9 +4343,14 @@ class LUMigrateInstance(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() + self.needed_locks[locking.LEVEL_NODE] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + self._migrater = TLMigrateInstance(self, self.op.instance_name, + self.op.live, self.op.cleanup) + self.tasklets = [self._migrater] + def DeclareLocks(self, level): if level == locking.LEVEL_NODE: self._LockInstancesNodes() @@ -3848,10 +4361,49 @@ class LUMigrateInstance(LogicalUnit): This runs on master, primary and secondary nodes of the instance. """ - env = _BuildInstanceHookEnvByObject(self, self.instance) + instance = self._migrater.instance + env = _BuildInstanceHookEnvByObject(self, instance) env["MIGRATE_LIVE"] = self.op.live env["MIGRATE_CLEANUP"] = self.op.cleanup - nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes) + nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes) + return env, nl, nl + + +class LUMoveInstance(LogicalUnit): + """Move an instance by data-copying. + + """ + HPATH = "instance-move" + HTYPE = constants.HTYPE_INSTANCE + _OP_REQP = ["instance_name", "target_node"] + REQ_BGL = False + + def ExpandNames(self): + self._ExpandAndLockInstance() + target_node = self.cfg.ExpandNodeName(self.op.target_node) + if target_node is None: + raise errors.OpPrereqError("Node '%s' not known" % + self.op.target_node) + self.op.target_node = target_node + self.needed_locks[locking.LEVEL_NODE] = [target_node] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes(primary_only=True) + + def BuildHooksEnv(self): + """Build hooks env. + + This runs on master, primary and secondary nodes of the instance. + + """ + env = { + "TARGET_NODE": self.op.target_node, + } + env.update(_BuildInstanceHookEnvByObject(self, self.instance)) + nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node, + self.op.target_node] return env, nl, nl def CheckPrereq(self): @@ -3860,11 +4412,215 @@ class LUMigrateInstance(LogicalUnit): This checks that the instance is in the cluster. """ + self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert self.instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name + + node = self.cfg.GetNodeInfo(self.op.target_node) + assert node is not None, \ + "Cannot retrieve locked node %s" % self.op.target_node + + self.target_node = target_node = node.name + + if target_node == instance.primary_node: + raise errors.OpPrereqError("Instance %s is already on the node %s" % + (instance.name, target_node)) + + bep = self.cfg.GetClusterInfo().FillBE(instance) + + for idx, dsk in enumerate(instance.disks): + if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE): + raise errors.OpPrereqError("Instance disk %d has a complex layout," + " cannot copy") + + _CheckNodeOnline(self, target_node) + _CheckNodeNotDrained(self, target_node) + + if instance.admin_up: + # check memory requirements on the secondary node + _CheckNodeFreeMemory(self, target_node, "failing over instance %s" % + instance.name, bep[constants.BE_MEMORY], + instance.hypervisor) + else: + self.LogInfo("Not checking memory on the secondary node as" + " instance will not be started") + + # check bridge existance + _CheckInstanceBridgesExist(self, instance, node=target_node) + + def Exec(self, feedback_fn): + """Move an instance. + + The move is done by shutting it down on its present node, copying + the data over (slow) and starting it on the new node. + + """ + instance = self.instance + + source_node = instance.primary_node + target_node = self.target_node + + self.LogInfo("Shutting down instance %s on source node %s", + instance.name, source_node) + + result = self.rpc.call_instance_shutdown(source_node, instance) + msg = result.fail_msg + if msg: + if self.op.ignore_consistency: + self.proc.LogWarning("Could not shutdown instance %s on node %s." + " Proceeding anyway. Please make sure node" + " %s is down. Error details: %s", + instance.name, source_node, source_node, msg) + else: + raise errors.OpExecError("Could not shutdown instance %s on" + " node %s: %s" % + (instance.name, source_node, msg)) + + # create the target disks + try: + _CreateDisks(self, instance, target_node=target_node) + except errors.OpExecError: + self.LogWarning("Device creation failed, reverting...") + try: + _RemoveDisks(self, instance, target_node=target_node) + finally: + self.cfg.ReleaseDRBDMinors(instance.name) + raise + + cluster_name = self.cfg.GetClusterInfo().cluster_name + + errs = [] + # activate, get path, copy the data over + for idx, disk in enumerate(instance.disks): + self.LogInfo("Copying data for disk %d", idx) + result = self.rpc.call_blockdev_assemble(target_node, disk, + instance.name, True) + if result.fail_msg: + self.LogWarning("Can't assemble newly created disk %d: %s", + idx, result.fail_msg) + errs.append(result.fail_msg) + break + dev_path = result.payload + result = self.rpc.call_blockdev_export(source_node, disk, + target_node, dev_path, + cluster_name) + if result.fail_msg: + self.LogWarning("Can't copy data over for disk %d: %s", + idx, result.fail_msg) + errs.append(result.fail_msg) + break + + if errs: + self.LogWarning("Some disks failed to copy, aborting") + try: + _RemoveDisks(self, instance, target_node=target_node) + finally: + self.cfg.ReleaseDRBDMinors(instance.name) + raise errors.OpExecError("Errors during disk copy: %s" % + (",".join(errs),)) + + instance.primary_node = target_node + self.cfg.Update(instance) + + self.LogInfo("Removing the disks on the original node") + _RemoveDisks(self, instance, target_node=source_node) + + # Only start the instance if it's marked as up + if instance.admin_up: + self.LogInfo("Starting instance %s on node %s", + instance.name, target_node) + + disks_ok, _ = _AssembleInstanceDisks(self, instance, + ignore_secondaries=True) + if not disks_ok: + _ShutdownInstanceDisks(self, instance) + raise errors.OpExecError("Can't activate the instance's disks") + + result = self.rpc.call_instance_start(target_node, instance, None, None) + msg = result.fail_msg + if msg: + _ShutdownInstanceDisks(self, instance) + raise errors.OpExecError("Could not start instance %s on node %s: %s" % + (instance.name, target_node, msg)) + + +class LUMigrateNode(LogicalUnit): + """Migrate all instances from a node. + + """ + HPATH = "node-migrate" + HTYPE = constants.HTYPE_NODE + _OP_REQP = ["node_name", "live"] + REQ_BGL = False + + def ExpandNames(self): + self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name) + if self.op.node_name is None: + raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name) + + self.needed_locks = { + locking.LEVEL_NODE: [self.op.node_name], + } + + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + + # Create tasklets for migrating instances for all instances on this node + names = [] + tasklets = [] + + for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name): + logging.debug("Migrating instance %s", inst.name) + names.append(inst.name) + + tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False)) + + self.tasklets = tasklets + + # Declare instance locks + self.needed_locks[locking.LEVEL_INSTANCE] = names + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes() + + def BuildHooksEnv(self): + """Build hooks env. + + This runs on the master, the primary and all the secondaries. + + """ + env = { + "NODE_NAME": self.op.node_name, + } + + nl = [self.cfg.GetMasterNode()] + + return (env, nl, nl) + + +class TLMigrateInstance(Tasklet): + def __init__(self, lu, instance_name, live, cleanup): + """Initializes this class. + + """ + Tasklet.__init__(self, lu) + + # Parameters + self.instance_name = instance_name + self.live = live + self.cleanup = cleanup + + def CheckPrereq(self): + """Check prerequisites. + + This checks that the instance is in the cluster. + + """ instance = self.cfg.GetInstanceInfo( - self.cfg.ExpandInstanceName(self.op.instance_name)) + self.cfg.ExpandInstanceName(self.instance_name)) if instance is None: raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) + self.instance_name) if instance.disk_template != constants.DT_DRBD8: raise errors.OpPrereqError("Instance's disk layout is not" @@ -3886,7 +4642,7 @@ class LUMigrateInstance(LogicalUnit): # check bridge existance _CheckInstanceBridgesExist(self, instance, node=target_node) - if not self.op.cleanup: + if not self.cleanup: _CheckNodeNotDrained(self, target_node) result = self.rpc.call_instance_migratable(instance.primary_node, instance) @@ -4033,10 +4789,10 @@ class LUMigrateInstance(LogicalUnit): self._GoReconnect(False) self._WaitUntilSync() except errors.OpExecError, err: - self.LogWarning("Migration failed and I can't reconnect the" - " drives: error '%s'\n" - "Please look and recover the instance status" % - str(err)) + self.lu.LogWarning("Migration failed and I can't reconnect the" + " drives: error '%s'\n" + "Please look and recover the instance status" % + str(err)) def _AbortMigration(self): """Call the hypervisor code to abort a started migration. @@ -4116,7 +4872,7 @@ class LUMigrateInstance(LogicalUnit): time.sleep(10) result = self.rpc.call_instance_migrate(source_node, instance, self.nodes_ip[target_node], - self.op.live) + self.live) msg = result.fail_msg if msg: logging.error("Instance migration failed, trying to revert" @@ -4154,6 +4910,8 @@ class LUMigrateInstance(LogicalUnit): """Perform the migration. """ + feedback_fn("Migrating instance %s" % self.instance.name) + self.feedback_fn = feedback_fn self.source_node = self.instance.primary_node @@ -4163,7 +4921,8 @@ class LUMigrateInstance(LogicalUnit): self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip, self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip, } - if self.op.cleanup: + + if self.cleanup: return self._ExecCleanup() else: return self._ExecMigration() @@ -4348,7 +5107,7 @@ def _GetInstanceInfoText(instance): return "originstname+%s" % instance.name -def _CreateDisks(lu, instance): +def _CreateDisks(lu, instance, to_skip=None, target_node=None): """Create all disks for an instance. This abstracts away some work from AddInstance. @@ -4357,12 +5116,21 @@ def _CreateDisks(lu, instance): @param lu: the logical unit on whose behalf we execute @type instance: L{objects.Instance} @param instance: the instance whose disks we should create + @type to_skip: list + @param to_skip: list of indices to skip + @type target_node: string + @param target_node: if passed, overrides the target node for creation @rtype: boolean @return: the success of the creation """ info = _GetInstanceInfoText(instance) - pnode = instance.primary_node + if target_node is None: + pnode = instance.primary_node + all_nodes = instance.all_nodes + else: + pnode = target_node + all_nodes = [pnode] if instance.disk_template == constants.DT_FILE: file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) @@ -4373,16 +5141,18 @@ def _CreateDisks(lu, instance): # Note: this needs to be kept in sync with adding of disks in # LUSetInstanceParams - for device in instance.disks: + for idx, device in enumerate(instance.disks): + if to_skip and idx in to_skip: + continue logging.info("Creating volume %s for instance %s", device.iv_name, instance.name) #HARDCODE - for node in instance.all_nodes: + for node in all_nodes: f_create = node == pnode _CreateBlockDev(lu, node, instance, device, f_create, info, f_create) -def _RemoveDisks(lu, instance): +def _RemoveDisks(lu, instance, target_node=None): """Remove all disks for an instance. This abstracts away some work from `AddInstance()` and @@ -4394,6 +5164,8 @@ def _RemoveDisks(lu, instance): @param lu: the logical unit on whose behalf we execute @type instance: L{objects.Instance} @param instance: the instance whose disks we should remove + @type target_node: string + @param target_node: used to override the node on which to remove the disks @rtype: boolean @return: the success of the removal @@ -4402,7 +5174,11 @@ def _RemoveDisks(lu, instance): all_result = True for device in instance.disks: - for node, disk in device.ComputeNodeTree(instance.primary_node): + if target_node: + edata = [(target_node, device)] + else: + edata = device.ComputeNodeTree(instance.primary_node) + for node, disk in edata: lu.cfg.SetDiskID(disk, node) msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg if msg: @@ -4412,12 +5188,14 @@ def _RemoveDisks(lu, instance): if instance.disk_template == constants.DT_FILE: file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) - result = lu.rpc.call_file_storage_dir_remove(instance.primary_node, - file_storage_dir) - msg = result.fail_msg - if msg: + if target_node is node: + tgt = instance.primary_node + else: + tgt = instance.target_node + result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir) + if result.fail_msg: lu.LogWarning("Could not remove directory '%s' on node %s: %s", - file_storage_dir, instance.primary_node, msg) + file_storage_dir, instance.primary_node, result.fail_msg) all_result = False return all_result @@ -4685,7 +5463,7 @@ class LUCreateInstance(LogicalUnit): """ nics = [n.ToDict() for n in self.nics] - ial = IAllocator(self, + ial = IAllocator(self.cfg, self.rpc, mode=constants.IALLOCATOR_MODE_ALLOC, name=self.op.instance_name, disk_template=self.op.disk_template, @@ -5120,43 +5898,40 @@ class LUReplaceDisks(LogicalUnit): if not hasattr(self.op, "iallocator"): self.op.iallocator = None - # check for valid parameter combination - cnt = [self.op.remote_node, self.op.iallocator].count(None) - if self.op.mode == constants.REPLACE_DISK_CHG: - if cnt == 2: - raise errors.OpPrereqError("When changing the secondary either an" - " iallocator script must be used or the" - " new node given") - elif cnt == 0: - raise errors.OpPrereqError("Give either the iallocator or the new" - " secondary, not both") - else: # not replacing the secondary - if cnt != 2: - raise errors.OpPrereqError("The iallocator and new node options can" - " be used only when changing the" - " secondary node") + TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node, + self.op.iallocator) def ExpandNames(self): self._ExpandAndLockInstance() if self.op.iallocator is not None: self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + elif self.op.remote_node is not None: remote_node = self.cfg.ExpandNodeName(self.op.remote_node) if remote_node is None: raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node) + self.op.remote_node = remote_node + # Warning: do not remove the locking of the new secondary here # unless DRBD8.AddChildren is changed to work in parallel; # currently it doesn't since parallel invocations of # FindUnusedMinor will conflict self.needed_locks[locking.LEVEL_NODE] = [remote_node] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + else: self.needed_locks[locking.LEVEL_NODE] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode, + self.op.iallocator, self.op.remote_node, + self.op.disks) + + self.tasklets = [self.replacer] + def DeclareLocks(self, level): # If we're not already locking all nodes in the set we have to declare the # instance's primary/secondary nodes. @@ -5164,213 +5939,503 @@ class LUReplaceDisks(LogicalUnit): self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET): self._LockInstancesNodes() - def _RunAllocator(self): - """Compute a new secondary node using an IAllocator. - - """ - ial = IAllocator(self, - mode=constants.IALLOCATOR_MODE_RELOC, - name=self.op.instance_name, - relocate_from=[self.sec_node]) - - ial.Run(self.op.iallocator) - - if not ial.success: - raise errors.OpPrereqError("Can't compute nodes using" - " iallocator '%s': %s" % (self.op.iallocator, - ial.info)) - if len(ial.nodes) != ial.required_nodes: - raise errors.OpPrereqError("iallocator '%s' returned invalid number" - " of nodes (%s), required %s" % - (len(ial.nodes), ial.required_nodes)) - self.op.remote_node = ial.nodes[0] - self.LogInfo("Selected new secondary for the instance: %s", - self.op.remote_node) - def BuildHooksEnv(self): """Build hooks env. This runs on the master, the primary and all the secondaries. """ + instance = self.replacer.instance env = { "MODE": self.op.mode, "NEW_SECONDARY": self.op.remote_node, - "OLD_SECONDARY": self.instance.secondary_nodes[0], + "OLD_SECONDARY": instance.secondary_nodes[0], } - env.update(_BuildInstanceHookEnvByObject(self, self.instance)) + env.update(_BuildInstanceHookEnvByObject(self, instance)) nl = [ self.cfg.GetMasterNode(), - self.instance.primary_node, + instance.primary_node, ] if self.op.remote_node is not None: nl.append(self.op.remote_node) return env, nl, nl + +class LUEvacuateNode(LogicalUnit): + """Relocate the secondary instances from a node. + + """ + HPATH = "node-evacuate" + HTYPE = constants.HTYPE_NODE + _OP_REQP = ["node_name"] + REQ_BGL = False + + def CheckArguments(self): + if not hasattr(self.op, "remote_node"): + self.op.remote_node = None + if not hasattr(self.op, "iallocator"): + self.op.iallocator = None + + TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG, + self.op.remote_node, + self.op.iallocator) + + def ExpandNames(self): + self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name) + if self.op.node_name is None: + raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name) + + self.needed_locks = {} + + # Declare node locks + if self.op.iallocator is not None: + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + + elif self.op.remote_node is not None: + remote_node = self.cfg.ExpandNodeName(self.op.remote_node) + if remote_node is None: + raise errors.OpPrereqError("Node '%s' not known" % + self.op.remote_node) + + self.op.remote_node = remote_node + + # Warning: do not remove the locking of the new secondary here + # unless DRBD8.AddChildren is changed to work in parallel; + # currently it doesn't since parallel invocations of + # FindUnusedMinor will conflict + self.needed_locks[locking.LEVEL_NODE] = [remote_node] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + + else: + raise errors.OpPrereqError("Invalid parameters") + + # Create tasklets for replacing disks for all secondary instances on this + # node + names = [] + tasklets = [] + + for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name): + logging.debug("Replacing disks for instance %s", inst.name) + names.append(inst.name) + + replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG, + self.op.iallocator, self.op.remote_node, []) + tasklets.append(replacer) + + self.tasklets = tasklets + self.instance_names = names + + # Declare instance locks + self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names + + def DeclareLocks(self, level): + # If we're not already locking all nodes in the set we have to declare the + # instance's primary/secondary nodes. + if (level == locking.LEVEL_NODE and + self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET): + self._LockInstancesNodes() + + def BuildHooksEnv(self): + """Build hooks env. + + This runs on the master, the primary and all the secondaries. + + """ + env = { + "NODE_NAME": self.op.node_name, + } + + nl = [self.cfg.GetMasterNode()] + + if self.op.remote_node is not None: + env["NEW_SECONDARY"] = self.op.remote_node + nl.append(self.op.remote_node) + + return (env, nl, nl) + + +class TLReplaceDisks(Tasklet): + """Replaces disks for an instance. + + Note: Locking is not within the scope of this class. + + """ + def __init__(self, lu, instance_name, mode, iallocator_name, remote_node, + disks): + """Initializes this class. + + """ + Tasklet.__init__(self, lu) + + # Parameters + self.instance_name = instance_name + self.mode = mode + self.iallocator_name = iallocator_name + self.remote_node = remote_node + self.disks = disks + + # Runtime data + self.instance = None + self.new_node = None + self.target_node = None + self.other_node = None + self.remote_node_info = None + self.node_secondary_ip = None + + @staticmethod + def CheckArguments(mode, remote_node, iallocator): + """Helper function for users of this class. + + """ + # check for valid parameter combination + if mode == constants.REPLACE_DISK_CHG: + if remote_node is None and iallocator is None: + raise errors.OpPrereqError("When changing the secondary either an" + " iallocator script must be used or the" + " new node given") + + if remote_node is not None and iallocator is not None: + raise errors.OpPrereqError("Give either the iallocator or the new" + " secondary, not both") + + elif remote_node is not None or iallocator is not None: + # Not replacing the secondary + raise errors.OpPrereqError("The iallocator and new node options can" + " only be used when changing the" + " secondary node") + + @staticmethod + def _RunAllocator(lu, iallocator_name, instance_name, relocate_from): + """Compute a new secondary node using an IAllocator. + + """ + ial = IAllocator(lu.cfg, lu.rpc, + mode=constants.IALLOCATOR_MODE_RELOC, + name=instance_name, + relocate_from=relocate_from) + + ial.Run(iallocator_name) + + if not ial.success: + raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':" + " %s" % (iallocator_name, ial.info)) + + if len(ial.nodes) != ial.required_nodes: + raise errors.OpPrereqError("iallocator '%s' returned invalid number" + " of nodes (%s), required %s" % + (len(ial.nodes), ial.required_nodes)) + + remote_node_name = ial.nodes[0] + + lu.LogInfo("Selected new secondary for instance '%s': %s", + instance_name, remote_node_name) + + return remote_node_name + + def _FindFaultyDisks(self, node_name): + return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance, + node_name, True) + def CheckPrereq(self): """Check prerequisites. This checks that the instance is in the cluster. """ - instance = self.cfg.GetInstanceInfo(self.op.instance_name) - assert instance is not None, \ - "Cannot retrieve locked instance %s" % self.op.instance_name - self.instance = instance + self.instance = self.cfg.GetInstanceInfo(self.instance_name) + assert self.instance is not None, \ + "Cannot retrieve locked instance %s" % self.instance_name - if instance.disk_template != constants.DT_DRBD8: + if self.instance.disk_template != constants.DT_DRBD8: raise errors.OpPrereqError("Can only run replace disks for DRBD8-based" " instances") - if len(instance.secondary_nodes) != 1: + if len(self.instance.secondary_nodes) != 1: raise errors.OpPrereqError("The instance has a strange layout," " expected one secondary but found %d" % - len(instance.secondary_nodes)) + len(self.instance.secondary_nodes)) - self.sec_node = instance.secondary_nodes[0] + secondary_node = self.instance.secondary_nodes[0] - if self.op.iallocator is not None: - self._RunAllocator() + if self.iallocator_name is None: + remote_node = self.remote_node + else: + remote_node = self._RunAllocator(self.lu, self.iallocator_name, + self.instance.name, secondary_node) - remote_node = self.op.remote_node if remote_node is not None: self.remote_node_info = self.cfg.GetNodeInfo(remote_node) assert self.remote_node_info is not None, \ "Cannot retrieve locked node %s" % remote_node else: self.remote_node_info = None - if remote_node == instance.primary_node: + + if remote_node == self.instance.primary_node: raise errors.OpPrereqError("The specified node is the primary node of" " the instance.") - elif remote_node == self.sec_node: + + if remote_node == secondary_node: raise errors.OpPrereqError("The specified node is already the" " secondary node of the instance.") - if self.op.mode == constants.REPLACE_DISK_PRI: - n1 = self.tgt_node = instance.primary_node - n2 = self.oth_node = self.sec_node - elif self.op.mode == constants.REPLACE_DISK_SEC: - n1 = self.tgt_node = self.sec_node - n2 = self.oth_node = instance.primary_node - elif self.op.mode == constants.REPLACE_DISK_CHG: - n1 = self.new_node = remote_node - n2 = self.oth_node = instance.primary_node - self.tgt_node = self.sec_node - _CheckNodeNotDrained(self, remote_node) + if self.disks and self.mode in (constants.REPLACE_DISK_AUTO, + constants.REPLACE_DISK_CHG): + raise errors.OpPrereqError("Cannot specify disks to be replaced") + + if self.mode == constants.REPLACE_DISK_AUTO: + faulty_primary = self._FindFaultyDisks(self.instance.primary_node) + faulty_secondary = self._FindFaultyDisks(secondary_node) + + if faulty_primary and faulty_secondary: + raise errors.OpPrereqError("Instance %s has faulty disks on more than" + " one node and can not be repaired" + " automatically" % self.instance_name) + + if faulty_primary: + self.disks = faulty_primary + self.target_node = self.instance.primary_node + self.other_node = secondary_node + check_nodes = [self.target_node, self.other_node] + elif faulty_secondary: + self.disks = faulty_secondary + self.target_node = secondary_node + self.other_node = self.instance.primary_node + check_nodes = [self.target_node, self.other_node] + else: + self.disks = [] + check_nodes = [] + else: - raise errors.ProgrammerError("Unhandled disk replace mode") + # Non-automatic modes + if self.mode == constants.REPLACE_DISK_PRI: + self.target_node = self.instance.primary_node + self.other_node = secondary_node + check_nodes = [self.target_node, self.other_node] - _CheckNodeOnline(self, n1) - _CheckNodeOnline(self, n2) + elif self.mode == constants.REPLACE_DISK_SEC: + self.target_node = secondary_node + self.other_node = self.instance.primary_node + check_nodes = [self.target_node, self.other_node] - if not self.op.disks: - self.op.disks = range(len(instance.disks)) + elif self.mode == constants.REPLACE_DISK_CHG: + self.new_node = remote_node + self.other_node = self.instance.primary_node + self.target_node = secondary_node + check_nodes = [self.new_node, self.other_node] - for disk_idx in self.op.disks: - instance.FindDisk(disk_idx) + _CheckNodeNotDrained(self.lu, remote_node) - def _ExecD8DiskOnly(self, feedback_fn): - """Replace a disk on the primary or secondary for dbrd8. + else: + raise errors.ProgrammerError("Unhandled disk replace mode (%s)" % + self.mode) - The algorithm for replace is quite complicated: + # If not specified all disks should be replaced + if not self.disks: + self.disks = range(len(self.instance.disks)) - 1. for each disk to be replaced: + for node in check_nodes: + _CheckNodeOnline(self.lu, node) - 1. create new LVs on the target node with unique names - 1. detach old LVs from the drbd device - 1. rename old LVs to name_replaced. - 1. rename new LVs to old LVs - 1. attach the new LVs (with the old names now) to the drbd device + # Check whether disks are valid + for disk_idx in self.disks: + self.instance.FindDisk(disk_idx) - 1. wait for sync across all devices + # Get secondary node IP addresses + node_2nd_ip = {} - 1. for each modified disk: + for node_name in [self.target_node, self.other_node, self.new_node]: + if node_name is not None: + node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip - 1. remove old LVs (which have the name name_replaces.) + self.node_secondary_ip = node_2nd_ip - Failures are not very well handled. + def Exec(self, feedback_fn): + """Execute disk replacement. + + This dispatches the disk replacement to the appropriate handler. """ - steps_total = 6 - warning, info = (self.proc.LogWarning, self.proc.LogInfo) - instance = self.instance - iv_names = {} + if not self.disks: + feedback_fn("No disks need replacement") + return + + feedback_fn("Replacing disk(s) %s for %s" % + (", ".join([str(i) for i in self.disks]), self.instance.name)) + + activate_disks = (not self.instance.admin_up) + + # Activate the instance disks if we're replacing them on a down instance + if activate_disks: + _StartInstanceDisks(self.lu, self.instance, True) + + try: + # Should we replace the secondary node? + if self.new_node is not None: + return self._ExecDrbd8Secondary() + else: + return self._ExecDrbd8DiskOnly() + + finally: + # Deactivate the instance disks if we're replacing them on a down instance + if activate_disks: + _SafeShutdownInstanceDisks(self.lu, self.instance) + + def _CheckVolumeGroup(self, nodes): + self.lu.LogInfo("Checking volume groups") + vgname = self.cfg.GetVGName() - # start of work - cfg = self.cfg - tgt_node = self.tgt_node - oth_node = self.oth_node - # Step: check device activation - self.proc.LogStep(1, steps_total, "check device existence") - info("checking volume groups") - my_vg = cfg.GetVGName() - results = self.rpc.call_vg_list([oth_node, tgt_node]) + # Make sure volume group exists on all involved nodes + results = self.rpc.call_vg_list(nodes) if not results: raise errors.OpExecError("Can't list volume groups on the nodes") - for node in oth_node, tgt_node: + + for node in nodes: res = results[node] res.Raise("Error checking node %s" % node) - if my_vg not in res.payload: - raise errors.OpExecError("Volume group '%s' not found on %s" % - (my_vg, node)) - for idx, dev in enumerate(instance.disks): - if idx not in self.op.disks: + if vgname not in res.payload: + raise errors.OpExecError("Volume group '%s' not found on node %s" % + (vgname, node)) + + def _CheckDisksExistence(self, nodes): + # Check disk existence + for idx, dev in enumerate(self.instance.disks): + if idx not in self.disks: continue - for node in tgt_node, oth_node: - info("checking disk/%d on %s" % (idx, node)) - cfg.SetDiskID(dev, node) + + for node in nodes: + self.lu.LogInfo("Checking disk/%d on %s" % (idx, node)) + self.cfg.SetDiskID(dev, node) + result = self.rpc.call_blockdev_find(node, dev) + msg = result.fail_msg - if not msg and not result.payload: - msg = "disk not found" - if msg: + if msg or not result.payload: + if not msg: + msg = "disk not found" raise errors.OpExecError("Can't find disk/%d on node %s: %s" % (idx, node, msg)) - # Step: check other node consistency - self.proc.LogStep(2, steps_total, "check peer consistency") - for idx, dev in enumerate(instance.disks): - if idx not in self.op.disks: + def _CheckDisksConsistency(self, node_name, on_primary, ldisk): + for idx, dev in enumerate(self.instance.disks): + if idx not in self.disks: continue - info("checking disk/%d consistency on %s" % (idx, oth_node)) - if not _CheckDiskConsistency(self, dev, oth_node, - oth_node==instance.primary_node): - raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe" - " to replace disks on this node (%s)" % - (oth_node, tgt_node)) - # Step: create new storage - self.proc.LogStep(3, steps_total, "allocate new storage") - for idx, dev in enumerate(instance.disks): - if idx not in self.op.disks: + self.lu.LogInfo("Checking disk/%d consistency on node %s" % + (idx, node_name)) + + if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary, + ldisk=ldisk): + raise errors.OpExecError("Node %s has degraded storage, unsafe to" + " replace disks for instance %s" % + (node_name, self.instance.name)) + + def _CreateNewStorage(self, node_name): + vgname = self.cfg.GetVGName() + iv_names = {} + + for idx, dev in enumerate(self.instance.disks): + if idx not in self.disks: continue - size = dev.size - cfg.SetDiskID(dev, tgt_node) - lv_names = [".disk%d_%s" % (idx, suf) - for suf in ["data", "meta"]] - names = _GenerateUniqueNames(self, lv_names) - lv_data = objects.Disk(dev_type=constants.LD_LV, size=size, + + self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx)) + + self.cfg.SetDiskID(dev, node_name) + + lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]] + names = _GenerateUniqueNames(self.lu, lv_names) + + lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size, logical_id=(vgname, names[0])) lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128, logical_id=(vgname, names[1])) + new_lvs = [lv_data, lv_meta] old_lvs = dev.children iv_names[dev.iv_name] = (dev, old_lvs, new_lvs) - info("creating new local storage on %s for %s" % - (tgt_node, dev.iv_name)) + # we pass force_create=True to force the LVM creation for new_lv in new_lvs: - _CreateBlockDev(self, tgt_node, instance, new_lv, True, - _GetInstanceInfoText(instance), False) + _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True, + _GetInstanceInfoText(self.instance), False) + + return iv_names + + def _CheckDevices(self, node_name, iv_names): + for name, (dev, old_lvs, new_lvs) in iv_names.iteritems(): + self.cfg.SetDiskID(dev, node_name) + + result = self.rpc.call_blockdev_find(node_name, dev) + + msg = result.fail_msg + if msg or not result.payload: + if not msg: + msg = "disk not found" + raise errors.OpExecError("Can't find DRBD device %s: %s" % + (name, msg)) + + if result.payload.is_degraded: + raise errors.OpExecError("DRBD device %s is degraded!" % name) + + def _RemoveOldStorage(self, node_name, iv_names): + for name, (dev, old_lvs, _) in iv_names.iteritems(): + self.lu.LogInfo("Remove logical volumes for %s" % name) + + for lv in old_lvs: + self.cfg.SetDiskID(lv, node_name) + + msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg + if msg: + self.lu.LogWarning("Can't remove old LV: %s" % msg, + hint="remove unused LVs manually") + + def _ExecDrbd8DiskOnly(self): + """Replace a disk on the primary or secondary for DRBD 8. + + The algorithm for replace is quite complicated: + + 1. for each disk to be replaced: + + 1. create new LVs on the target node with unique names + 1. detach old LVs from the drbd device + 1. rename old LVs to name_replaced. + 1. rename new LVs to old LVs + 1. attach the new LVs (with the old names now) to the drbd device + + 1. wait for sync across all devices + + 1. for each modified disk: + + 1. remove old LVs (which have the name name_replaces.) + + Failures are not very well handled. + + """ + steps_total = 6 + + # Step: check device activation + self.lu.LogStep(1, steps_total, "Check device existence") + self._CheckDisksExistence([self.other_node, self.target_node]) + self._CheckVolumeGroup([self.target_node, self.other_node]) + + # Step: check other node consistency + self.lu.LogStep(2, steps_total, "Check peer consistency") + self._CheckDisksConsistency(self.other_node, + self.other_node == self.instance.primary_node, + False) + + # Step: create new storage + self.lu.LogStep(3, steps_total, "Allocate new storage") + iv_names = self._CreateNewStorage(self.target_node) # Step: for each lv, detach+rename*2+attach - self.proc.LogStep(4, steps_total, "change drbd configuration") + self.lu.LogStep(4, steps_total, "Changing drbd configuration") for dev, old_lvs, new_lvs in iv_names.itervalues(): - info("detaching %s drbd from local storage" % dev.iv_name) - result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs) + self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name) + + result = self.rpc.call_blockdev_removechildren(self.target_node, dev, old_lvs) result.Raise("Can't detach drbd from local storage on node" - " %s for device %s" % (tgt_node, dev.iv_name)) + " %s for device %s" % (self.target_node, dev.iv_name)) #dev.children = [] #cfg.Update(instance) @@ -5384,81 +6449,66 @@ class LUReplaceDisks(LogicalUnit): temp_suffix = int(time.time()) ren_fn = lambda d, suff: (d.physical_id[0], d.physical_id[1] + "_replaced-%s" % suff) - # build the rename list based on what LVs exist on the node - rlist = [] + + # Build the rename list based on what LVs exist on the node + rename_old_to_new = [] for to_ren in old_lvs: - result = self.rpc.call_blockdev_find(tgt_node, to_ren) + result = self.rpc.call_blockdev_find(self.target_node, to_ren) if not result.fail_msg and result.payload: # device exists - rlist.append((to_ren, ren_fn(to_ren, temp_suffix))) + rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix))) + + self.lu.LogInfo("Renaming the old LVs on the target node") + result = self.rpc.call_blockdev_rename(self.target_node, rename_old_to_new) + result.Raise("Can't rename old LVs on node %s" % self.target_node) - info("renaming the old LVs on the target node") - result = self.rpc.call_blockdev_rename(tgt_node, rlist) - result.Raise("Can't rename old LVs on node %s" % tgt_node) - # now we rename the new LVs to the old LVs - info("renaming the new LVs on the target node") - rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)] - result = self.rpc.call_blockdev_rename(tgt_node, rlist) - result.Raise("Can't rename new LVs on node %s" % tgt_node) + # Now we rename the new LVs to the old LVs + self.lu.LogInfo("Renaming the new LVs on the target node") + rename_new_to_old = [(new, old.physical_id) + for old, new in zip(old_lvs, new_lvs)] + result = self.rpc.call_blockdev_rename(self.target_node, rename_new_to_old) + result.Raise("Can't rename new LVs on node %s" % self.target_node) for old, new in zip(old_lvs, new_lvs): new.logical_id = old.logical_id - cfg.SetDiskID(new, tgt_node) + self.cfg.SetDiskID(new, self.target_node) for disk in old_lvs: disk.logical_id = ren_fn(disk, temp_suffix) - cfg.SetDiskID(disk, tgt_node) + self.cfg.SetDiskID(disk, self.target_node) - # now that the new lvs have the old name, we can add them to the device - info("adding new mirror component on %s" % tgt_node) - result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs) + # Now that the new lvs have the old name, we can add them to the device + self.lu.LogInfo("Adding new mirror component on %s" % self.target_node) + result = self.rpc.call_blockdev_addchildren(self.target_node, dev, new_lvs) msg = result.fail_msg if msg: for new_lv in new_lvs: - msg2 = self.rpc.call_blockdev_remove(tgt_node, new_lv).fail_msg + msg2 = self.rpc.call_blockdev_remove(self.target_node, new_lv).fail_msg if msg2: - warning("Can't rollback device %s: %s", dev, msg2, - hint="cleanup manually the unused logical volumes") + self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2, + hint=("cleanup manually the unused logical" + "volumes")) raise errors.OpExecError("Can't add local storage to drbd: %s" % msg) dev.children = new_lvs - cfg.Update(instance) - # Step: wait for sync + self.cfg.Update(self.instance) - # this can fail as the old devices are degraded and _WaitForSync - # does a combined result over all disks, so we don't check its - # return value - self.proc.LogStep(5, steps_total, "sync devices") - _WaitForSync(self, instance, unlock=True) + # Wait for sync + # This can fail as the old devices are degraded and _WaitForSync + # does a combined result over all disks, so we don't check its return value + self.lu.LogStep(5, steps_total, "Sync devices") + _WaitForSync(self.lu, self.instance, unlock=True) - # so check manually all the devices - for name, (dev, old_lvs, new_lvs) in iv_names.iteritems(): - cfg.SetDiskID(dev, instance.primary_node) - result = self.rpc.call_blockdev_find(instance.primary_node, dev) - msg = result.fail_msg - if not msg and not result.payload: - msg = "disk not found" - if msg: - raise errors.OpExecError("Can't find DRBD device %s: %s" % - (name, msg)) - if result.payload[5]: - raise errors.OpExecError("DRBD device %s is degraded!" % name) + # Check all devices manually + self._CheckDevices(self.instance.primary_node, iv_names) # Step: remove old storage - self.proc.LogStep(6, steps_total, "removing old storage") - for name, (dev, old_lvs, new_lvs) in iv_names.iteritems(): - info("remove logical volumes for %s" % name) - for lv in old_lvs: - cfg.SetDiskID(lv, tgt_node) - msg = self.rpc.call_blockdev_remove(tgt_node, lv).fail_msg - if msg: - warning("Can't remove old LV: %s" % msg, - hint="manually remove unused LVs") - continue + self.lu.LogStep(6, steps_total, "Removing old storage") + self._RemoveOldStorage(self.target_node, iv_names) - def _ExecD8Secondary(self, feedback_fn): - """Replace the secondary node for drbd8. + def _ExecDrbd8Secondary(self): + """Replace the secondary node for DRBD 8. The algorithm for replace is quite complicated: - for all disks of the instance: @@ -5477,86 +6527,49 @@ class LUReplaceDisks(LogicalUnit): """ steps_total = 6 - warning, info = (self.proc.LogWarning, self.proc.LogInfo) - instance = self.instance - iv_names = {} - # start of work - cfg = self.cfg - old_node = self.tgt_node - new_node = self.new_node - pri_node = instance.primary_node - nodes_ip = { - old_node: self.cfg.GetNodeInfo(old_node).secondary_ip, - new_node: self.cfg.GetNodeInfo(new_node).secondary_ip, - pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip, - } # Step: check device activation - self.proc.LogStep(1, steps_total, "check device existence") - info("checking volume groups") - my_vg = cfg.GetVGName() - results = self.rpc.call_vg_list([pri_node, new_node]) - for node in pri_node, new_node: - res = results[node] - res.Raise("Error checking node %s" % node) - if my_vg not in res.payload: - raise errors.OpExecError("Volume group '%s' not found on %s" % - (my_vg, node)) - for idx, dev in enumerate(instance.disks): - if idx not in self.op.disks: - continue - info("checking disk/%d on %s" % (idx, pri_node)) - cfg.SetDiskID(dev, pri_node) - result = self.rpc.call_blockdev_find(pri_node, dev) - msg = result.fail_msg - if not msg and not result.payload: - msg = "disk not found" - if msg: - raise errors.OpExecError("Can't find disk/%d on node %s: %s" % - (idx, pri_node, msg)) + self.lu.LogStep(1, steps_total, "Check device existence") + self._CheckDisksExistence([self.instance.primary_node]) + self._CheckVolumeGroup([self.instance.primary_node]) # Step: check other node consistency - self.proc.LogStep(2, steps_total, "check peer consistency") - for idx, dev in enumerate(instance.disks): - if idx not in self.op.disks: - continue - info("checking disk/%d consistency on %s" % (idx, pri_node)) - if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True): - raise errors.OpExecError("Primary node (%s) has degraded storage," - " unsafe to replace the secondary" % - pri_node) + self.lu.LogStep(2, steps_total, "Check peer consistency") + self._CheckDisksConsistency(self.instance.primary_node, True, True) # Step: create new storage - self.proc.LogStep(3, steps_total, "allocate new storage") - for idx, dev in enumerate(instance.disks): - info("adding new local storage on %s for disk/%d" % - (new_node, idx)) + self.lu.LogStep(3, steps_total, "Allocate new storage") + for idx, dev in enumerate(self.instance.disks): + self.lu.LogInfo("Adding new local storage on %s for disk/%d" % + (self.new_node, idx)) # we pass force_create=True to force LVM creation for new_lv in dev.children: - _CreateBlockDev(self, new_node, instance, new_lv, True, - _GetInstanceInfoText(instance), False) + _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True, + _GetInstanceInfoText(self.instance), False) # Step 4: dbrd minors and drbd setups changes # after this, we must manually remove the drbd minors on both the # error and the success paths - minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks], - instance.name) - logging.debug("Allocated minors %s" % (minors,)) - self.proc.LogStep(4, steps_total, "changing drbd configuration") - for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)): - info("activating a new drbd on %s for disk/%d" % (new_node, idx)) + self.lu.LogStep(4, steps_total, "Changing drbd configuration") + minors = self.cfg.AllocateDRBDMinor([self.new_node for dev in self.instance.disks], + self.instance.name) + logging.debug("Allocated minors %r" % (minors,)) + + iv_names = {} + for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)): + self.lu.LogInfo("activating a new drbd on %s for disk/%d" % (self.new_node, idx)) # create new devices on new_node; note that we create two IDs: # one without port, so the drbd will be activated without # networking information on the new node at this stage, and one # with network, for the latter activation in step 4 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id - if pri_node == o_node1: + if self.instance.primary_node == o_node1: p_minor = o_minor1 else: p_minor = o_minor2 - new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret) - new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret) + new_alone_id = (self.instance.primary_node, self.new_node, None, p_minor, new_minor, o_secret) + new_net_id = (self.instance.primary_node, self.new_node, o_port, p_minor, new_minor, o_secret) iv_names[idx] = (dev, dev.children, new_net_id) logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor, @@ -5566,106 +6579,124 @@ class LUReplaceDisks(LogicalUnit): children=dev.children, size=dev.size) try: - _CreateSingleBlockDev(self, new_node, instance, new_drbd, - _GetInstanceInfoText(instance), False) + _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd, + _GetInstanceInfoText(self.instance), False) except errors.GenericError: - self.cfg.ReleaseDRBDMinors(instance.name) + self.cfg.ReleaseDRBDMinors(self.instance.name) raise - for idx, dev in enumerate(instance.disks): - # we have new devices, shutdown the drbd on the old secondary - info("shutting down drbd for disk/%d on old node" % idx) - cfg.SetDiskID(dev, old_node) - msg = self.rpc.call_blockdev_shutdown(old_node, dev).fail_msg + # We have new devices, shutdown the drbd on the old secondary + for idx, dev in enumerate(self.instance.disks): + self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx) + self.cfg.SetDiskID(dev, self.target_node) + msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg if msg: - warning("Failed to shutdown drbd for disk/%d on old node: %s" % - (idx, msg), - hint="Please cleanup this device manually as soon as possible") + self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old" + "node: %s" % (idx, msg), + hint=("Please cleanup this device manually as" + " soon as possible")) - info("detaching primary drbds from the network (=> standalone)") - result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip, - instance.disks)[pri_node] + self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)") + result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node], self.node_secondary_ip, + self.instance.disks)[self.instance.primary_node] msg = result.fail_msg if msg: # detaches didn't succeed (unlikely) - self.cfg.ReleaseDRBDMinors(instance.name) + self.cfg.ReleaseDRBDMinors(self.instance.name) raise errors.OpExecError("Can't detach the disks from the network on" " old node: %s" % (msg,)) # if we managed to detach at least one, we update all the disks of # the instance to point to the new secondary - info("updating instance configuration") + self.lu.LogInfo("Updating instance configuration") for dev, _, new_logical_id in iv_names.itervalues(): dev.logical_id = new_logical_id - cfg.SetDiskID(dev, pri_node) - cfg.Update(instance) + self.cfg.SetDiskID(dev, self.instance.primary_node) + + self.cfg.Update(self.instance) # and now perform the drbd attach - info("attaching primary drbds to new secondary (standalone => connected)") - result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip, - instance.disks, instance.name, + self.lu.LogInfo("Attaching primary drbds to new secondary" + " (standalone => connected)") + result = self.rpc.call_drbd_attach_net([self.instance.primary_node, self.new_node], self.node_secondary_ip, + self.instance.disks, self.instance.name, False) for to_node, to_result in result.items(): msg = to_result.fail_msg if msg: - warning("can't attach drbd disks on node %s: %s", to_node, msg, - hint="please do a gnt-instance info to see the" - " status of disks") - - # this can fail as the old devices are degraded and _WaitForSync - # does a combined result over all disks, so we don't check its - # return value - self.proc.LogStep(5, steps_total, "sync devices") - _WaitForSync(self, instance, unlock=True) - - # so check manually all the devices - for idx, (dev, old_lvs, _) in iv_names.iteritems(): - cfg.SetDiskID(dev, pri_node) - result = self.rpc.call_blockdev_find(pri_node, dev) - msg = result.fail_msg - if not msg and not result.payload: - msg = "disk not found" - if msg: - raise errors.OpExecError("Can't find DRBD device disk/%d: %s" % - (idx, msg)) - if result.payload[5]: - raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx) - - self.proc.LogStep(6, steps_total, "removing old storage") - for idx, (dev, old_lvs, _) in iv_names.iteritems(): - info("remove logical volumes for disk/%d" % idx) - for lv in old_lvs: - cfg.SetDiskID(lv, old_node) - msg = self.rpc.call_blockdev_remove(old_node, lv).fail_msg - if msg: - warning("Can't remove LV on old secondary: %s", msg, - hint="Cleanup stale volumes by hand") + self.lu.LogWarning("Can't attach drbd disks on node %s: %s", to_node, msg, + hint=("please do a gnt-instance info to see the" + " status of disks")) - def Exec(self, feedback_fn): - """Execute disk replacement. + # Wait for sync + # This can fail as the old devices are degraded and _WaitForSync + # does a combined result over all disks, so we don't check its return value + self.lu.LogStep(5, steps_total, "Sync devices") + _WaitForSync(self.lu, self.instance, unlock=True) - This dispatches the disk replacement to the appropriate handler. + # Check all devices manually + self._CheckDevices(self.instance.primary_node, iv_names) - """ - instance = self.instance + # Step: remove old storage + self.lu.LogStep(6, steps_total, "Removing old storage") + self._RemoveOldStorage(self.target_node, iv_names) - # Activate the instance disks if we're replacing them on a down instance - if not instance.admin_up: - _StartInstanceDisks(self, instance, True) - if self.op.mode == constants.REPLACE_DISK_CHG: - fn = self._ExecD8Secondary - else: - fn = self._ExecD8DiskOnly +class LURepairNodeStorage(NoHooksLU): + """Repairs the volume group on a node. - ret = fn(feedback_fn) + """ + _OP_REQP = ["node_name"] + REQ_BGL = False - # Deactivate the instance disks if we're replacing them on a down instance - if not instance.admin_up: - _SafeShutdownInstanceDisks(self, instance) + def CheckArguments(self): + node_name = self.cfg.ExpandNodeName(self.op.node_name) + if node_name is None: + raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name) + + self.op.node_name = node_name + + def ExpandNames(self): + self.needed_locks = { + locking.LEVEL_NODE: [self.op.node_name], + } - return ret + def _CheckFaultyDisks(self, instance, node_name): + if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance, + node_name, True): + raise errors.OpPrereqError("Instance '%s' has faulty disks on" + " node '%s'" % (inst.name, node_name)) + + def CheckPrereq(self): + """Check prerequisites. + + """ + storage_type = self.op.storage_type + + if (constants.SO_FIX_CONSISTENCY not in + constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])): + raise errors.OpPrereqError("Storage units of type '%s' can not be" + " repaired" % storage_type) + + # Check whether any instance on this node has faulty disks + for inst in _GetNodeInstances(self.cfg, self.op.node_name): + check_nodes = set(inst.all_nodes) + check_nodes.discard(self.op.node_name) + for inst_node_name in check_nodes: + self._CheckFaultyDisks(inst, inst_node_name) + + def Exec(self, feedback_fn): + feedback_fn("Repairing storage unit '%s' on %s ..." % + (self.op.name, self.op.node_name)) + + st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) + result = self.rpc.call_storage_execute(self.op.node_name, + self.op.storage_type, st_args, + self.op.name, + constants.SO_FIX_CONSISTENCY) + result.Raise("Failed to repair storage unit '%s' on %s" % + (self.op.name, self.op.node_name)) class LUGrowDisk(LogicalUnit): @@ -5804,22 +6835,33 @@ class LUQueryInstanceData(NoHooksLU): in self.wanted_names] return + def _ComputeBlockdevStatus(self, node, instance_name, dev): + """Returns the status of a block device + + """ + if self.op.static or not node: + return None + + self.cfg.SetDiskID(dev, node) + + result = self.rpc.call_blockdev_find(node, dev) + if result.offline: + return None + + result.Raise("Can't compute disk status for %s" % instance_name) + + status = result.payload + if status is None: + return None + + return (status.dev_path, status.major, status.minor, + status.sync_percent, status.estimated_time, + status.is_degraded, status.ldisk_status) + def _ComputeDiskStatus(self, instance, snode, dev): """Compute block device status. """ - static = self.op.static - if not static: - self.cfg.SetDiskID(dev, instance.primary_node) - dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev) - if dev_pstatus.offline: - dev_pstatus = None - else: - dev_pstatus.Raise("Can't compute disk status for %s" % instance.name) - dev_pstatus = dev_pstatus.payload - else: - dev_pstatus = None - if dev.dev_type in constants.LDS_DRBD: # we change the snode then (otherwise we use the one passed in) if dev.logical_id[0] == instance.primary_node: @@ -5827,16 +6869,9 @@ class LUQueryInstanceData(NoHooksLU): else: snode = dev.logical_id[0] - if snode and not static: - self.cfg.SetDiskID(dev, snode) - dev_sstatus = self.rpc.call_blockdev_find(snode, dev) - if dev_sstatus.offline: - dev_sstatus = None - else: - dev_sstatus.Raise("Can't compute disk status for %s" % instance.name) - dev_sstatus = dev_sstatus.payload - else: - dev_sstatus = None + dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node, + instance.name, dev) + dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev) if dev.children: dev_children = [self._ComputeDiskStatus(instance, snode, child) @@ -5901,6 +6936,9 @@ class LUQueryInstanceData(NoHooksLU): "hv_actual": cluster.FillHV(instance), "be_instance": instance.beparams, "be_actual": cluster.FillBE(instance), + "serial_no": instance.serial_no, + "mtime": instance.mtime, + "ctime": instance.ctime, } result[instance.name] = idict @@ -6548,6 +7586,8 @@ class LUExportInstance(LogicalUnit): for disk in instance.disks: self.cfg.SetDiskID(disk, src_node) + # per-disk results + dresults = [] try: for idx, disk in enumerate(instance.disks): # result.payload will be a snapshot of an lvm leaf of the one we passed @@ -6583,16 +7623,23 @@ class LUExportInstance(LogicalUnit): if msg: self.LogWarning("Could not export disk/%s from node %s to" " node %s: %s", idx, src_node, dst_node.name, msg) + dresults.append(False) + else: + dresults.append(True) msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg if msg: self.LogWarning("Could not remove snapshot for disk/%d from node" " %s: %s", idx, src_node, msg) + else: + dresults.append(False) result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks) + fin_resu = True msg = result.fail_msg if msg: self.LogWarning("Could not finalize export for instance %s" " on node %s: %s", instance.name, dst_node.name, msg) + fin_resu = False nodelist = self.cfg.GetNodeList() nodelist.remove(dst_node.name) @@ -6611,6 +7658,7 @@ class LUExportInstance(LogicalUnit): if msg: self.LogWarning("Could not remove older export for instance %s" " on node %s: %s", iname, node, msg) + return fin_resu, dresults class LURemoveExport(NoHooksLU): @@ -6895,8 +7943,9 @@ class IAllocator(object): "relocate_from", ] - def __init__(self, lu, mode, name, **kwargs): - self.lu = lu + def __init__(self, cfg, rpc, mode, name, **kwargs): + self.cfg = cfg + self.rpc = rpc # init buffer variables self.in_text = self.out_text = self.in_data = self.out_data = None # init all input fields so that pylint is happy @@ -6934,7 +7983,7 @@ class IAllocator(object): This is the data that is independent of the actual operation. """ - cfg = self.lu.cfg + cfg = self.cfg cluster_info = cfg.GetClusterInfo() # cluster data data = { @@ -6956,10 +8005,11 @@ class IAllocator(object): elif self.mode == constants.IALLOCATOR_MODE_RELOC: hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor - node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(), - hypervisor_name) - node_iinfo = self.lu.rpc.call_all_instances_info(node_list, - cluster_info.enabled_hypervisors) + node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(), + hypervisor_name) + node_iinfo = \ + self.rpc.call_all_instances_info(node_list, + cluster_info.enabled_hypervisors) for nname, nresult in node_data.items(): # first fill in static (config-based) values ninfo = cfg.GetNodeInfo(nname) @@ -6972,11 +8022,12 @@ class IAllocator(object): "master_candidate": ninfo.master_candidate, } - if not ninfo.offline: + if not (ninfo.offline or ninfo.drained): nresult.Raise("Can't get data for node %s" % nname) node_iinfo[nname].Raise("Can't get node instance info from node %s" % nname) remote_info = nresult.payload + for attr in ['memory_total', 'memory_free', 'memory_dom0', 'vg_size', 'vg_free', 'cpu_total']: if attr not in remote_info: @@ -7096,7 +8147,7 @@ class IAllocator(object): done. """ - instance = self.lu.cfg.GetInstanceInfo(self.name) + instance = self.cfg.GetInstanceInfo(self.name) if instance is None: raise errors.ProgrammerError("Unknown instance '%s' passed to" " IAllocator" % self.name) @@ -7138,9 +8189,9 @@ class IAllocator(object): """ if call_fn is None: - call_fn = self.lu.rpc.call_iallocator_runner + call_fn = self.rpc.call_iallocator_runner - result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text) + result = call_fn(self.cfg.GetMasterNode(), name, self.in_text) result.Raise("Failure while running the iallocator script") self.out_text = result.payload @@ -7244,7 +8295,7 @@ class LUTestAllocator(NoHooksLU): """ if self.op.mode == constants.IALLOCATOR_MODE_ALLOC: - ial = IAllocator(self, + ial = IAllocator(self.cfg, self.rpc, mode=self.op.mode, name=self.op.name, mem_size=self.op.mem_size, @@ -7257,7 +8308,7 @@ class LUTestAllocator(NoHooksLU): hypervisor=self.op.hypervisor, ) else: - ial = IAllocator(self, + ial = IAllocator(self.cfg, self.rpc, mode=self.op.mode, name=self.op.name, relocate_from=list(self.relocate_from),