from ganeti import qlang
from ganeti import opcodes
from ganeti import ht
+from ganeti import rpc
import ganeti.masterd.instance # pylint: disable=W0611
+#: Size of DRBD meta block device, in MB
+DRBD_META_SIZE = 128
+
+
class ResultWithJobs:
"""Data container for LU results with jobs.
HTYPE = None
REQ_BGL = True
- def __init__(self, processor, op, context, rpc):
+ def __init__(self, processor, op, context, rpc_runner):
"""Constructor for LogicalUnit.
This needs to be overridden in derived classes in order to check op
# readability alias
self.owned_locks = context.glm.list_owned
self.context = context
- self.rpc = rpc
+ self.rpc = rpc_runner
# Dicts used to declare locking needs to mcpu
self.needed_locks = None
self.share_locks = dict.fromkeys(locking.LEVELS, 0)
self.op.instance_name)
self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
- def _LockInstancesNodes(self, primary_only=False):
+ def _LockInstancesNodes(self, primary_only=False,
+ level=locking.LEVEL_NODE):
"""Helper function to declare instances' nodes for locking.
This function should be called after locking one or more instances to lock
@type primary_only: boolean
@param primary_only: only lock primary nodes of locked instances
+ @param level: Which lock level to use for locking nodes
"""
- assert locking.LEVEL_NODE in self.recalculate_locks, \
+ assert level in self.recalculate_locks, \
"_LockInstancesNodes helper function called with no nodes to recalculate"
# TODO: check if we're really been called with the instance locks held
if not primary_only:
wanted_nodes.extend(instance.secondary_nodes)
- if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
- self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
- elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
- self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
+ if self.recalculate_locks[level] == constants.LOCKS_REPLACE:
+ self.needed_locks[level] = wanted_nodes
+ elif self.recalculate_locks[level] == constants.LOCKS_APPEND:
+ self.needed_locks[level].extend(wanted_nodes)
+ else:
+ raise errors.ProgrammerError("Unknown recalculation mode")
- del self.recalculate_locks[locking.LEVEL_NODE]
+ del self.recalculate_locks[level]
class NoHooksLU(LogicalUnit): # pylint: disable=W0223
"""Runs the post-hook for an opcode on a single node.
"""
- hm = lu.proc.hmclass(lu.rpc.call_hooks_runner, lu)
+ hm = lu.proc.BuildHooksManager(lu)
try:
hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
except:
return []
-def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
+def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
faulty = []
for dev in instance.disks:
cfg.SetDiskID(dev, node_name)
- result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
+ result = rpc_runner.call_blockdev_getmirrorstatus(node_name, instance.disks)
result.Raise("Failed to get disk status from node %s" % node_name,
prereq=prereq, ecode=errors.ECODE_ENVIRON)
"""Destroys the cluster.
"""
- master = self.cfg.GetMasterNode()
+ master_params = self.cfg.GetMasterNetworkParameters()
# Run post hooks on master node before it's removed
- _RunPostHook(self, master)
+ _RunPostHook(self, master_params.name)
- result = self.rpc.call_node_deactivate_master_ip(master)
+ result = self.rpc.call_node_deactivate_master_ip(master_params.name,
+ master_params)
result.Raise("Could not disable the master role")
- return master
+ return master_params.name
def _VerifyCertificate(filename):
# any leftover items in nv_dict are missing LVs, let's arrange the data
# better
for key, inst in nv_dict.iteritems():
- res_missing.setdefault(inst, []).append(key)
+ res_missing.setdefault(inst, []).append(list(key))
return (res_nodes, list(res_instances), res_missing)
"""
clustername = self.op.name
- ip = self.ip
+ new_ip = self.ip
# shutdown the master IP
- master = self.cfg.GetMasterNode()
- result = self.rpc.call_node_deactivate_master_ip(master)
+ master_params = self.cfg.GetMasterNetworkParameters()
+ result = self.rpc.call_node_deactivate_master_ip(master_params.name,
+ master_params)
result.Raise("Could not disable the master role")
try:
cluster = self.cfg.GetClusterInfo()
cluster.cluster_name = clustername
- cluster.master_ip = ip
+ cluster.master_ip = new_ip
self.cfg.Update(cluster, feedback_fn)
# update the known hosts file
ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
node_list = self.cfg.GetOnlineNodeList()
try:
- node_list.remove(master)
+ node_list.remove(master_params.name)
except ValueError:
pass
_UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
finally:
- result = self.rpc.call_node_activate_master_ip(master)
+ master_params.ip = new_ip
+ result = self.rpc.call_node_activate_master_ip(master_params.name,
+ master_params)
msg = result.fail_msg
if msg:
self.LogWarning("Could not re-enable the master role on"
helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
if self.op.master_netdev:
- master = self.cfg.GetMasterNode()
+ master_params = self.cfg.GetMasterNetworkParameters()
feedback_fn("Shutting down master ip on the current netdev (%s)" %
self.cluster.master_netdev)
- result = self.rpc.call_node_deactivate_master_ip(master)
+ result = self.rpc.call_node_deactivate_master_ip(master_params.name,
+ master_params)
result.Raise("Could not disable the master ip")
feedback_fn("Changing master_netdev from %s to %s" %
- (self.cluster.master_netdev, self.op.master_netdev))
+ (master_params.netdev, self.op.master_netdev))
self.cluster.master_netdev = self.op.master_netdev
if self.op.master_netmask:
- master = self.cfg.GetMasterNode()
+ master_params = self.cfg.GetMasterNetworkParameters()
feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
- result = self.rpc.call_node_change_master_netmask(master,
- self.op.master_netmask)
+ result = self.rpc.call_node_change_master_netmask(master_params.name,
+ master_params.netmask,
+ self.op.master_netmask,
+ master_params.ip,
+ master_params.netdev)
if result.fail_msg:
msg = "Could not change the master IP netmask: %s" % result.fail_msg
- self.LogWarning(msg)
feedback_fn(msg)
- else:
- self.cluster.master_netmask = self.op.master_netmask
+
+ self.cluster.master_netmask = self.op.master_netmask
self.cfg.Update(self.cluster, feedback_fn)
if self.op.master_netdev:
+ master_params = self.cfg.GetMasterNetworkParameters()
feedback_fn("Starting the master ip on the new master netdev (%s)" %
self.op.master_netdev)
- result = self.rpc.call_node_activate_master_ip(master)
+ result = self.rpc.call_node_activate_master_ip(master_params.name,
+ master_params)
if result.fail_msg:
self.LogWarning("Could not re-enable the master ip on"
" the master, please restart manually: %s",
"""Activate the master IP.
"""
- master = self.cfg.GetMasterNode()
- self.rpc.call_node_activate_master_ip(master)
+ master_params = self.cfg.GetMasterNetworkParameters()
+ self.rpc.call_node_activate_master_ip(master_params.name,
+ master_params)
class LUClusterDeactivateMasterIp(NoHooksLU):
"""Deactivate the master IP.
"""
- master = self.cfg.GetMasterNode()
- self.rpc.call_node_deactivate_master_ip(master)
+ master_params = self.cfg.GetMasterNetworkParameters()
+ self.rpc.call_node_deactivate_master_ip(master_params.name, master_params)
def _WaitForSync(lu, instance, disks=None, oneshot=False):
modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
+ assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
+ "Not owning BGL"
+
# Promote nodes to master candidate as needed
_AdjustCandidatePool(self, exceptions=[node.name])
self.context.RemoveNode(node.name)
new_node = self.new_node
node = new_node.name
+ assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
+ "Not owning BGL"
+
# We adding a new node so we assume it's powered
new_node.powered = True
self.lock_all = self.op.auto_promote and self.might_demote
self.lock_instances = self.op.secondary_ip is not None
+ def _InstanceFilter(self, instance):
+ """Select instances with a DTS_INT_MIRROR template that use the node.
+
+ """
+ return (instance.disk_template in constants.DTS_INT_MIRROR and
+ self.op.node_name in instance.all_nodes)
+
def ExpandNames(self):
if self.lock_all:
self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
if self.lock_instances:
- self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
-
- def DeclareLocks(self, level):
- # If we have locked all instances, before waiting to lock nodes, release
- # all the ones living on nodes unrelated to the current operation.
- if level == locking.LEVEL_NODE and self.lock_instances:
- self.affected_instances = []
- if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
- instances_keep = []
-
- # Build list of instances to release
- locked_i = self.owned_locks(locking.LEVEL_INSTANCE)
- for instance_name, instance in self.cfg.GetMultiInstanceInfo(locked_i):
- if (instance.disk_template in constants.DTS_INT_MIRROR and
- self.op.node_name in instance.all_nodes):
- instances_keep.append(instance_name)
- self.affected_instances.append(instance)
-
- _ReleaseLocks(self, locking.LEVEL_INSTANCE, keep=instances_keep)
-
- assert (set(self.owned_locks(locking.LEVEL_INSTANCE)) ==
- set(instances_keep))
+ self.needed_locks[locking.LEVEL_INSTANCE] = \
+ frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter))
def BuildHooksEnv(self):
"""Build hooks env.
"""
node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
+ if self.lock_instances:
+ affected_instances = \
+ self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
+
+ # Verify instance locks
+ owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
+ wanted_instances = frozenset(affected_instances.keys())
+ if wanted_instances - owned_instances:
+ raise errors.OpPrereqError("Instances affected by changing node %s's"
+ " secondary IP address have changed since"
+ " locks were acquired, wanted '%s', have"
+ " '%s'; retry the operation" %
+ (self.op.node_name,
+ utils.CommaJoin(wanted_instances),
+ utils.CommaJoin(owned_instances)),
+ errors.ECODE_STATE)
+ else:
+ affected_instances = None
+
if (self.op.master_candidate is not None or
self.op.drained is not None or
self.op.offline is not None):
if old_role == self._ROLE_OFFLINE and new_role != old_role:
# Trying to transition out of offline status
- result = self.rpc.call_version([node.name])[node.name]
+ # TODO: Use standard RPC runner, but make sure it works when the node is
+ # still marked offline
+ result = rpc.BootstrapRunner().call_version([node.name])[node.name]
if result.fail_msg:
raise errors.OpPrereqError("Node %s is being de-offlined but fails"
" to report its version: %s" %
raise errors.OpPrereqError("Cannot change the secondary ip on a single"
" homed cluster", errors.ECODE_INVAL)
+ assert not (frozenset(affected_instances) -
+ self.owned_locks(locking.LEVEL_INSTANCE))
+
if node.offline:
- if self.affected_instances:
- raise errors.OpPrereqError("Cannot change secondary ip: offline"
- " node has instances (%s) configured"
- " to use it" % self.affected_instances)
+ if affected_instances:
+ raise errors.OpPrereqError("Cannot change secondary IP address:"
+ " offline node has instances (%s)"
+ " configured to use it" %
+ utils.CommaJoin(affected_instances.keys()))
else:
# On online nodes, check that no instances are running, and that
# the node has the new ip and we can reach it.
- for instance in self.affected_instances:
+ for instance in affected_instances.values():
_CheckInstanceDown(self, instance, "cannot change secondary ip")
_CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True)
"candidate_pool_size": cluster.candidate_pool_size,
"master_netdev": cluster.master_netdev,
"master_netmask": cluster.master_netmask,
+ "use_external_mip_script": cluster.use_external_mip_script,
"volume_group_name": cluster.volume_group_name,
"drbd_usermode_helper": cluster.drbd_usermode_helper,
"file_storage_dir": cluster.file_storage_dir,
_StartInstanceDisks(self, instance, force)
- result = self.rpc.call_instance_start(node_current, instance,
- self.op.hvparams, self.op.beparams,
- self.op.startup_paused)
+ result = \
+ self.rpc.call_instance_start(node_current,
+ (instance, self.op.hvparams,
+ self.op.beparams),
+ self.op.startup_paused)
msg = result.fail_msg
if msg:
_ShutdownInstanceDisks(self, instance)
self.LogInfo("Instance %s was already stopped, starting now",
instance.name)
_StartInstanceDisks(self, instance, ignore_secondaries)
- result = self.rpc.call_instance_start(node_current, instance,
- None, None, False)
+ result = self.rpc.call_instance_start(node_current,
+ (instance, None, None), False)
msg = result.fail_msg
if msg:
_ShutdownInstanceDisks(self, instance)
try:
feedback_fn("Running the instance OS create scripts...")
# FIXME: pass debug option from opcode to backend
- result = self.rpc.call_instance_os_add(inst.primary_node, inst, True,
- self.op.debug_level,
- osparams=self.os_inst)
+ result = self.rpc.call_instance_os_add(inst.primary_node,
+ (inst, self.os_inst), True,
+ self.op.debug_level)
result.Raise("Could not install OS for instance %s on node %s" %
(inst.name, inst.primary_node))
finally:
_ShutdownInstanceDisks(self, instance)
raise errors.OpExecError("Can't activate the instance's disks")
- result = self.rpc.call_instance_start(target_node, instance,
- None, None, False)
+ result = self.rpc.call_instance_start(target_node,
+ (instance, None, None), False)
msg = result.fail_msg
if msg:
_ShutdownInstanceDisks(self, instance)
self.feedback_fn("* starting the instance on the target node %s" %
target_node)
- result = self.rpc.call_instance_start(target_node, instance, None, None,
+ result = self.rpc.call_instance_start(target_node, (instance, None, None),
False)
msg = result.fail_msg
if msg:
shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
logical_id=(vgnames[0], names[0]))
- dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
+ dev_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
logical_id=(vgnames[1], names[1]))
drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
logical_id=(primary, secondary, port,
constants.DT_DISKLESS: {},
constants.DT_PLAIN: _compute(disks, 0),
# 128 MB are added for drbd metadata for each disk
- constants.DT_DRBD8: _compute(disks, 128),
+ constants.DT_DRBD8: _compute(disks, DRBD_META_SIZE),
constants.DT_FILE: {},
constants.DT_SHARED_FILE: {},
}
constants.DT_DISKLESS: None,
constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks),
# 128 MB are added for drbd metadata for each disk
- constants.DT_DRBD8: sum(d[constants.IDISK_SIZE] + 128 for d in disks),
+ constants.DT_DRBD8:
+ sum(d[constants.IDISK_SIZE] + DRBD_META_SIZE for d in disks),
constants.DT_FILE: None,
constants.DT_SHARED_FILE: 0,
constants.DT_BLOCK: 0,
"""
nodenames = _FilterVmNodes(lu, nodenames)
- hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
- hvname,
- hvparams)
+
+ cluster = lu.cfg.GetClusterInfo()
+ hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
+
+ hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames, hvname, hvfull)
for node in nodenames:
info = hvinfo[node]
if info.offline:
feedback_fn("* running the instance OS create scripts...")
# FIXME: pass debug option from opcode to backend
os_add_result = \
- self.rpc.call_instance_os_add(pnode_name, iobj, False,
+ self.rpc.call_instance_os_add(pnode_name, (iobj, None), False,
self.op.debug_level)
if pause_sync:
feedback_fn("* resuming disk sync")
self.cfg.Update(iobj, feedback_fn)
logging.info("Starting instance %s on node %s", instance, pnode_name)
feedback_fn("* starting instance...")
- result = self.rpc.call_instance_start(pnode_name, iobj,
- None, None, False)
+ result = self.rpc.call_instance_start(pnode_name, (iobj, None, None),
+ False)
result.Raise("Could not start instance")
return list(iobj.all_nodes)
lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
logical_id=(vg_data, names[0]))
vg_meta = dev.children[1].logical_id[0]
- lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
+ lv_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
logical_id=(vg_meta, names[1]))
new_lvs = [lv_data, lv_meta]
not self.op.remove_instance):
assert not activate_disks
feedback_fn("Starting instance %s" % instance.name)
- result = self.rpc.call_instance_start(src_node, instance,
- None, None, False)
+ result = self.rpc.call_instance_start(src_node,
+ (instance, None, None), False)
msg = result.fail_msg
if msg:
feedback_fn("Failed to start instance: %s" % msg)
# pylint: disable=R0902
# lots of instance attributes
- def __init__(self, cfg, rpc, mode, **kwargs):
+ def __init__(self, cfg, rpc_runner, mode, **kwargs):
self.cfg = cfg
- self.rpc = rpc
+ self.rpc = rpc_runner
# init buffer variables
self.in_text = self.out_text = self.in_data = self.out_data = None
# init all input fields so that pylint is happy