import os
import os.path
import time
-import tempfile
import re
import platform
import logging
import copy
-import random
from ganeti import ssh
from ganeti import utils
from ganeti import locking
from ganeti import constants
from ganeti import objects
-from ganeti import opcodes
from ganeti import serializer
from ganeti import ssconf
Subclasses must follow these rules:
- implement ExpandNames
- - implement CheckPrereq
- - implement Exec
+ - implement CheckPrereq (except when tasklets are used)
+ - implement Exec (except when tasklets are used)
- implement BuildHooksEnv
- redefine HPATH and HTYPE
- optionally redefine their run requirements:
Note that all commands require root permissions.
+ @ivar dry_run_result: the value (if any) that will be returned to the caller
+ in dry-run mode (signalled by opcode dry_run parameter)
+
"""
HPATH = None
HTYPE = None
def __init__(self, processor, op, context, rpc):
"""Constructor for LogicalUnit.
- This needs to be overriden in derived classes in order to check op
+ This needs to be overridden in derived classes in order to check op
validity.
"""
# Dicts used to declare locking needs to mcpu
self.needed_locks = None
self.acquired_locks = {}
- self.share_locks = dict(((i, 0) for i in locking.LEVELS))
+ self.share_locks = dict.fromkeys(locking.LEVELS, 0)
self.add_locks = {}
self.remove_locks = {}
# Used to force good behavior when calling helper functions
# logging
self.LogWarning = processor.LogWarning
self.LogInfo = processor.LogInfo
+ self.LogStep = processor.LogStep
+ # support for dry-run
+ self.dry_run_result = None
+
+ # Tasklets
+ self.tasklets = None
for attr_name in self._OP_REQP:
attr_val = getattr(op, attr_name, None)
if attr_val is None:
raise errors.OpPrereqError("Required parameter '%s' missing" %
attr_name)
+
self.CheckArguments()
def __GetSSH(self):
CheckPrereq, doing these separate is better because:
- ExpandNames is left as as purely a lock-related function
- - CheckPrereq is run after we have aquired locks (and possible
+ - CheckPrereq is run after we have acquired locks (and possible
waited for them)
The function is allowed to change the self.op attribute so that
level you can modify self.share_locks, setting a true value (usually 1) for
that level. By default locks are not shared.
+ This function can also define a list of tasklets, which then will be
+ executed in order instead of the usual LU-level CheckPrereq and Exec
+ functions, if those are not defined by the LU.
+
Examples::
# Acquire all nodes and one instance
their canonical form if it hasn't been done by ExpandNames before.
"""
- raise NotImplementedError
+ if self.tasklets is not None:
+ for (idx, tl) in enumerate(self.tasklets):
+ logging.debug("Checking prerequisites for tasklet %s/%s",
+ idx + 1, len(self.tasklets))
+ tl.CheckPrereq()
+ else:
+ raise NotImplementedError
def Exec(self, feedback_fn):
"""Execute the LU.
code, or expected.
"""
- raise NotImplementedError
+ if self.tasklets is not None:
+ for (idx, tl) in enumerate(self.tasklets):
+ logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
+ tl.Exec(feedback_fn)
+ else:
+ raise NotImplementedError
def BuildHooksEnv(self):
"""Build hooks environment for this LU.
HTYPE = None
+class Tasklet:
+ """Tasklet base class.
+
+ Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
+ they can mix legacy code with tasklets. Locking needs to be done in the LU,
+ tasklets know nothing about locks.
+
+ Subclasses must follow these rules:
+ - Implement CheckPrereq
+ - Implement Exec
+
+ """
+ def __init__(self, lu):
+ self.lu = lu
+
+ # Shortcuts
+ self.cfg = lu.cfg
+ self.rpc = lu.rpc
+
+ def CheckPrereq(self):
+ """Check prerequisites for this tasklets.
+
+ This method should check whether the prerequisites for the execution of
+ this tasklet are fulfilled. It can do internode communication, but it
+ should be idempotent - no cluster or system changes are allowed.
+
+ The method should raise errors.OpPrereqError in case something is not
+ fulfilled. Its return value is ignored.
+
+ This method should also update all parameters to their canonical form if it
+ hasn't been done before.
+
+ """
+ raise NotImplementedError
+
+ def Exec(self, feedback_fn):
+ """Execute the tasklet.
+
+ This method should implement the actual work. It should raise
+ errors.OpExecError for failures that are somewhat dealt with in code, or
+ expected.
+
+ """
+ raise NotImplementedError
+
+
def _GetWantedNodes(lu, nodes):
"""Returns list of checked and expanded node names.
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
- memory, vcpus, nics, disk_template, disks):
+ memory, vcpus, nics, disk_template, disks,
+ bep, hvp, hypervisor_name):
"""Builds instance related env variables for hooks
This builds the hook environment from individual variables.
@type vcpus: string
@param vcpus: the count of VCPUs the instance has
@type nics: list
- @param nics: list of tuples (ip, bridge, mac) representing
- the NICs the instance has
+ @param nics: list of tuples (ip, mac, mode, link) representing
+ the NICs the instance has
@type disk_template: string
- @param disk_template: the distk template of the instance
+ @param disk_template: the disk template of the instance
@type disks: list
@param disks: the list of (size, mode) pairs
+ @type bep: dict
+ @param bep: the backend parameters for the instance
+ @type hvp: dict
+ @param hvp: the hypervisor parameters for the instance
+ @type hypervisor_name: string
+ @param hypervisor_name: the hypervisor for the instance
@rtype: dict
@return: the hook environment for this instance
"INSTANCE_MEMORY": memory,
"INSTANCE_VCPUS": vcpus,
"INSTANCE_DISK_TEMPLATE": disk_template,
+ "INSTANCE_HYPERVISOR": hypervisor_name,
}
if nics:
env["INSTANCE_DISK_COUNT"] = disk_count
+ for source, kind in [(bep, "BE"), (hvp, "HV")]:
+ for key, value in source.items():
+ env["INSTANCE_%s_%s" % (kind, key)] = value
+
return env
-def _PreBuildNICHooksList(lu, nics):
+
+def _NICListToTuple(lu, nics):
"""Build a list of nic information tuples.
- This list is suitable to be passed to _BuildInstanceHookEnv.
+ This list is suitable to be passed to _BuildInstanceHookEnv or as a return
+ value in LUQueryInstanceData.
@type lu: L{LogicalUnit}
@param lu: the logical unit on whose behalf we execute
hooks_nics.append((ip, mac, mode, link))
return hooks_nics
+
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
"""Builds instance related env variables for hooks from an object.
@return: the hook environment dictionary
"""
- bep = lu.cfg.GetClusterInfo().FillBE(instance)
+ cluster = lu.cfg.GetClusterInfo()
+ bep = cluster.FillBE(instance)
+ hvp = cluster.FillHV(instance)
args = {
'name': instance.name,
'primary_node': instance.primary_node,
'status': instance.admin_up,
'memory': bep[constants.BE_MEMORY],
'vcpus': bep[constants.BE_VCPUS],
- 'nics': _PreBuildNICHooksList(lu, instance.nics),
+ 'nics': _NICListToTuple(lu, instance.nics),
'disk_template': instance.disk_template,
'disks': [(disk.size, disk.mode) for disk in instance.disks],
+ 'bep': bep,
+ 'hvp': hvp,
+ 'hypervisor_name': instance.hypervisor,
}
if override:
args.update(override)
"""
if node is None:
- node=instance.primary_node
+ node = instance.primary_node
_CheckNicsBridgesExist(lu, instance.nics, node)
+def _GetNodeInstancesInner(cfg, fn):
+ return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
+
+
+def _GetNodeInstances(cfg, node_name):
+ """Returns a list of all primary and secondary instances on a node.
+
+ """
+
+ return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
+
+
+def _GetNodePrimaryInstances(cfg, node_name):
+ """Returns primary instances on a node.
+
+ """
+ return _GetNodeInstancesInner(cfg,
+ lambda inst: node_name == inst.primary_node)
+
+
+def _GetNodeSecondaryInstances(cfg, node_name):
+ """Returns secondary instances on a node.
+
+ """
+ return _GetNodeInstancesInner(cfg,
+ lambda inst: node_name in inst.secondary_nodes)
+
+
+def _GetStorageTypeArgs(cfg, storage_type):
+ """Returns the arguments for a storage type.
+
+ """
+ # Special case for file storage
+ if storage_type == constants.ST_FILE:
+ # storage.FileStorage wants a list of storage directories
+ return [[cfg.GetFileStorageDir()]]
+
+ return []
+
+
+def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
+ faulty = []
+
+ for dev in instance.disks:
+ cfg.SetDiskID(dev, node_name)
+
+ result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
+ result.Raise("Failed to get disk status from node %s" % node_name,
+ prereq=prereq)
+
+ for idx, bdev_status in enumerate(result.payload):
+ if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
+ faulty.append(idx)
+
+ return faulty
+
+
+class LUPostInitCluster(LogicalUnit):
+ """Logical unit for running hooks after cluster initialization.
+
+ """
+ HPATH = "cluster-init"
+ HTYPE = constants.HTYPE_CLUSTER
+ _OP_REQP = []
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ """
+ env = {"OP_TARGET": self.cfg.GetClusterName()}
+ mn = self.cfg.GetMasterNode()
+ return env, [], [mn]
+
+ def CheckPrereq(self):
+ """No prerequisites to check.
+
+ """
+ return True
+
+ def Exec(self, feedback_fn):
+ """Nothing to do.
+
+ """
+ return True
+
+
class LUDestroyCluster(NoHooksLU):
"""Logical unit for destroying the cluster.
This checks whether the cluster is empty.
- Any errors are signalled by raising errors.OpPrereqError.
+ Any errors are signaled by raising errors.OpPrereqError.
"""
master = self.cfg.GetMasterNode()
locking.LEVEL_NODE: locking.ALL_SET,
locking.LEVEL_INSTANCE: locking.ALL_SET,
}
- self.share_locks = dict(((i, 1) for i in locking.LEVELS))
+ self.share_locks = dict.fromkeys(locking.LEVELS, 1)
def _VerifyNode(self, nodeinfo, file_list, local_cksum,
node_result, feedback_fn, master_files,
Test list:
- compares ganeti version
- - checks vg existance and size > 20G
+ - checks vg existence and size > 20G
- checks config file checksum
- checks ssh to other nodes
else:
# not candidate and this is not a must-have file
bad = True
- feedback_fn(" - ERROR: non master-candidate has old/wrong file"
- " '%s'" % file_name)
+ feedback_fn(" - ERROR: file '%s' should not exist on non master"
+ " candidates (and the file is outdated)" % file_name)
else:
# all good, except non-master/non-must have combination
if not node_is_mc and not must_have_file:
if bep[constants.BE_AUTO_BALANCE]:
needed_mem += bep[constants.BE_MEMORY]
if nodeinfo['mfree'] < needed_mem:
- feedback_fn(" - ERROR: not enough memory on node %s to accomodate"
+ feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
" failovers should node %s fail" % (node, prinode))
bad = True
return bad
def BuildHooksEnv(self):
"""Build hooks env.
- Cluster-Verify hooks just rone in the post phase and their failure makes
+ Cluster-Verify hooks just ran in the post phase and their failure makes
the output be logged in the verify output and the verification to fail.
"""
return not bad
def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
- """Analize the post-hooks' result
+ """Analyze the post-hooks' result
This method analyses the hook result, handles it, and sends some
nicely-formatted feedback back to the user.
locking.LEVEL_NODE: locking.ALL_SET,
locking.LEVEL_INSTANCE: locking.ALL_SET,
}
- self.share_locks = dict(((i, 1) for i in locking.LEVELS))
+ self.share_locks = dict.fromkeys(locking.LEVELS, 1)
def CheckPrereq(self):
"""Check prerequisites.
if not nv_dict:
return result
- node_lvs = self.rpc.call_volume_list(nodes, vg_name)
+ node_lvs = self.rpc.call_lv_list(nodes, vg_name)
- to_act = set()
for node in nodes:
# node_volume
node_res = node_lvs[node]
return result
+class LURepairDiskSizes(NoHooksLU):
+ """Verifies the cluster disks sizes.
+
+ """
+ _OP_REQP = ["instances"]
+ REQ_BGL = False
+
+ def ExpandNames(self):
+
+ if not isinstance(self.op.instances, list):
+ raise errors.OpPrereqError("Invalid argument type 'instances'")
+
+ if self.op.instances:
+ self.wanted_names = []
+ for name in self.op.instances:
+ full_name = self.cfg.ExpandInstanceName(name)
+ if full_name is None:
+ raise errors.OpPrereqError("Instance '%s' not known" % name)
+ self.wanted_names.append(full_name)
+ self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
+ self.needed_locks = {
+ locking.LEVEL_NODE: [],
+ locking.LEVEL_INSTANCE: self.wanted_names,
+ }
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+ else:
+ self.wanted_names = None
+ self.needed_locks = {
+ locking.LEVEL_NODE: locking.ALL_SET,
+ locking.LEVEL_INSTANCE: locking.ALL_SET,
+ }
+ self.share_locks = dict(((i, 1) for i in locking.LEVELS))
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_NODE and self.wanted_names is not None:
+ self._LockInstancesNodes(primary_only=True)
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ This only checks the optional instance list against the existing names.
+
+ """
+ if self.wanted_names is None:
+ self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+
+ self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
+ in self.wanted_names]
+
+ def Exec(self, feedback_fn):
+ """Verify the size of cluster disks.
+
+ """
+ # TODO: check child disks too
+ # TODO: check differences in size between primary/secondary nodes
+ per_node_disks = {}
+ for instance in self.wanted_instances:
+ pnode = instance.primary_node
+ if pnode not in per_node_disks:
+ per_node_disks[pnode] = []
+ for idx, disk in enumerate(instance.disks):
+ per_node_disks[pnode].append((instance, idx, disk))
+
+ changed = []
+ for node, dskl in per_node_disks.items():
+ result = self.rpc.call_blockdev_getsizes(node, [v[2] for v in dskl])
+ if result.failed:
+ self.LogWarning("Failure in blockdev_getsizes call to node"
+ " %s, ignoring", node)
+ continue
+ if len(result.data) != len(dskl):
+ self.LogWarning("Invalid result from node %s, ignoring node results",
+ node)
+ continue
+ for ((instance, idx, disk), size) in zip(dskl, result.data):
+ if size is None:
+ self.LogWarning("Disk %d of instance %s did not return size"
+ " information, ignoring", idx, instance.name)
+ continue
+ if not isinstance(size, (int, long)):
+ self.LogWarning("Disk %d of instance %s did not return valid"
+ " size information, ignoring", idx, instance.name)
+ continue
+ size = size >> 20
+ if size != disk.size:
+ self.LogInfo("Disk %d of instance %s has mismatched size,"
+ " correcting: recorded %d, actual %d", idx,
+ instance.name, disk.size, size)
+ disk.size = size
+ self.cfg.Update(instance)
+ changed.append((instance.name, idx, size))
+ return changed
+
+
class LURenameCluster(LogicalUnit):
"""Rename the cluster.
result = self.rpc.call_upload_file(node_list,
constants.SSH_KNOWN_HOSTS_FILE)
for to_node, to_result in result.iteritems():
- msg = to_result.fail_msg
- if msg:
- msg = ("Copy of file %s to node %s failed: %s" %
- (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
- self.proc.LogWarning(msg)
+ msg = to_result.fail_msg
+ if msg:
+ msg = ("Copy of file %s to node %s failed: %s" %
+ (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
+ self.proc.LogWarning(msg)
finally:
- result = self.rpc.call_node_start_master(master, False)
+ result = self.rpc.call_node_start_master(master, False, False)
msg = result.fail_msg
if msg:
self.LogWarning("Could not re-enable the master role on"
@type disk: L{objects.Disk}
@param disk: the disk to check
- @rtype: booleean
+ @rtype: boolean
@return: boolean indicating whether a LD_LV dev_type was found or not
"""
if self.op.enabled_hypervisors is not None:
self.hv_list = self.op.enabled_hypervisors
+ if not self.hv_list:
+ raise errors.OpPrereqError("Enabled hypervisors list must contain at"
+ " least one member")
+ invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
+ if invalid_hvs:
+ raise errors.OpPrereqError("Enabled hypervisors contains invalid"
+ " entries: %s" % invalid_hvs)
else:
self.hv_list = cluster.enabled_hypervisors
if self.op.candidate_pool_size is not None:
self.cluster.candidate_pool_size = self.op.candidate_pool_size
+ # we need to update the pool size here, otherwise the save will fail
+ _AdjustCandidatePool(self)
self.cfg.Update(self.cluster)
- # we want to update nodes after the cluster so that if any errors
- # happen, we have recorded and saved the cluster info
- if self.op.candidate_pool_size is not None:
- _AdjustCandidatePool(self)
-
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
"""Distribute additional files which are part of the cluster configuration.
constants.SSH_KNOWN_HOSTS_FILE,
constants.RAPI_CERT_FILE,
constants.RAPI_USERS_FILE,
+ constants.HMAC_CLUSTER_KEY,
])
enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
if os.path.exists(fname):
result = lu.rpc.call_upload_file(dist_nodes, fname)
for to_node, to_result in result.items():
- msg = to_result.fail_msg
- if msg:
- msg = ("Copy of file %s to node %s failed: %s" %
- (fname, to_node, msg))
- lu.proc.LogWarning(msg)
+ msg = to_result.fail_msg
+ if msg:
+ msg = ("Copy of file %s to node %s failed: %s" %
+ (fname, to_node, msg))
+ lu.proc.LogWarning(msg)
class LURedistributeConfig(NoHooksLU):
lu.cfg.SetDiskID(dev, node)
retries = 0
+ degr_retries = 10 # in seconds, as we sleep 1 second each time
while True:
max_time = 0
done = True
lu.LogWarning("Can't compute data for node %s/%s",
node, instance.disks[i].iv_name)
continue
- # we ignore the ldisk parameter
- perc_done, est_time, is_degraded, _ = mstat
- cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
- if perc_done is not None:
+
+ cumul_degraded = (cumul_degraded or
+ (mstat.is_degraded and mstat.sync_percent is None))
+ if mstat.sync_percent is not None:
done = False
- if est_time is not None:
- rem_time = "%d estimated seconds remaining" % est_time
- max_time = est_time
+ if mstat.estimated_time is not None:
+ rem_time = "%d estimated seconds remaining" % mstat.estimated_time
+ max_time = mstat.estimated_time
else:
rem_time = "no time estimate"
lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
- (instance.disks[i].iv_name, perc_done, rem_time))
+ (instance.disks[i].iv_name, mstat.sync_percent, rem_time))
+
+ # if we're done but degraded, let's do a few small retries, to
+ # make sure we see a stable and not transient situation; therefore
+ # we force restart of the loop
+ if (done or oneshot) and cumul_degraded and degr_retries > 0:
+ logging.info("Degraded disks found, %d retries left", degr_retries)
+ degr_retries -= 1
+ time.sleep(1)
+ continue
+
if done or oneshot:
break
"""
lu.cfg.SetDiskID(dev, node)
- if ldisk:
- idx = 6
- else:
- idx = 5
result = True
+
if on_primary or dev.AssembleOnSecondary():
rstats = lu.rpc.call_blockdev_find(node, dev)
msg = rstats.fail_msg
lu.LogWarning("Can't find disk on node %s", node)
result = False
else:
- result = result and (not rstats.payload[idx])
+ if ldisk:
+ result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
+ else:
+ result = result and not rstats.payload.is_degraded
+
if dev.children:
for child in dev.children:
result = result and _CheckDiskConsistency(lu, child, node, on_primary)
- it does not have primary or secondary instances
- it's not the master
- Any errors are signalled by raising errors.OpPrereqError.
+ Any errors are signaled by raising errors.OpPrereqError.
"""
node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
"name", "pinst_cnt", "sinst_cnt",
"pinst_list", "sinst_list",
"pip", "sip", "tags",
- "serial_no",
+ "serial_no", "ctime", "mtime",
"master_candidate",
"master",
"offline",
"drained",
+ "role",
)
def ExpandNames(self):
val = list(node.GetTags())
elif field == "serial_no":
val = node.serial_no
+ elif field == "ctime":
+ val = node.ctime
+ elif field == "mtime":
+ val = node.mtime
elif field == "master_candidate":
val = node.master_candidate
elif field == "master":
val = node.drained
elif self._FIELDS_DYNAMIC.Matches(field):
val = live_data[node.name].get(field, None)
+ elif field == "role":
+ if node.name == master_node:
+ val = "M"
+ elif node.master_candidate:
+ val = "C"
+ elif node.drained:
+ val = "D"
+ elif node.offline:
+ val = "O"
+ else:
+ val = "R"
else:
raise errors.ParameterError(field)
node_output.append(val)
return output
+class LUQueryNodeStorage(NoHooksLU):
+ """Logical unit for getting information on storage units on node(s).
+
+ """
+ _OP_REQP = ["nodes", "storage_type", "output_fields"]
+ REQ_BGL = False
+ _FIELDS_STATIC = utils.FieldSet("node")
+
+ def ExpandNames(self):
+ storage_type = self.op.storage_type
+
+ if storage_type not in constants.VALID_STORAGE_FIELDS:
+ raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
+
+ dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type]
+
+ _CheckOutputFields(static=self._FIELDS_STATIC,
+ dynamic=utils.FieldSet(*dynamic_fields),
+ selected=self.op.output_fields)
+
+ self.needed_locks = {}
+ self.share_locks[locking.LEVEL_NODE] = 1
+
+ if self.op.nodes:
+ self.needed_locks[locking.LEVEL_NODE] = \
+ _GetWantedNodes(self, self.op.nodes)
+ else:
+ self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ This checks that the fields required are valid output fields.
+
+ """
+ self.op.name = getattr(self.op, "name", None)
+
+ self.nodes = self.acquired_locks[locking.LEVEL_NODE]
+
+ def Exec(self, feedback_fn):
+ """Computes the list of nodes and their attributes.
+
+ """
+ # Always get name to sort by
+ if constants.SF_NAME in self.op.output_fields:
+ fields = self.op.output_fields[:]
+ else:
+ fields = [constants.SF_NAME] + self.op.output_fields
+
+ # Never ask for node as it's only known to the LU
+ while "node" in fields:
+ fields.remove("node")
+
+ field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
+ name_idx = field_idx[constants.SF_NAME]
+
+ st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
+ data = self.rpc.call_storage_list(self.nodes,
+ self.op.storage_type, st_args,
+ self.op.name, fields)
+
+ result = []
+
+ for node in utils.NiceSort(self.nodes):
+ nresult = data[node]
+ if nresult.offline:
+ continue
+
+ msg = nresult.fail_msg
+ if msg:
+ self.LogWarning("Can't get storage data from node %s: %s", node, msg)
+ continue
+
+ rows = dict([(row[name_idx], row) for row in nresult.payload])
+
+ for name in utils.NiceSort(rows.keys()):
+ row = rows[name]
+
+ out = []
+
+ for field in self.op.output_fields:
+ if field == "node":
+ val = node
+ elif field in field_idx:
+ val = row[field_idx[field]]
+ else:
+ raise errors.ParameterError(field)
+
+ out.append(val)
+
+ result.append(out)
+
+ return result
+
+
+class LUModifyNodeStorage(NoHooksLU):
+ """Logical unit for modifying a storage volume on a node.
+
+ """
+ _OP_REQP = ["node_name", "storage_type", "name", "changes"]
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ node_name = self.cfg.ExpandNodeName(self.op.node_name)
+ if node_name is None:
+ raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
+
+ self.op.node_name = node_name
+
+ storage_type = self.op.storage_type
+ if storage_type not in constants.VALID_STORAGE_FIELDS:
+ raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
+
+ def ExpandNames(self):
+ self.needed_locks = {
+ locking.LEVEL_NODE: self.op.node_name,
+ }
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ """
+ storage_type = self.op.storage_type
+
+ try:
+ modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
+ except KeyError:
+ raise errors.OpPrereqError("Storage units of type '%s' can not be"
+ " modified" % storage_type)
+
+ diff = set(self.op.changes.keys()) - modifiable
+ if diff:
+ raise errors.OpPrereqError("The following fields can not be modified for"
+ " storage units of type '%s': %r" %
+ (storage_type, list(diff)))
+
+ def Exec(self, feedback_fn):
+ """Computes the list of nodes and their attributes.
+
+ """
+ st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
+ result = self.rpc.call_storage_modify(self.op.node_name,
+ self.op.storage_type, st_args,
+ self.op.name, self.op.changes)
+ result.Raise("Failed to modify storage unit '%s' on %s" %
+ (self.op.name, self.op.node_name))
+
+
class LUAddNode(LogicalUnit):
"""Logical unit for adding node to the cluster.
- it is resolvable
- its parameters (single/dual homed) matches the cluster
- Any errors are signalled by raising errors.OpPrereqError.
+ Any errors are signaled by raising errors.OpPrereqError.
"""
node_name = self.op.node_name
raise errors.OpPrereqError("The master has a private ip but the"
" new node doesn't have one")
- # checks reachablity
+ # checks reachability
if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
raise errors.OpPrereqError("Node not reachable by ping")
" based ping to noded port")
cp_size = self.cfg.GetClusterInfo().candidate_pool_size
- mc_now, _ = self.cfg.GetMasterCandidateStats()
- master_candidate = mc_now < cp_size
+ if self.op.readd:
+ exceptions = [node]
+ else:
+ exceptions = []
+ mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
+ # the new node will increase mc_max with one, so:
+ mc_max = min(mc_max + 1, cp_size)
+ self.master_candidate = mc_now < mc_max
- self.new_node = objects.Node(name=node,
- primary_ip=primary_ip,
- secondary_ip=secondary_ip,
- master_candidate=master_candidate,
- offline=False, drained=False)
+ if self.op.readd:
+ self.new_node = self.cfg.GetNodeInfo(node)
+ assert self.new_node is not None, "Can't retrieve locked node %s" % node
+ else:
+ self.new_node = objects.Node(name=node,
+ primary_ip=primary_ip,
+ secondary_ip=secondary_ip,
+ master_candidate=self.master_candidate,
+ offline=False, drained=False)
def Exec(self, feedback_fn):
"""Adds the new node to the cluster.
new_node = self.new_node
node = new_node.name
+ # for re-adds, reset the offline/drained/master-candidate flags;
+ # we need to reset here, otherwise offline would prevent RPC calls
+ # later in the procedure; this also means that if the re-add
+ # fails, we are left with a non-offlined, broken node
+ if self.op.readd:
+ new_node.drained = new_node.offline = False
+ self.LogInfo("Readding a node, the offline/drained flags were reset")
+ # if we demote the node, we do cleanup later in the procedure
+ new_node.master_candidate = self.master_candidate
+
+ # notify the user about any possible mc promotion
+ if new_node.master_candidate:
+ self.LogInfo("Node will be a master candidate")
+
# check connectivity
result = self.rpc.call_version([node])[node]
result.Raise("Can't get version information from node %s" % node)
if self.op.readd:
_RedistributeAncillaryFiles(self)
self.context.ReaddNode(new_node)
+ # make sure we redistribute the config
+ self.cfg.Update(new_node)
+ # and make sure the new node will not have old files around
+ if not new_node.master_candidate:
+ result = self.rpc.call_node_demote_from_mc(new_node.name)
+ msg = result.RemoteFailMsg()
+ if msg:
+ self.LogWarning("Node failed to demote itself from master"
+ " candidate status: %s" % msg)
else:
_RedistributeAncillaryFiles(self, additional_nodes=[node])
self.context.AddNode(new_node)
node.master_candidate = False
changed_mc = True
result.append(("master_candidate", "auto-demotion due to drain"))
+ rrc = self.rpc.call_node_demote_from_mc(node.name)
+ msg = rrc.RemoteFailMsg()
+ if msg:
+ self.LogWarning("Node failed to demote itself: %s" % msg)
if node.offline:
node.offline = False
result.append(("offline", "clear offline status due to drain"))
def ExpandNames(self):
"""Locking for PowercycleNode.
- This is a last-resource option and shouldn't block on other
+ This is a last-resort option and shouldn't block on other
jobs. Therefore, we grab no locks.
"""
"software_version": constants.RELEASE_VERSION,
"protocol_version": constants.PROTOCOL_VERSION,
"config_version": constants.CONFIG_VERSION,
- "os_api_version": constants.OS_API_VERSION,
+ "os_api_version": max(constants.OS_API_VERSIONS),
"export_version": constants.EXPORT_VERSION,
"architecture": (platform.architecture()[0], platform.machine()),
"name": cluster.cluster_name,
"master": cluster.master_node,
- "default_hypervisor": cluster.default_hypervisor,
+ "default_hypervisor": cluster.enabled_hypervisors[0],
"enabled_hypervisors": cluster.enabled_hypervisors,
- "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
- for hypervisor in cluster.enabled_hypervisors]),
+ "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
+ for hypervisor_name in cluster.enabled_hypervisors]),
"beparams": cluster.beparams,
"nicparams": cluster.nicparams,
"candidate_pool_size": cluster.candidate_pool_size,
"master_netdev": cluster.master_netdev,
"volume_group_name": cluster.volume_group_name,
"file_storage_dir": cluster.file_storage_dir,
+ "ctime": cluster.ctime,
+ "mtime": cluster.mtime,
}
return result
assert self.instance is not None, \
"Cannot retrieve locked instance %s" % self.op.instance_name
_CheckNodeOnline(self, self.instance.primary_node)
+ if not hasattr(self.op, "ignore_size"):
+ self.op.ignore_size = False
def Exec(self, feedback_fn):
"""Activate the disks.
"""
- disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
+ disks_ok, disks_info = \
+ _AssembleInstanceDisks(self, self.instance,
+ ignore_size=self.op.ignore_size)
if not disks_ok:
raise errors.OpExecError("Cannot activate block devices")
return disks_info
-def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
+def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
+ ignore_size=False):
"""Prepare the block devices for an instance.
This sets up the block devices on all nodes.
@type ignore_secondaries: boolean
@param ignore_secondaries: if true, errors on secondary nodes
won't result in an error return from the function
+ @type ignore_size: boolean
+ @param ignore_size: if true, the current known size of the disk
+ will not be used during the disk activation, useful for cases
+ when the size is wrong
@return: False if the operation failed, otherwise a list of
(host, instance_visible_name, node_visible_name)
with the mapping from node devices to instance devices
# 1st pass, assemble on all nodes in secondary mode
for inst_disk in instance.disks:
for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
+ if ignore_size:
+ node_disk = node_disk.Copy()
+ node_disk.UnsetSize()
lu.cfg.SetDiskID(node_disk, node)
result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
msg = result.fail_msg
for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
if node != instance.primary_node:
continue
+ if ignore_size:
+ node_disk = node_disk.Copy()
+ node_disk.UnsetSize()
lu.cfg.SetDiskID(node_disk, node)
result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
msg = result.fail_msg
"""Start the disks of an instance.
"""
- disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
+ disks_ok, _ = _AssembleInstanceDisks(lu, instance,
ignore_secondaries=force)
if not disks_ok:
_ShutdownInstanceDisks(lu, instance)
_CheckNodeOnline(self, instance.primary_node)
bep = self.cfg.GetClusterInfo().FillBE(instance)
- # check bridges existance
+ # check bridges existence
_CheckInstanceBridgesExist(self, instance)
remote_info = self.rpc.call_instance_info(instance.primary_node,
_CheckNodeOnline(self, instance.primary_node)
- # check bridges existance
+ # check bridges existence
_CheckInstanceBridgesExist(self, instance)
def Exec(self, feedback_fn):
_ShutdownInstanceDisks(self, inst)
+class LURecreateInstanceDisks(LogicalUnit):
+ """Recreate an instance's missing disks.
+
+ """
+ HPATH = "instance-recreate-disks"
+ HTYPE = constants.HTYPE_INSTANCE
+ _OP_REQP = ["instance_name", "disks"]
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ """Check the arguments.
+
+ """
+ if not isinstance(self.op.disks, list):
+ raise errors.OpPrereqError("Invalid disks parameter")
+ for item in self.op.disks:
+ if (not isinstance(item, int) or
+ item < 0):
+ raise errors.OpPrereqError("Invalid disk specification '%s'" %
+ str(item))
+
+ def ExpandNames(self):
+ self._ExpandAndLockInstance()
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ This runs on master, primary and secondary nodes of the instance.
+
+ """
+ env = _BuildInstanceHookEnvByObject(self, self.instance)
+ nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
+ return env, nl, nl
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ This checks that the instance is in the cluster and is not running.
+
+ """
+ instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+ assert instance is not None, \
+ "Cannot retrieve locked instance %s" % self.op.instance_name
+ _CheckNodeOnline(self, instance.primary_node)
+
+ if instance.disk_template == constants.DT_DISKLESS:
+ raise errors.OpPrereqError("Instance '%s' has no disks" %
+ self.op.instance_name)
+ if instance.admin_up:
+ raise errors.OpPrereqError("Instance '%s' is marked to be up" %
+ self.op.instance_name)
+ remote_info = self.rpc.call_instance_info(instance.primary_node,
+ instance.name,
+ instance.hypervisor)
+ remote_info.Raise("Error checking node %s" % instance.primary_node,
+ prereq=True)
+ if remote_info.payload:
+ raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
+ (self.op.instance_name,
+ instance.primary_node))
+
+ if not self.op.disks:
+ self.op.disks = range(len(instance.disks))
+ else:
+ for idx in self.op.disks:
+ if idx >= len(instance.disks):
+ raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx)
+
+ self.instance = instance
+
+ def Exec(self, feedback_fn):
+ """Recreate the disks.
+
+ """
+ to_skip = []
+ for idx, disk in enumerate(self.instance.disks):
+ if idx not in self.op.disks: # disk idx has not been passed in
+ to_skip.append(idx)
+ continue
+
+ _CreateDisks(self, self.instance, to_skip=to_skip)
+
+
class LURenameInstance(LogicalUnit):
"""Rename an instance.
_FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
"admin_state",
"disk_template", "ip", "mac", "bridge",
+ "nic_mode", "nic_link",
"sda_size", "sdb_size", "vcpus", "tags",
"network_port", "beparams",
r"(disk)\.(size)/([0-9]+)",
r"(disk)\.(sizes)", "disk_usage",
- r"(nic)\.(mac|ip|bridge)/([0-9]+)",
- r"(nic)\.(macs|ips|bridges)",
+ r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
+ r"(nic)\.(bridge)/([0-9]+)",
+ r"(nic)\.(macs|ips|modes|links|bridges)",
r"(disk|nic)\.(count)",
- "serial_no", "hypervisor", "hvparams",] +
+ "serial_no", "hypervisor", "hvparams",
+ "ctime", "mtime",
+ ] +
["hv/%s" % name
for name in constants.HVS_PARAMETERS] +
["be/%s" % name
HVPREFIX = "hv/"
BEPREFIX = "be/"
output = []
+ cluster = self.cfg.GetClusterInfo()
for instance in instance_list:
iout = []
- i_hv = self.cfg.GetClusterInfo().FillHV(instance)
- i_be = self.cfg.GetClusterInfo().FillBE(instance)
+ i_hv = cluster.FillHV(instance)
+ i_be = cluster.FillBE(instance)
+ i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
+ nic.nicparams) for nic in instance.nics]
for field in self.op.output_fields:
st_match = self._FIELDS_STATIC.Matches(field)
if field == "name":
val = live_data[instance.name].get("memory", "?")
else:
val = "-"
+ elif field == "vcpus":
+ val = i_be[constants.BE_VCPUS]
elif field == "disk_template":
val = instance.disk_template
elif field == "ip":
- val = instance.nics[0].ip
+ if instance.nics:
+ val = instance.nics[0].ip
+ else:
+ val = None
+ elif field == "nic_mode":
+ if instance.nics:
+ val = i_nicp[0][constants.NIC_MODE]
+ else:
+ val = None
+ elif field == "nic_link":
+ if instance.nics:
+ val = i_nicp[0][constants.NIC_LINK]
+ else:
+ val = None
elif field == "bridge":
- val = instance.nics[0].bridge
+ if (instance.nics and
+ i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
+ val = i_nicp[0][constants.NIC_LINK]
+ else:
+ val = None
elif field == "mac":
- val = instance.nics[0].mac
+ if instance.nics:
+ val = instance.nics[0].mac
+ else:
+ val = None
elif field == "sda_size" or field == "sdb_size":
idx = ord(field[2]) - ord('a')
try:
val = list(instance.GetTags())
elif field == "serial_no":
val = instance.serial_no
+ elif field == "ctime":
+ val = instance.ctime
+ elif field == "mtime":
+ val = instance.mtime
elif field == "network_port":
val = instance.network_port
elif field == "hypervisor":
val = [nic.mac for nic in instance.nics]
elif st_groups[1] == "ips":
val = [nic.ip for nic in instance.nics]
+ elif st_groups[1] == "modes":
+ val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
+ elif st_groups[1] == "links":
+ val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
elif st_groups[1] == "bridges":
- val = [nic.bridge for nic in instance.nics]
+ val = []
+ for nicp in i_nicp:
+ if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
+ val.append(nicp[constants.NIC_LINK])
+ else:
+ val.append(None)
else:
# index-based item
nic_idx = int(st_groups[2])
val = instance.nics[nic_idx].mac
elif st_groups[1] == "ip":
val = instance.nics[nic_idx].ip
+ elif st_groups[1] == "mode":
+ val = i_nicp[nic_idx][constants.NIC_MODE]
+ elif st_groups[1] == "link":
+ val = i_nicp[nic_idx][constants.NIC_LINK]
elif st_groups[1] == "bridge":
- val = instance.nics[nic_idx].bridge
+ nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
+ if nic_mode == constants.NIC_MODE_BRIDGED:
+ val = i_nicp[nic_idx][constants.NIC_LINK]
+ else:
+ val = None
else:
assert False, "Unhandled NIC parameter"
else:
- assert False, "Unhandled variable parameter"
+ assert False, ("Declared but unhandled variable parameter '%s'" %
+ field)
else:
- raise errors.ParameterError(field)
+ assert False, "Declared but unhandled parameter '%s'" % field
iout.append(val)
output.append(iout)
target_node = secondary_nodes[0]
_CheckNodeOnline(self, target_node)
_CheckNodeNotDrained(self, target_node)
- # check memory requirements on the secondary node
- _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
- instance.name, bep[constants.BE_MEMORY],
- instance.hypervisor)
+ if instance.admin_up:
+ # check memory requirements on the secondary node
+ _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
+ instance.name, bep[constants.BE_MEMORY],
+ instance.hypervisor)
+ else:
+ self.LogInfo("Not checking memory on the secondary node as"
+ " instance will not be started")
+
# check bridge existance
_CheckInstanceBridgesExist(self, instance, node=target_node)
logging.info("Starting instance %s on node %s",
instance.name, target_node)
- disks_ok, dummy = _AssembleInstanceDisks(self, instance,
+ disks_ok, _ = _AssembleInstanceDisks(self, instance,
ignore_secondaries=True)
if not disks_ok:
_ShutdownInstanceDisks(self, instance)
def ExpandNames(self):
self._ExpandAndLockInstance()
+
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+ self._migrater = TLMigrateInstance(self, self.op.instance_name,
+ self.op.live, self.op.cleanup)
+ self.tasklets = [self._migrater]
+
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE:
self._LockInstancesNodes()
This runs on master, primary and secondary nodes of the instance.
"""
- env = _BuildInstanceHookEnvByObject(self, self.instance)
+ instance = self._migrater.instance
+ env = _BuildInstanceHookEnvByObject(self, instance)
env["MIGRATE_LIVE"] = self.op.live
env["MIGRATE_CLEANUP"] = self.op.cleanup
- nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
+ nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
return env, nl, nl
+
+class LUMigrateNode(LogicalUnit):
+ """Migrate all instances from a node.
+
+ """
+ HPATH = "node-migrate"
+ HTYPE = constants.HTYPE_NODE
+ _OP_REQP = ["node_name", "live"]
+ REQ_BGL = False
+
+ def ExpandNames(self):
+ self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
+ if self.op.node_name is None:
+ raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
+
+ self.needed_locks = {
+ locking.LEVEL_NODE: [self.op.node_name],
+ }
+
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+
+ # Create tasklets for migrating instances for all instances on this node
+ names = []
+ tasklets = []
+
+ for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
+ logging.debug("Migrating instance %s", inst.name)
+ names.append(inst.name)
+
+ tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
+
+ self.tasklets = tasklets
+
+ # Declare instance locks
+ self.needed_locks[locking.LEVEL_INSTANCE] = names
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_NODE:
+ self._LockInstancesNodes()
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ This runs on the master, the primary and all the secondaries.
+
+ """
+ env = {
+ "NODE_NAME": self.op.node_name,
+ }
+
+ nl = [self.cfg.GetMasterNode()]
+
+ return (env, nl, nl)
+
+
+class TLMigrateInstance(Tasklet):
+ def __init__(self, lu, instance_name, live, cleanup):
+ """Initializes this class.
+
+ """
+ Tasklet.__init__(self, lu)
+
+ # Parameters
+ self.instance_name = instance_name
+ self.live = live
+ self.cleanup = cleanup
+
def CheckPrereq(self):
"""Check prerequisites.
"""
instance = self.cfg.GetInstanceInfo(
- self.cfg.ExpandInstanceName(self.op.instance_name))
+ self.cfg.ExpandInstanceName(self.instance_name))
if instance is None:
raise errors.OpPrereqError("Instance '%s' not known" %
- self.op.instance_name)
+ self.instance_name)
if instance.disk_template != constants.DT_DRBD8:
raise errors.OpPrereqError("Instance's disk layout is not"
# check bridge existance
_CheckInstanceBridgesExist(self, instance, node=target_node)
- if not self.op.cleanup:
+ if not self.cleanup:
_CheckNodeNotDrained(self, target_node)
result = self.rpc.call_instance_migratable(instance.primary_node,
instance)
self._GoReconnect(False)
self._WaitUntilSync()
except errors.OpExecError, err:
- self.LogWarning("Migration failed and I can't reconnect the"
- " drives: error '%s'\n"
- "Please look and recover the instance status" %
- str(err))
+ self.lu.LogWarning("Migration failed and I can't reconnect the"
+ " drives: error '%s'\n"
+ "Please look and recover the instance status" %
+ str(err))
def _AbortMigration(self):
"""Call the hypervisor code to abort a started migration.
time.sleep(10)
result = self.rpc.call_instance_migrate(source_node, instance,
self.nodes_ip[target_node],
- self.op.live)
+ self.live)
msg = result.fail_msg
if msg:
logging.error("Instance migration failed, trying to revert"
"""Perform the migration.
"""
+ feedback_fn("Migrating instance %s" % self.instance.name)
+
self.feedback_fn = feedback_fn
self.source_node = self.instance.primary_node
self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
}
- if self.op.cleanup:
+
+ if self.cleanup:
return self._ExecCleanup()
else:
return self._ExecMigration()
if len(secondary_nodes) != 0:
raise errors.ProgrammerError("Wrong template configuration")
- names = _GenerateUniqueNames(lu, [".disk%d" % i
+ names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
for i in range(disk_count)])
for idx, disk in enumerate(disk_info):
disk_index = idx + base_index
[primary_node, remote_node] * len(disk_info), instance_name)
names = []
- for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
+ for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
for i in range(disk_count)]):
names.append(lv_prefix + "_data")
names.append(lv_prefix + "_meta")
return "originstname+%s" % instance.name
-def _CreateDisks(lu, instance):
+def _CreateDisks(lu, instance, to_skip=None):
"""Create all disks for an instance.
This abstracts away some work from AddInstance.
@param lu: the logical unit on whose behalf we execute
@type instance: L{objects.Instance}
@param instance: the instance whose disks we should create
+ @type to_skip: list
+ @param to_skip: list of indices to skip
@rtype: boolean
@return: the success of the creation
# Note: this needs to be kept in sync with adding of disks in
# LUSetInstanceParams
- for device in instance.disks:
+ for idx, device in enumerate(instance.disks):
+ if to_skip and idx in to_skip:
+ continue
logging.info("Creating volume %s for instance %s",
device.iv_name, instance.name)
#HARDCODE
self.op.hvparams)
hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
hv_type.CheckParameterSyntax(filled_hvp)
+ self.hv_full = filled_hvp
# fill and remember the beparams dict
utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
bridge = nic.get("bridge", None)
link = nic.get("link", None)
if bridge and link:
- raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
+ raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
+ " at the same time")
elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
elif bridge:
"""
nics = [n.ToDict() for n in self.nics]
- ial = IAllocator(self,
+ ial = IAllocator(self.cfg, self.rpc,
mode=constants.IALLOCATOR_MODE_ALLOC,
name=self.op.instance_name,
disk_template=self.op.disk_template,
os_type=self.op.os_type,
memory=self.be_full[constants.BE_MEMORY],
vcpus=self.be_full[constants.BE_VCPUS],
- nics=_PreBuildNICHooksList(self, self.nics),
+ nics=_NICListToTuple(self, self.nics),
disk_template=self.op.disk_template,
disks=[(d["size"], d["mode"]) for d in self.disks],
+ bep=self.be_full,
+ hvp=self.hv_full,
+ hypervisor_name=self.op.hypervisor,
))
nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
self.be_full[constants.BE_MEMORY],
self.op.hypervisor)
+ self.dry_run_result = list(nodenames)
+
def Exec(self, feedback_fn):
"""Create and add the instance to the cluster.
result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
result.Raise("Could not start instance")
+ return list(iobj.all_nodes)
+
class LUConnectConsole(NoHooksLU):
"""Connect to an instance's console.
if not hasattr(self.op, "iallocator"):
self.op.iallocator = None
- # check for valid parameter combination
- cnt = [self.op.remote_node, self.op.iallocator].count(None)
- if self.op.mode == constants.REPLACE_DISK_CHG:
- if cnt == 2:
- raise errors.OpPrereqError("When changing the secondary either an"
- " iallocator script must be used or the"
- " new node given")
- elif cnt == 0:
- raise errors.OpPrereqError("Give either the iallocator or the new"
- " secondary, not both")
- else: # not replacing the secondary
- if cnt != 2:
- raise errors.OpPrereqError("The iallocator and new node options can"
- " be used only when changing the"
- " secondary node")
+ TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
+ self.op.iallocator)
def ExpandNames(self):
self._ExpandAndLockInstance()
if self.op.iallocator is not None:
self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+
elif self.op.remote_node is not None:
remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
if remote_node is None:
raise errors.OpPrereqError("Node '%s' not known" %
self.op.remote_node)
+
self.op.remote_node = remote_node
+
# Warning: do not remove the locking of the new secondary here
# unless DRBD8.AddChildren is changed to work in parallel;
# currently it doesn't since parallel invocations of
# FindUnusedMinor will conflict
self.needed_locks[locking.LEVEL_NODE] = [remote_node]
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+
else:
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+ self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
+ self.op.iallocator, self.op.remote_node,
+ self.op.disks)
+
+ self.tasklets = [self.replacer]
+
def DeclareLocks(self, level):
# If we're not already locking all nodes in the set we have to declare the
# instance's primary/secondary nodes.
self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
self._LockInstancesNodes()
- def _RunAllocator(self):
- """Compute a new secondary node using an IAllocator.
-
- """
- ial = IAllocator(self,
- mode=constants.IALLOCATOR_MODE_RELOC,
- name=self.op.instance_name,
- relocate_from=[self.sec_node])
-
- ial.Run(self.op.iallocator)
-
- if not ial.success:
- raise errors.OpPrereqError("Can't compute nodes using"
- " iallocator '%s': %s" % (self.op.iallocator,
- ial.info))
- if len(ial.nodes) != ial.required_nodes:
- raise errors.OpPrereqError("iallocator '%s' returned invalid number"
- " of nodes (%s), required %s" %
- (len(ial.nodes), ial.required_nodes))
- self.op.remote_node = ial.nodes[0]
- self.LogInfo("Selected new secondary for the instance: %s",
- self.op.remote_node)
-
def BuildHooksEnv(self):
"""Build hooks env.
This runs on the master, the primary and all the secondaries.
"""
+ instance = self.replacer.instance
env = {
"MODE": self.op.mode,
"NEW_SECONDARY": self.op.remote_node,
- "OLD_SECONDARY": self.instance.secondary_nodes[0],
+ "OLD_SECONDARY": instance.secondary_nodes[0],
}
- env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+ env.update(_BuildInstanceHookEnvByObject(self, instance))
nl = [
self.cfg.GetMasterNode(),
- self.instance.primary_node,
+ instance.primary_node,
]
if self.op.remote_node is not None:
nl.append(self.op.remote_node)
return env, nl, nl
+
+class LUEvacuateNode(LogicalUnit):
+ """Relocate the secondary instances from a node.
+
+ """
+ HPATH = "node-evacuate"
+ HTYPE = constants.HTYPE_NODE
+ _OP_REQP = ["node_name"]
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ if not hasattr(self.op, "remote_node"):
+ self.op.remote_node = None
+ if not hasattr(self.op, "iallocator"):
+ self.op.iallocator = None
+
+ TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
+ self.op.remote_node,
+ self.op.iallocator)
+
+ def ExpandNames(self):
+ self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
+ if self.op.node_name is None:
+ raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
+
+ self.needed_locks = {}
+
+ # Declare node locks
+ if self.op.iallocator is not None:
+ self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+
+ elif self.op.remote_node is not None:
+ remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
+ if remote_node is None:
+ raise errors.OpPrereqError("Node '%s' not known" %
+ self.op.remote_node)
+
+ self.op.remote_node = remote_node
+
+ # Warning: do not remove the locking of the new secondary here
+ # unless DRBD8.AddChildren is changed to work in parallel;
+ # currently it doesn't since parallel invocations of
+ # FindUnusedMinor will conflict
+ self.needed_locks[locking.LEVEL_NODE] = [remote_node]
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+
+ else:
+ raise errors.OpPrereqError("Invalid parameters")
+
+ # Create tasklets for replacing disks for all secondary instances on this
+ # node
+ names = []
+ tasklets = []
+
+ for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
+ logging.debug("Replacing disks for instance %s", inst.name)
+ names.append(inst.name)
+
+ replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
+ self.op.iallocator, self.op.remote_node, [])
+ tasklets.append(replacer)
+
+ self.tasklets = tasklets
+ self.instance_names = names
+
+ # Declare instance locks
+ self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
+
+ def DeclareLocks(self, level):
+ # If we're not already locking all nodes in the set we have to declare the
+ # instance's primary/secondary nodes.
+ if (level == locking.LEVEL_NODE and
+ self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
+ self._LockInstancesNodes()
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ This runs on the master, the primary and all the secondaries.
+
+ """
+ env = {
+ "NODE_NAME": self.op.node_name,
+ }
+
+ nl = [self.cfg.GetMasterNode()]
+
+ if self.op.remote_node is not None:
+ env["NEW_SECONDARY"] = self.op.remote_node
+ nl.append(self.op.remote_node)
+
+ return (env, nl, nl)
+
+
+class TLReplaceDisks(Tasklet):
+ """Replaces disks for an instance.
+
+ Note: Locking is not within the scope of this class.
+
+ """
+ def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
+ disks):
+ """Initializes this class.
+
+ """
+ Tasklet.__init__(self, lu)
+
+ # Parameters
+ self.instance_name = instance_name
+ self.mode = mode
+ self.iallocator_name = iallocator_name
+ self.remote_node = remote_node
+ self.disks = disks
+
+ # Runtime data
+ self.instance = None
+ self.new_node = None
+ self.target_node = None
+ self.other_node = None
+ self.remote_node_info = None
+ self.node_secondary_ip = None
+
+ @staticmethod
+ def CheckArguments(mode, remote_node, iallocator):
+ """Helper function for users of this class.
+
+ """
+ # check for valid parameter combination
+ if mode == constants.REPLACE_DISK_CHG:
+ if remote_node is None and iallocator is None:
+ raise errors.OpPrereqError("When changing the secondary either an"
+ " iallocator script must be used or the"
+ " new node given")
+
+ if remote_node is not None and iallocator is not None:
+ raise errors.OpPrereqError("Give either the iallocator or the new"
+ " secondary, not both")
+
+ elif remote_node is not None or iallocator is not None:
+ # Not replacing the secondary
+ raise errors.OpPrereqError("The iallocator and new node options can"
+ " only be used when changing the"
+ " secondary node")
+
+ @staticmethod
+ def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
+ """Compute a new secondary node using an IAllocator.
+
+ """
+ ial = IAllocator(lu.cfg, lu.rpc,
+ mode=constants.IALLOCATOR_MODE_RELOC,
+ name=instance_name,
+ relocate_from=relocate_from)
+
+ ial.Run(iallocator_name)
+
+ if not ial.success:
+ raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
+ " %s" % (iallocator_name, ial.info))
+
+ if len(ial.nodes) != ial.required_nodes:
+ raise errors.OpPrereqError("iallocator '%s' returned invalid number"
+ " of nodes (%s), required %s" %
+ (len(ial.nodes), ial.required_nodes))
+
+ remote_node_name = ial.nodes[0]
+
+ lu.LogInfo("Selected new secondary for instance '%s': %s",
+ instance_name, remote_node_name)
+
+ return remote_node_name
+
+ def _FindFaultyDisks(self, node_name):
+ return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
+ node_name, True)
+
def CheckPrereq(self):
"""Check prerequisites.
This checks that the instance is in the cluster.
"""
- instance = self.cfg.GetInstanceInfo(self.op.instance_name)
- assert instance is not None, \
- "Cannot retrieve locked instance %s" % self.op.instance_name
- self.instance = instance
+ self.instance = self.cfg.GetInstanceInfo(self.instance_name)
+ assert self.instance is not None, \
+ "Cannot retrieve locked instance %s" % self.instance_name
- if instance.disk_template != constants.DT_DRBD8:
+ if self.instance.disk_template != constants.DT_DRBD8:
raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
" instances")
- if len(instance.secondary_nodes) != 1:
+ if len(self.instance.secondary_nodes) != 1:
raise errors.OpPrereqError("The instance has a strange layout,"
" expected one secondary but found %d" %
- len(instance.secondary_nodes))
+ len(self.instance.secondary_nodes))
- self.sec_node = instance.secondary_nodes[0]
+ secondary_node = self.instance.secondary_nodes[0]
- if self.op.iallocator is not None:
- self._RunAllocator()
+ if self.iallocator_name is None:
+ remote_node = self.remote_node
+ else:
+ remote_node = self._RunAllocator(self.lu, self.iallocator_name,
+ self.instance.name, secondary_node)
- remote_node = self.op.remote_node
if remote_node is not None:
self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
assert self.remote_node_info is not None, \
"Cannot retrieve locked node %s" % remote_node
else:
self.remote_node_info = None
- if remote_node == instance.primary_node:
+
+ if remote_node == self.instance.primary_node:
raise errors.OpPrereqError("The specified node is the primary node of"
" the instance.")
- elif remote_node == self.sec_node:
+
+ if remote_node == secondary_node:
raise errors.OpPrereqError("The specified node is already the"
" secondary node of the instance.")
- if self.op.mode == constants.REPLACE_DISK_PRI:
- n1 = self.tgt_node = instance.primary_node
- n2 = self.oth_node = self.sec_node
- elif self.op.mode == constants.REPLACE_DISK_SEC:
- n1 = self.tgt_node = self.sec_node
- n2 = self.oth_node = instance.primary_node
- elif self.op.mode == constants.REPLACE_DISK_CHG:
- n1 = self.new_node = remote_node
- n2 = self.oth_node = instance.primary_node
- self.tgt_node = self.sec_node
- _CheckNodeNotDrained(self, remote_node)
+ if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
+ constants.REPLACE_DISK_CHG):
+ raise errors.OpPrereqError("Cannot specify disks to be replaced")
+
+ if self.mode == constants.REPLACE_DISK_AUTO:
+ faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
+ faulty_secondary = self._FindFaultyDisks(secondary_node)
+
+ if faulty_primary and faulty_secondary:
+ raise errors.OpPrereqError("Instance %s has faulty disks on more than"
+ " one node and can not be repaired"
+ " automatically" % self.instance_name)
+
+ if faulty_primary:
+ self.disks = faulty_primary
+ self.target_node = self.instance.primary_node
+ self.other_node = secondary_node
+ check_nodes = [self.target_node, self.other_node]
+ elif faulty_secondary:
+ self.disks = faulty_secondary
+ self.target_node = secondary_node
+ self.other_node = self.instance.primary_node
+ check_nodes = [self.target_node, self.other_node]
+ else:
+ self.disks = []
+ check_nodes = []
+
else:
- raise errors.ProgrammerError("Unhandled disk replace mode")
+ # Non-automatic modes
+ if self.mode == constants.REPLACE_DISK_PRI:
+ self.target_node = self.instance.primary_node
+ self.other_node = secondary_node
+ check_nodes = [self.target_node, self.other_node]
- _CheckNodeOnline(self, n1)
- _CheckNodeOnline(self, n2)
+ elif self.mode == constants.REPLACE_DISK_SEC:
+ self.target_node = secondary_node
+ self.other_node = self.instance.primary_node
+ check_nodes = [self.target_node, self.other_node]
- if not self.op.disks:
- self.op.disks = range(len(instance.disks))
+ elif self.mode == constants.REPLACE_DISK_CHG:
+ self.new_node = remote_node
+ self.other_node = self.instance.primary_node
+ self.target_node = secondary_node
+ check_nodes = [self.new_node, self.other_node]
- for disk_idx in self.op.disks:
- instance.FindDisk(disk_idx)
+ _CheckNodeNotDrained(self.lu, remote_node)
- def _ExecD8DiskOnly(self, feedback_fn):
- """Replace a disk on the primary or secondary for dbrd8.
+ else:
+ raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
+ self.mode)
- The algorithm for replace is quite complicated:
+ # If not specified all disks should be replaced
+ if not self.disks:
+ self.disks = range(len(self.instance.disks))
- 1. for each disk to be replaced:
+ for node in check_nodes:
+ _CheckNodeOnline(self.lu, node)
- 1. create new LVs on the target node with unique names
- 1. detach old LVs from the drbd device
- 1. rename old LVs to name_replaced.<time_t>
- 1. rename new LVs to old LVs
- 1. attach the new LVs (with the old names now) to the drbd device
+ # Check whether disks are valid
+ for disk_idx in self.disks:
+ self.instance.FindDisk(disk_idx)
- 1. wait for sync across all devices
+ # Get secondary node IP addresses
+ node_2nd_ip = {}
- 1. for each modified disk:
+ for node_name in [self.target_node, self.other_node, self.new_node]:
+ if node_name is not None:
+ node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
- 1. remove old LVs (which have the name name_replaces.<time_t>)
+ self.node_secondary_ip = node_2nd_ip
- Failures are not very well handled.
+ def Exec(self, feedback_fn):
+ """Execute disk replacement.
+
+ This dispatches the disk replacement to the appropriate handler.
"""
- steps_total = 6
- warning, info = (self.proc.LogWarning, self.proc.LogInfo)
- instance = self.instance
- iv_names = {}
+ if not self.disks:
+ feedback_fn("No disks need replacement")
+ return
+
+ feedback_fn("Replacing disk(s) %s for %s" %
+ (", ".join([str(i) for i in self.disks]), self.instance.name))
+
+ activate_disks = (not self.instance.admin_up)
+
+ # Activate the instance disks if we're replacing them on a down instance
+ if activate_disks:
+ _StartInstanceDisks(self.lu, self.instance, True)
+
+ try:
+ # Should we replace the secondary node?
+ if self.new_node is not None:
+ return self._ExecDrbd8Secondary()
+ else:
+ return self._ExecDrbd8DiskOnly()
+
+ finally:
+ # Deactivate the instance disks if we're replacing them on a down instance
+ if activate_disks:
+ _SafeShutdownInstanceDisks(self.lu, self.instance)
+
+ def _CheckVolumeGroup(self, nodes):
+ self.lu.LogInfo("Checking volume groups")
+
vgname = self.cfg.GetVGName()
- # start of work
- cfg = self.cfg
- tgt_node = self.tgt_node
- oth_node = self.oth_node
- # Step: check device activation
- self.proc.LogStep(1, steps_total, "check device existence")
- info("checking volume groups")
- my_vg = cfg.GetVGName()
- results = self.rpc.call_vg_list([oth_node, tgt_node])
+ # Make sure volume group exists on all involved nodes
+ results = self.rpc.call_vg_list(nodes)
if not results:
raise errors.OpExecError("Can't list volume groups on the nodes")
- for node in oth_node, tgt_node:
+
+ for node in nodes:
res = results[node]
res.Raise("Error checking node %s" % node)
- if my_vg not in res.payload:
- raise errors.OpExecError("Volume group '%s' not found on %s" %
- (my_vg, node))
- for idx, dev in enumerate(instance.disks):
- if idx not in self.op.disks:
+ if vgname not in res.payload:
+ raise errors.OpExecError("Volume group '%s' not found on node %s" %
+ (vgname, node))
+
+ def _CheckDisksExistence(self, nodes):
+ # Check disk existence
+ for idx, dev in enumerate(self.instance.disks):
+ if idx not in self.disks:
continue
- for node in tgt_node, oth_node:
- info("checking disk/%d on %s" % (idx, node))
- cfg.SetDiskID(dev, node)
+
+ for node in nodes:
+ self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
+ self.cfg.SetDiskID(dev, node)
+
result = self.rpc.call_blockdev_find(node, dev)
+
msg = result.fail_msg
- if not msg and not result.payload:
- msg = "disk not found"
- if msg:
+ if msg or not result.payload:
+ if not msg:
+ msg = "disk not found"
raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
(idx, node, msg))
- # Step: check other node consistency
- self.proc.LogStep(2, steps_total, "check peer consistency")
- for idx, dev in enumerate(instance.disks):
- if idx not in self.op.disks:
+ def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
+ for idx, dev in enumerate(self.instance.disks):
+ if idx not in self.disks:
continue
- info("checking disk/%d consistency on %s" % (idx, oth_node))
- if not _CheckDiskConsistency(self, dev, oth_node,
- oth_node==instance.primary_node):
- raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
- " to replace disks on this node (%s)" %
- (oth_node, tgt_node))
- # Step: create new storage
- self.proc.LogStep(3, steps_total, "allocate new storage")
- for idx, dev in enumerate(instance.disks):
- if idx not in self.op.disks:
+ self.lu.LogInfo("Checking disk/%d consistency on node %s" %
+ (idx, node_name))
+
+ if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
+ ldisk=ldisk):
+ raise errors.OpExecError("Node %s has degraded storage, unsafe to"
+ " replace disks for instance %s" %
+ (node_name, self.instance.name))
+
+ def _CreateNewStorage(self, node_name):
+ vgname = self.cfg.GetVGName()
+ iv_names = {}
+
+ for idx, dev in enumerate(self.instance.disks):
+ if idx not in self.disks:
continue
- size = dev.size
- cfg.SetDiskID(dev, tgt_node)
- lv_names = [".disk%d_%s" % (idx, suf)
- for suf in ["data", "meta"]]
- names = _GenerateUniqueNames(self, lv_names)
- lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
+
+ self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
+
+ self.cfg.SetDiskID(dev, node_name)
+
+ lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
+ names = _GenerateUniqueNames(self.lu, lv_names)
+
+ lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
logical_id=(vgname, names[0]))
lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
logical_id=(vgname, names[1]))
+
new_lvs = [lv_data, lv_meta]
old_lvs = dev.children
iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
- info("creating new local storage on %s for %s" %
- (tgt_node, dev.iv_name))
+
# we pass force_create=True to force the LVM creation
for new_lv in new_lvs:
- _CreateBlockDev(self, tgt_node, instance, new_lv, True,
- _GetInstanceInfoText(instance), False)
+ _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
+ _GetInstanceInfoText(self.instance), False)
+
+ return iv_names
+
+ def _CheckDevices(self, node_name, iv_names):
+ for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
+ self.cfg.SetDiskID(dev, node_name)
+
+ result = self.rpc.call_blockdev_find(node_name, dev)
+
+ msg = result.fail_msg
+ if msg or not result.payload:
+ if not msg:
+ msg = "disk not found"
+ raise errors.OpExecError("Can't find DRBD device %s: %s" %
+ (name, msg))
+
+ if result.payload.is_degraded:
+ raise errors.OpExecError("DRBD device %s is degraded!" % name)
+
+ def _RemoveOldStorage(self, node_name, iv_names):
+ for name, (dev, old_lvs, _) in iv_names.iteritems():
+ self.lu.LogInfo("Remove logical volumes for %s" % name)
+
+ for lv in old_lvs:
+ self.cfg.SetDiskID(lv, node_name)
+
+ msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
+ if msg:
+ self.lu.LogWarning("Can't remove old LV: %s" % msg,
+ hint="remove unused LVs manually")
+
+ def _ExecDrbd8DiskOnly(self):
+ """Replace a disk on the primary or secondary for DRBD 8.
+
+ The algorithm for replace is quite complicated:
+
+ 1. for each disk to be replaced:
+
+ 1. create new LVs on the target node with unique names
+ 1. detach old LVs from the drbd device
+ 1. rename old LVs to name_replaced.<time_t>
+ 1. rename new LVs to old LVs
+ 1. attach the new LVs (with the old names now) to the drbd device
+
+ 1. wait for sync across all devices
+
+ 1. for each modified disk:
+
+ 1. remove old LVs (which have the name name_replaces.<time_t>)
+
+ Failures are not very well handled.
+
+ """
+ steps_total = 6
+
+ # Step: check device activation
+ self.lu.LogStep(1, steps_total, "Check device existence")
+ self._CheckDisksExistence([self.other_node, self.target_node])
+ self._CheckVolumeGroup([self.target_node, self.other_node])
+
+ # Step: check other node consistency
+ self.lu.LogStep(2, steps_total, "Check peer consistency")
+ self._CheckDisksConsistency(self.other_node,
+ self.other_node == self.instance.primary_node,
+ False)
+
+ # Step: create new storage
+ self.lu.LogStep(3, steps_total, "Allocate new storage")
+ iv_names = self._CreateNewStorage(self.target_node)
# Step: for each lv, detach+rename*2+attach
- self.proc.LogStep(4, steps_total, "change drbd configuration")
+ self.lu.LogStep(4, steps_total, "Changing drbd configuration")
for dev, old_lvs, new_lvs in iv_names.itervalues():
- info("detaching %s drbd from local storage" % dev.iv_name)
- result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
+ self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
+
+ result = self.rpc.call_blockdev_removechildren(self.target_node, dev, old_lvs)
result.Raise("Can't detach drbd from local storage on node"
- " %s for device %s" % (tgt_node, dev.iv_name))
+ " %s for device %s" % (self.target_node, dev.iv_name))
#dev.children = []
#cfg.Update(instance)
temp_suffix = int(time.time())
ren_fn = lambda d, suff: (d.physical_id[0],
d.physical_id[1] + "_replaced-%s" % suff)
- # build the rename list based on what LVs exist on the node
- rlist = []
+
+ # Build the rename list based on what LVs exist on the node
+ rename_old_to_new = []
for to_ren in old_lvs:
- result = self.rpc.call_blockdev_find(tgt_node, to_ren)
+ result = self.rpc.call_blockdev_find(self.target_node, to_ren)
if not result.fail_msg and result.payload:
# device exists
- rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
+ rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
+
+ self.lu.LogInfo("Renaming the old LVs on the target node")
+ result = self.rpc.call_blockdev_rename(self.target_node, rename_old_to_new)
+ result.Raise("Can't rename old LVs on node %s" % self.target_node)
- info("renaming the old LVs on the target node")
- result = self.rpc.call_blockdev_rename(tgt_node, rlist)
- result.Raise("Can't rename old LVs on node %s" % tgt_node)
- # now we rename the new LVs to the old LVs
- info("renaming the new LVs on the target node")
- rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
- result = self.rpc.call_blockdev_rename(tgt_node, rlist)
- result.Raise("Can't rename new LVs on node %s" % tgt_node)
+ # Now we rename the new LVs to the old LVs
+ self.lu.LogInfo("Renaming the new LVs on the target node")
+ rename_new_to_old = [(new, old.physical_id)
+ for old, new in zip(old_lvs, new_lvs)]
+ result = self.rpc.call_blockdev_rename(self.target_node, rename_new_to_old)
+ result.Raise("Can't rename new LVs on node %s" % self.target_node)
for old, new in zip(old_lvs, new_lvs):
new.logical_id = old.logical_id
- cfg.SetDiskID(new, tgt_node)
+ self.cfg.SetDiskID(new, self.target_node)
for disk in old_lvs:
disk.logical_id = ren_fn(disk, temp_suffix)
- cfg.SetDiskID(disk, tgt_node)
+ self.cfg.SetDiskID(disk, self.target_node)
- # now that the new lvs have the old name, we can add them to the device
- info("adding new mirror component on %s" % tgt_node)
- result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
+ # Now that the new lvs have the old name, we can add them to the device
+ self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
+ result = self.rpc.call_blockdev_addchildren(self.target_node, dev, new_lvs)
msg = result.fail_msg
if msg:
for new_lv in new_lvs:
- msg2 = self.rpc.call_blockdev_remove(tgt_node, new_lv).fail_msg
+ msg2 = self.rpc.call_blockdev_remove(self.target_node, new_lv).fail_msg
if msg2:
- warning("Can't rollback device %s: %s", dev, msg2,
- hint="cleanup manually the unused logical volumes")
+ self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
+ hint=("cleanup manually the unused logical"
+ "volumes"))
raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
dev.children = new_lvs
- cfg.Update(instance)
- # Step: wait for sync
+ self.cfg.Update(self.instance)
- # this can fail as the old devices are degraded and _WaitForSync
- # does a combined result over all disks, so we don't check its
- # return value
- self.proc.LogStep(5, steps_total, "sync devices")
- _WaitForSync(self, instance, unlock=True)
+ # Wait for sync
+ # This can fail as the old devices are degraded and _WaitForSync
+ # does a combined result over all disks, so we don't check its return value
+ self.lu.LogStep(5, steps_total, "Sync devices")
+ _WaitForSync(self.lu, self.instance, unlock=True)
- # so check manually all the devices
- for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
- cfg.SetDiskID(dev, instance.primary_node)
- result = self.rpc.call_blockdev_find(instance.primary_node, dev)
- msg = result.fail_msg
- if not msg and not result.payload:
- msg = "disk not found"
- if msg:
- raise errors.OpExecError("Can't find DRBD device %s: %s" %
- (name, msg))
- if result.payload[5]:
- raise errors.OpExecError("DRBD device %s is degraded!" % name)
+ # Check all devices manually
+ self._CheckDevices(self.instance.primary_node, iv_names)
# Step: remove old storage
- self.proc.LogStep(6, steps_total, "removing old storage")
- for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
- info("remove logical volumes for %s" % name)
- for lv in old_lvs:
- cfg.SetDiskID(lv, tgt_node)
- msg = self.rpc.call_blockdev_remove(tgt_node, lv).fail_msg
- if msg:
- warning("Can't remove old LV: %s" % msg,
- hint="manually remove unused LVs")
- continue
+ self.lu.LogStep(6, steps_total, "Removing old storage")
+ self._RemoveOldStorage(self.target_node, iv_names)
- def _ExecD8Secondary(self, feedback_fn):
- """Replace the secondary node for drbd8.
+ def _ExecDrbd8Secondary(self):
+ """Replace the secondary node for DRBD 8.
The algorithm for replace is quite complicated:
- for all disks of the instance:
"""
steps_total = 6
- warning, info = (self.proc.LogWarning, self.proc.LogInfo)
- instance = self.instance
- iv_names = {}
- # start of work
- cfg = self.cfg
- old_node = self.tgt_node
- new_node = self.new_node
- pri_node = instance.primary_node
- nodes_ip = {
- old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
- new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
- pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
- }
# Step: check device activation
- self.proc.LogStep(1, steps_total, "check device existence")
- info("checking volume groups")
- my_vg = cfg.GetVGName()
- results = self.rpc.call_vg_list([pri_node, new_node])
- for node in pri_node, new_node:
- res = results[node]
- res.Raise("Error checking node %s" % node)
- if my_vg not in res.payload:
- raise errors.OpExecError("Volume group '%s' not found on %s" %
- (my_vg, node))
- for idx, dev in enumerate(instance.disks):
- if idx not in self.op.disks:
- continue
- info("checking disk/%d on %s" % (idx, pri_node))
- cfg.SetDiskID(dev, pri_node)
- result = self.rpc.call_blockdev_find(pri_node, dev)
- msg = result.fail_msg
- if not msg and not result.payload:
- msg = "disk not found"
- if msg:
- raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
- (idx, pri_node, msg))
+ self.lu.LogStep(1, steps_total, "Check device existence")
+ self._CheckDisksExistence([self.instance.primary_node])
+ self._CheckVolumeGroup([self.instance.primary_node])
# Step: check other node consistency
- self.proc.LogStep(2, steps_total, "check peer consistency")
- for idx, dev in enumerate(instance.disks):
- if idx not in self.op.disks:
- continue
- info("checking disk/%d consistency on %s" % (idx, pri_node))
- if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
- raise errors.OpExecError("Primary node (%s) has degraded storage,"
- " unsafe to replace the secondary" %
- pri_node)
+ self.lu.LogStep(2, steps_total, "Check peer consistency")
+ self._CheckDisksConsistency(self.instance.primary_node, True, True)
# Step: create new storage
- self.proc.LogStep(3, steps_total, "allocate new storage")
- for idx, dev in enumerate(instance.disks):
- info("adding new local storage on %s for disk/%d" %
- (new_node, idx))
+ self.lu.LogStep(3, steps_total, "Allocate new storage")
+ for idx, dev in enumerate(self.instance.disks):
+ self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
+ (self.new_node, idx))
# we pass force_create=True to force LVM creation
for new_lv in dev.children:
- _CreateBlockDev(self, new_node, instance, new_lv, True,
- _GetInstanceInfoText(instance), False)
+ _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
+ _GetInstanceInfoText(self.instance), False)
# Step 4: dbrd minors and drbd setups changes
# after this, we must manually remove the drbd minors on both the
# error and the success paths
- minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
- instance.name)
- logging.debug("Allocated minors %s" % (minors,))
- self.proc.LogStep(4, steps_total, "changing drbd configuration")
- for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
- size = dev.size
- info("activating a new drbd on %s for disk/%d" % (new_node, idx))
+ self.lu.LogStep(4, steps_total, "Changing drbd configuration")
+ minors = self.cfg.AllocateDRBDMinor([self.new_node for dev in self.instance.disks],
+ self.instance.name)
+ logging.debug("Allocated minors %r" % (minors,))
+
+ iv_names = {}
+ for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
+ self.lu.LogInfo("activating a new drbd on %s for disk/%d" % (self.new_node, idx))
# create new devices on new_node; note that we create two IDs:
# one without port, so the drbd will be activated without
# networking information on the new node at this stage, and one
# with network, for the latter activation in step 4
(o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
- if pri_node == o_node1:
+ if self.instance.primary_node == o_node1:
p_minor = o_minor1
else:
p_minor = o_minor2
- new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
- new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
+ new_alone_id = (self.instance.primary_node, self.new_node, None, p_minor, new_minor, o_secret)
+ new_net_id = (self.instance.primary_node, self.new_node, o_port, p_minor, new_minor, o_secret)
iv_names[idx] = (dev, dev.children, new_net_id)
logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
new_net_id)
new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
logical_id=new_alone_id,
- children=dev.children)
+ children=dev.children,
+ size=dev.size)
try:
- _CreateSingleBlockDev(self, new_node, instance, new_drbd,
- _GetInstanceInfoText(instance), False)
+ _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
+ _GetInstanceInfoText(self.instance), False)
except errors.GenericError:
- self.cfg.ReleaseDRBDMinors(instance.name)
+ self.cfg.ReleaseDRBDMinors(self.instance.name)
raise
- for idx, dev in enumerate(instance.disks):
- # we have new devices, shutdown the drbd on the old secondary
- info("shutting down drbd for disk/%d on old node" % idx)
- cfg.SetDiskID(dev, old_node)
- msg = self.rpc.call_blockdev_shutdown(old_node, dev).fail_msg
+ # We have new devices, shutdown the drbd on the old secondary
+ for idx, dev in enumerate(self.instance.disks):
+ self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
+ self.cfg.SetDiskID(dev, self.target_node)
+ msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
if msg:
- warning("Failed to shutdown drbd for disk/%d on old node: %s" %
- (idx, msg),
- hint="Please cleanup this device manually as soon as possible")
+ self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
+ "node: %s" % (idx, msg),
+ hint=("Please cleanup this device manually as"
+ " soon as possible"))
- info("detaching primary drbds from the network (=> standalone)")
- result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
- instance.disks)[pri_node]
+ self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
+ result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node], self.node_secondary_ip,
+ self.instance.disks)[self.instance.primary_node]
msg = result.fail_msg
if msg:
# detaches didn't succeed (unlikely)
- self.cfg.ReleaseDRBDMinors(instance.name)
+ self.cfg.ReleaseDRBDMinors(self.instance.name)
raise errors.OpExecError("Can't detach the disks from the network on"
" old node: %s" % (msg,))
# if we managed to detach at least one, we update all the disks of
# the instance to point to the new secondary
- info("updating instance configuration")
+ self.lu.LogInfo("Updating instance configuration")
for dev, _, new_logical_id in iv_names.itervalues():
dev.logical_id = new_logical_id
- cfg.SetDiskID(dev, pri_node)
- cfg.Update(instance)
+ self.cfg.SetDiskID(dev, self.instance.primary_node)
+
+ self.cfg.Update(self.instance)
# and now perform the drbd attach
- info("attaching primary drbds to new secondary (standalone => connected)")
- result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
- instance.disks, instance.name,
+ self.lu.LogInfo("Attaching primary drbds to new secondary"
+ " (standalone => connected)")
+ result = self.rpc.call_drbd_attach_net([self.instance.primary_node, self.new_node], self.node_secondary_ip,
+ self.instance.disks, self.instance.name,
False)
for to_node, to_result in result.items():
msg = to_result.fail_msg
if msg:
- warning("can't attach drbd disks on node %s: %s", to_node, msg,
- hint="please do a gnt-instance info to see the"
- " status of disks")
-
- # this can fail as the old devices are degraded and _WaitForSync
- # does a combined result over all disks, so we don't check its
- # return value
- self.proc.LogStep(5, steps_total, "sync devices")
- _WaitForSync(self, instance, unlock=True)
-
- # so check manually all the devices
- for idx, (dev, old_lvs, _) in iv_names.iteritems():
- cfg.SetDiskID(dev, pri_node)
- result = self.rpc.call_blockdev_find(pri_node, dev)
- msg = result.fail_msg
- if not msg and not result.payload:
- msg = "disk not found"
- if msg:
- raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
- (idx, msg))
- if result.payload[5]:
- raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
-
- self.proc.LogStep(6, steps_total, "removing old storage")
- for idx, (dev, old_lvs, _) in iv_names.iteritems():
- info("remove logical volumes for disk/%d" % idx)
- for lv in old_lvs:
- cfg.SetDiskID(lv, old_node)
- msg = self.rpc.call_blockdev_remove(old_node, lv).fail_msg
- if msg:
- warning("Can't remove LV on old secondary: %s", msg,
- hint="Cleanup stale volumes by hand")
+ self.lu.LogWarning("Can't attach drbd disks on node %s: %s", to_node, msg,
+ hint=("please do a gnt-instance info to see the"
+ " status of disks"))
- def Exec(self, feedback_fn):
- """Execute disk replacement.
+ # Wait for sync
+ # This can fail as the old devices are degraded and _WaitForSync
+ # does a combined result over all disks, so we don't check its return value
+ self.lu.LogStep(5, steps_total, "Sync devices")
+ _WaitForSync(self.lu, self.instance, unlock=True)
- This dispatches the disk replacement to the appropriate handler.
+ # Check all devices manually
+ self._CheckDevices(self.instance.primary_node, iv_names)
- """
- instance = self.instance
+ # Step: remove old storage
+ self.lu.LogStep(6, steps_total, "Removing old storage")
+ self._RemoveOldStorage(self.target_node, iv_names)
- # Activate the instance disks if we're replacing them on a down instance
- if not instance.admin_up:
- _StartInstanceDisks(self, instance, True)
- if self.op.mode == constants.REPLACE_DISK_CHG:
- fn = self._ExecD8Secondary
- else:
- fn = self._ExecD8DiskOnly
+class LURepairNodeStorage(NoHooksLU):
+ """Repairs the volume group on a node.
+
+ """
+ _OP_REQP = ["node_name"]
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ node_name = self.cfg.ExpandNodeName(self.op.node_name)
+ if node_name is None:
+ raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
+
+ self.op.node_name = node_name
+
+ def ExpandNames(self):
+ self.needed_locks = {
+ locking.LEVEL_NODE: [self.op.node_name],
+ }
+
+ def _CheckFaultyDisks(self, instance, node_name):
+ if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
+ node_name, True):
+ raise errors.OpPrereqError("Instance '%s' has faulty disks on"
+ " node '%s'" % (inst.name, node_name))
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ """
+ storage_type = self.op.storage_type
+
+ if (constants.SO_FIX_CONSISTENCY not in
+ constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
+ raise errors.OpPrereqError("Storage units of type '%s' can not be"
+ " repaired" % storage_type)
- ret = fn(feedback_fn)
+ # Check whether any instance on this node has faulty disks
+ for inst in _GetNodeInstances(self.cfg, self.op.node_name):
+ check_nodes = set(inst.all_nodes)
+ check_nodes.discard(self.op.node_name)
+ for inst_node_name in check_nodes:
+ self._CheckFaultyDisks(inst, inst_node_name)
- # Deactivate the instance disks if we're replacing them on a down instance
- if not instance.admin_up:
- _SafeShutdownInstanceDisks(self, instance)
+ def Exec(self, feedback_fn):
+ feedback_fn("Repairing storage unit '%s' on %s ..." %
+ (self.op.name, self.op.node_name))
- return ret
+ st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
+ result = self.rpc.call_storage_execute(self.op.node_name,
+ self.op.storage_type, st_args,
+ self.op.name,
+ constants.SO_FIX_CONSISTENCY)
+ result.Raise("Failed to repair storage unit '%s' on %s" %
+ (self.op.name, self.op.node_name))
class LUGrowDisk(LogicalUnit):
def ExpandNames(self):
self.needed_locks = {}
- self.share_locks = dict(((i, 1) for i in locking.LEVELS))
+ self.share_locks = dict.fromkeys(locking.LEVELS, 1)
if not isinstance(self.op.instances, list):
raise errors.OpPrereqError("Invalid argument type 'instances'")
in self.wanted_names]
return
+ def _ComputeBlockdevStatus(self, node, instance_name, dev):
+ """Returns the status of a block device
+
+ """
+ if self.op.static or not node:
+ return None
+
+ self.cfg.SetDiskID(dev, node)
+
+ result = self.rpc.call_blockdev_find(node, dev)
+ if result.offline:
+ return None
+
+ result.Raise("Can't compute disk status for %s" % instance_name)
+
+ status = result.payload
+ if status is None:
+ return None
+
+ return (status.dev_path, status.major, status.minor,
+ status.sync_percent, status.estimated_time,
+ status.is_degraded, status.ldisk_status)
+
def _ComputeDiskStatus(self, instance, snode, dev):
"""Compute block device status.
"""
- static = self.op.static
- if not static:
- self.cfg.SetDiskID(dev, instance.primary_node)
- dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
- if dev_pstatus.offline:
- dev_pstatus = None
- else:
- dev_pstatus.Raise("Can't compute disk status for %s" % instance.name)
- dev_pstatus = dev_pstatus.payload
- else:
- dev_pstatus = None
-
if dev.dev_type in constants.LDS_DRBD:
# we change the snode then (otherwise we use the one passed in)
if dev.logical_id[0] == instance.primary_node:
else:
snode = dev.logical_id[0]
- if snode and not static:
- self.cfg.SetDiskID(dev, snode)
- dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
- if dev_sstatus.offline:
- dev_sstatus = None
- else:
- dev_sstatus.Raise("Can't compute disk status for %s" % instance.name)
- dev_sstatus = dev_sstatus.payload
- else:
- dev_sstatus = None
+ dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
+ instance.name, dev)
+ dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
if dev.children:
dev_children = [self._ComputeDiskStatus(instance, snode, child)
"sstatus": dev_sstatus,
"children": dev_children,
"mode": dev.mode,
+ "size": dev.size,
}
return data
"pnode": instance.primary_node,
"snodes": instance.secondary_nodes,
"os": instance.os,
- "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
+ # this happens to be the same format used for hooks
+ "nics": _NICListToTuple(self, instance.nics),
"disks": disks,
"hypervisor": instance.hypervisor,
"network_port": instance.network_port,
"hv_actual": cluster.FillHV(instance),
"be_instance": instance.beparams,
"be_actual": cluster.FillBE(instance),
+ "serial_no": instance.serial_no,
+ "mtime": instance.mtime,
+ "ctime": instance.ctime,
}
result[instance.name] = idict
else:
if not isinstance(disk_op, int):
raise errors.OpPrereqError("Invalid disk index")
+ if not isinstance(disk_dict, dict):
+ msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
+ raise errors.OpPrereqError(msg)
+
if disk_op == constants.DDM_ADD:
mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
if mode not in constants.DISK_ACCESS_SET:
else:
if not isinstance(nic_op, int):
raise errors.OpPrereqError("Invalid nic index")
+ if not isinstance(nic_dict, dict):
+ msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
+ raise errors.OpPrereqError(msg)
# nic_dict should be a dict
nic_ip = nic_dict.get('ip', None)
nic_bridge = nic_dict.get('bridge', None)
nic_link = nic_dict.get('link', None)
if nic_bridge and nic_link:
- raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
+ raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
+ " at the same time")
elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
nic_dict['bridge'] = None
elif nic_link and nic_link.lower() == constants.VALUE_NONE:
"""Return the new params dict for the given params.
@type old_params: dict
- @type old_params: old parameters
+ @param old_params: old parameters
@type update_dict: dict
- @type update_dict: dict containing new parameter values,
- or constants.VALUE_DEFAULT to reset the
- parameter to its default value
+ @param update_dict: dict containing new parameter values,
+ or constants.VALUE_DEFAULT to reset the
+ parameter to its default value
@type default_values: dict
@param default_values: default values for the filled parameters
@type parameter_types: dict
This only checks the instance list against the existing names.
"""
- force = self.force = self.op.force
+ self.force = self.op.force
# checking the new params on the primary/secondary nodes
# remove it from its current node. In the future we could fix this by:
# - making a tasklet to search (share-lock all), then create the new one,
# then one to remove, after
- # - removing the removal operation altoghether
+ # - removing the removal operation altogether
self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
def DeclareLocks(self, level):
for disk in instance.disks:
self.cfg.SetDiskID(disk, src_node)
+ # per-disk results
+ dresults = []
try:
- for disk in instance.disks:
+ for idx, disk in enumerate(instance.disks):
# result.payload will be a snapshot of an lvm leaf of the one we passed
result = self.rpc.call_blockdev_snapshot(src_node, disk)
msg = result.fail_msg
if msg:
- self.LogWarning("Could not snapshot block device %s on node %s: %s",
- disk.logical_id[1], src_node, msg)
+ self.LogWarning("Could not snapshot disk/%s on node %s: %s",
+ idx, src_node, msg)
snap_disks.append(False)
else:
disk_id = (vgname, result.payload)
instance, cluster_name, idx)
msg = result.fail_msg
if msg:
- self.LogWarning("Could not export block device %s from node %s to"
- " node %s: %s", dev.logical_id[1], src_node,
- dst_node.name, msg)
+ self.LogWarning("Could not export disk/%s from node %s to"
+ " node %s: %s", idx, src_node, dst_node.name, msg)
+ dresults.append(False)
+ else:
+ dresults.append(True)
msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
if msg:
- self.LogWarning("Could not remove snapshot block device %s from node"
- " %s: %s", dev.logical_id[1], src_node, msg)
+ self.LogWarning("Could not remove snapshot for disk/%d from node"
+ " %s: %s", idx, src_node, msg)
+ else:
+ dresults.append(False)
result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
+ fin_resu = True
msg = result.fail_msg
if msg:
self.LogWarning("Could not finalize export for instance %s"
" on node %s: %s", instance.name, dst_node.name, msg)
+ fin_resu = False
nodelist = self.cfg.GetNodeList()
nodelist.remove(dst_node.name)
if msg:
self.LogWarning("Could not remove older export for instance %s"
" on node %s: %s", iname, node, msg)
+ return fin_resu, dresults
class LURemoveExport(NoHooksLU):
"relocate_from",
]
- def __init__(self, lu, mode, name, **kwargs):
- self.lu = lu
+ def __init__(self, cfg, rpc, mode, name, **kwargs):
+ self.cfg = cfg
+ self.rpc = rpc
# init buffer variables
self.in_text = self.out_text = self.in_data = self.out_data = None
# init all input fields so that pylint is happy
This is the data that is independent of the actual operation.
"""
- cfg = self.lu.cfg
+ cfg = self.cfg
cluster_info = cfg.GetClusterInfo()
# cluster data
data = {
elif self.mode == constants.IALLOCATOR_MODE_RELOC:
hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
- node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
- hypervisor_name)
- node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
- cluster_info.enabled_hypervisors)
+ node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
+ hypervisor_name)
+ node_iinfo = \
+ self.rpc.call_all_instances_info(node_list,
+ cluster_info.enabled_hypervisors)
for nname, nresult in node_data.items():
# first fill in static (config-based) values
ninfo = cfg.GetNodeInfo(nname)
"master_candidate": ninfo.master_candidate,
}
- if not ninfo.offline:
+ if not (ninfo.offline or ninfo.drained):
nresult.Raise("Can't get data for node %s" % nname)
node_iinfo[nname].Raise("Can't get node instance info from node %s" %
nname)
remote_info = nresult.payload
+
for attr in ['memory_total', 'memory_free', 'memory_dom0',
'vg_size', 'vg_free', 'cpu_total']:
if attr not in remote_info:
done.
"""
- instance = self.lu.cfg.GetInstanceInfo(self.name)
+ instance = self.cfg.GetInstanceInfo(self.name)
if instance is None:
raise errors.ProgrammerError("Unknown instance '%s' passed to"
" IAllocator" % self.name)
"""
if call_fn is None:
- call_fn = self.lu.rpc.call_iallocator_runner
- data = self.in_text
+ call_fn = self.rpc.call_iallocator_runner
- result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
+ result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
result.Raise("Failure while running the iallocator script")
self.out_text = result.payload
"""
if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
- ial = IAllocator(self,
+ ial = IAllocator(self.cfg, self.rpc,
mode=self.op.mode,
name=self.op.name,
mem_size=self.op.mem_size,
hypervisor=self.op.hypervisor,
)
else:
- ial = IAllocator(self,
+ ial = IAllocator(self.cfg, self.rpc,
mode=self.op.mode,
name=self.op.name,
relocate_from=list(self.relocate_from),