import platform
import logging
import copy
+import random
from ganeti import ssh
from ganeti import utils
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer
+from ganeti import ssconf
class LogicalUnit(object):
- implement BuildHooksEnv
- redefine HPATH and HTYPE
- optionally redefine their run requirements:
- REQ_MASTER: the LU needs to run on the master node
REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
Note that all commands require root permissions.
HPATH = None
HTYPE = None
_OP_REQP = []
- REQ_MASTER = True
REQ_BGL = True
def __init__(self, processor, op, context, rpc):
if attr_val is None:
raise errors.OpPrereqError("Required parameter '%s' missing" %
attr_name)
-
- if not self.cfg.IsCluster():
- raise errors.OpPrereqError("Cluster not initialized yet,"
- " use 'gnt-cluster init' first.")
- if self.REQ_MASTER:
- master = self.cfg.GetMasterNode()
- if master != utils.HostInfo().name:
- raise errors.OpPrereqError("Commands must be run on the master"
- " node %s" % master)
+ self.CheckArguments()
def __GetSSH(self):
"""Returns the SshRunner object
ssh = property(fget=__GetSSH)
+ def CheckArguments(self):
+ """Check syntactic validity for the opcode arguments.
+
+ This method is for doing a simple syntactic check and ensuring the
+ validity of opcode parameters, without any cluster-related
+ checks. While the same can be accomplished in ExpandNames and/or
+ CheckPrereq, doing these separately is better because:
+
+ - ExpandNames is left as a purely lock-related function
+ - CheckPrereq is run after we have acquired locks (and possibly
+ waited for them)
+
+ The function is allowed to change the self.op attribute so that
+ later methods need not worry about missing parameters.
+
+ """
+ pass
+
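# Illustrative sketch (not part of this change): a hypothetical LU could
# override CheckArguments to fill in defaults and do purely syntactic
# validation of its opcode, before any locks are taken, e.g.:
#
#   def CheckArguments(self):
#     if not hasattr(self.op, "timeout"):
#       self.op.timeout = 60                # hypothetical default
#     try:
#       self.op.timeout = int(self.op.timeout)
#     except (TypeError, ValueError), err:
#       raise errors.OpPrereqError("Invalid timeout value: %s" % str(err))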
def ExpandNames(self):
"""Expand names for this LU.
LUs which implement this method must also populate the self.needed_locks
member, as a dict with lock levels as keys, and a list of needed lock names
as values. Rules:
- - Use an empty dict if you don't need any lock
- - If you don't need any lock at a particular level omit that level
- - Don't put anything for the BGL level
- - If you want all locks at a level use locking.ALL_SET as a value
+
+ - use an empty dict if you don't need any lock
+ - if you don't need any lock at a particular level omit that level
+ - don't put anything for the BGL level
+ - if you want all locks at a level use locking.ALL_SET as a value
If you need to share locks (rather than acquire them exclusively) at one
level you can modify self.share_locks, setting a true value (usually 1) for
that level. By default locks are not shared.
- Examples:
- # Acquire all nodes and one instance
- self.needed_locks = {
- locking.LEVEL_NODE: locking.ALL_SET,
- locking.LEVEL_INSTANCE: ['instance1.example.tld'],
- }
- # Acquire just two nodes
- self.needed_locks = {
- locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
- }
- # Acquire no locks
- self.needed_locks = {} # No, you can't leave it to the default value None
+ Examples::
+
+ # Acquire all nodes and one instance
+ self.needed_locks = {
+ locking.LEVEL_NODE: locking.ALL_SET,
+ locking.LEVEL_INSTANCE: ['instance1.example.tld'],
+ }
+ # Acquire just two nodes
+ self.needed_locks = {
+ locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
+ }
+ # Acquire no locks
+ self.needed_locks = {} # No, you can't leave it to the default value None
"""
# The implementation of this method is mandatory only if the new LU is
previous result is passed back unchanged but any LU can define it if it
wants to use the local cluster hook-scripts somehow.
- Args:
- phase: the hooks phase that has just been run
- hooks_results: the results of the multi-node hooks rpc call
- feedback_fn: function to send feedback back to the caller
- lu_result: the previous result this LU had, or None in the PRE phase.
+ @param phase: one of L{constants.HOOKS_PHASE_POST} or
+ L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
+ @param hooks_results: the results of the multi-node hooks rpc call
+ @param feedback_fn: function used to send feedback back to the caller
+ @param lu_result: the previous Exec result this LU had, or None
+ in the PRE phase
+ @return: the new Exec result, based on the previous result
+ and hook results
"""
return lu_result
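# Illustrative sketch (not part of this change): an LU that cares about hook
# output could override HooksCallBack, e.g. to report on the POST phase:
#
#   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
#     if phase == constants.HOOKS_PHASE_POST:
#       feedback_fn("post-hooks ran on %d node(s)" % len(hooks_results))
#     return lu_result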
In the future it may grow parameters to just lock some instance's nodes, or
to just lock primaries or secondary nodes, if needed.
- If should be called in DeclareLocks in a way similar to:
+ It should be called in DeclareLocks in a way similar to::
- if level == locking.LEVEL_NODE:
- self._LockInstancesNodes()
+ if level == locking.LEVEL_NODE:
+ self._LockInstancesNodes()
@type primary_only: boolean
@param primary_only: only lock primary nodes of locked instances
def _GetWantedNodes(lu, nodes):
"""Returns list of checked and expanded node names.
- Args:
- nodes: List of nodes (strings) or None for all
+ @type lu: L{LogicalUnit}
+ @param lu: the logical unit on whose behalf we execute
+ @type nodes: list
+ @param nodes: list of node names or None for all nodes
+ @rtype: list
+ @return: the list of nodes, sorted
+ @raise errors.ProgrammerError: if the nodes parameter is wrong type
"""
if not isinstance(nodes, list):
def _GetWantedInstances(lu, instances):
"""Returns list of checked and expanded instance names.
- Args:
- instances: List of instances (strings) or None for all
+ @type lu: L{LogicalUnit}
+ @param lu: the logical unit on whose behalf we execute
+ @type instances: list
+ @param instances: list of instance names or None for all instances
+ @rtype: list
+ @return: the list of instances, sorted
+ @raise errors.OpPrereqError: if the instances parameter is wrong type
+ @raise errors.OpPrereqError: if any of the passed instances is not found
"""
if not isinstance(instances, list):
def _CheckOutputFields(static, dynamic, selected):
"""Checks whether all selected fields are valid.
- Args:
- static: Static fields
- dynamic: Dynamic fields
+ @type static: L{utils.FieldSet}
+ @param static: static fields set
+ @type dynamic: L{utils.FieldSet}
+ @param dynamic: dynamic fields set
"""
- static_fields = frozenset(static)
- dynamic_fields = frozenset(dynamic)
-
- all_fields = static_fields | dynamic_fields
+ f = utils.FieldSet()
+ f.Extend(static)
+ f.Extend(dynamic)
- if not all_fields.issuperset(selected):
+ delta = f.NonMatching(selected)
+ if delta:
raise errors.OpPrereqError("Unknown output fields selected: %s"
- % ",".join(frozenset(selected).
- difference(all_fields)))
+ % ",".join(delta))
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
memory, vcpus, nics):
- """Builds instance related env variables for hooks from single variables.
+ """Builds instance related env variables for hooks
+
+ This builds the hook environment from individual variables.
+
+ @type name: string
+ @param name: the name of the instance
+ @type primary_node: string
+ @param primary_node: the name of the instance's primary node
+ @type secondary_nodes: list
+ @param secondary_nodes: list of secondary nodes as strings
+ @type os_type: string
+ @param os_type: the name of the instance's OS
+ @type status: string
+ @param status: the desired status of the instance
+ @type memory: string
+ @param memory: the memory size of the instance
+ @type vcpus: string
+ @param vcpus: the count of VCPUs the instance has
+ @type nics: list
+ @param nics: list of tuples (ip, bridge, mac) representing
+ the NICs the instance has
+ @rtype: dict
+ @return: the hook environment for this instance
- Args:
- secondary_nodes: List of secondary nodes as strings
"""
env = {
"OP_TARGET": name,
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
"""Builds instance related env variables for hooks from an object.
- Args:
- instance: objects.Instance object of instance
- override: dict of values to override
+ @type lu: L{LogicalUnit}
+ @param lu: the logical unit on whose behalf we execute
+ @type instance: L{objects.Instance}
+ @param instance: the instance for which we should build the
+ environment
+ @type override: dict
+ @param override: dictionary with key/values that will override
+ our values
+ @rtype: dict
+ @return: the hook environment dictionary
+
"""
bep = lu.cfg.GetClusterInfo().FillBE(instance)
args = {
"""
# check bridges existence
brlist = [nic.bridge for nic in instance.nics]
- if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
- raise errors.OpPrereqError("one or more target bridges %s does not"
+ result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
+ result.Raise()
+ if not result.data:
+ raise errors.OpPrereqError("One or more target bridges %s does not"
" exist on destination node '%s'" %
(brlist, instance.primary_node))
"""
master = self.cfg.GetMasterNode()
- if not self.rpc.call_node_stop_master(master, False):
+ result = self.rpc.call_node_stop_master(master, False)
+ result.Raise()
+ if not result.data:
raise errors.OpExecError("Could not disable the master role")
priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
utils.CreateBackup(priv_key)
}
self.share_locks = dict(((i, 1) for i in locking.LEVELS))
- def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
- remote_version, feedback_fn):
+ def _VerifyNode(self, nodeinfo, file_list, local_cksum,
+ node_result, feedback_fn, master_files):
"""Run multiple tests against a node.
Test list:
+
- compares ganeti version
- checks vg existence and size > 20G
- checks config file checksum
- checks ssh to other nodes
- Args:
- node: name of the node to check
- file_list: required list of files
- local_cksum: dictionary of local files and their checksums
+ @type nodeinfo: L{objects.Node}
+ @param nodeinfo: the node to check
+ @param file_list: required list of files
+ @param local_cksum: dictionary of local files and their checksums
+ @param node_result: the results from the node
+ @param feedback_fn: function used to accumulate results
+ @param master_files: list of files that only masters should have
"""
+ node = nodeinfo.name
+
+ # main result, node_result should be a non-empty dict
+ if not node_result or not isinstance(node_result, dict):
+ feedback_fn(" - ERROR: unable to verify node %s." % (node,))
+ return True
+
# compares ganeti version
local_version = constants.PROTOCOL_VERSION
+ remote_version = node_result.get('version', None)
if not remote_version:
feedback_fn(" - ERROR: connection to %s failed" % (node))
return True
# checks vg existence and size > 20G
bad = False
+ vglist = node_result.get(constants.NV_VGLIST, None)
if not vglist:
feedback_fn(" - ERROR: unable to check volume groups on node %s." %
(node,))
feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
bad = True
- if not node_result:
- feedback_fn(" - ERROR: unable to verify node %s." % (node,))
- return True
-
# checks config file checksum
- # checks ssh to any
- if 'filelist' not in node_result:
+ remote_cksum = node_result.get(constants.NV_FILELIST, None)
+ if not isinstance(remote_cksum, dict):
bad = True
feedback_fn(" - ERROR: node hasn't returned file checksum data")
else:
- remote_cksum = node_result['filelist']
for file_name in file_list:
+ node_is_mc = nodeinfo.master_candidate
+ must_have_file = file_name not in master_files
if file_name not in remote_cksum:
- bad = True
- feedback_fn(" - ERROR: file '%s' missing" % file_name)
+ if node_is_mc or must_have_file:
+ bad = True
+ feedback_fn(" - ERROR: file '%s' missing" % file_name)
elif remote_cksum[file_name] != local_cksum[file_name]:
- bad = True
- feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
+ if node_is_mc or must_have_file:
+ bad = True
+ feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
+ else:
+ # not candidate and this is not a must-have file
+ bad = True
+ feedback_fn(" - ERROR: non master-candidate has old/wrong file"
+ " '%s'" % file_name)
+ else:
+ # all good, except non-master/non-must have combination
+ if not node_is_mc and not must_have_file:
+ feedback_fn(" - ERROR: file '%s' should not exist on non master"
+ " candidates" % file_name)
+
+ # checks ssh to any
- if 'nodelist' not in node_result:
+ if constants.NV_NODELIST not in node_result:
bad = True
feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
else:
- if node_result['nodelist']:
+ if node_result[constants.NV_NODELIST]:
bad = True
- for node in node_result['nodelist']:
+ for node in node_result[constants.NV_NODELIST]:
feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
- (node, node_result['nodelist'][node]))
- if 'node-net-test' not in node_result:
+ (node, node_result[constants.NV_NODELIST][node]))
+
+ if constants.NV_NODENETTEST not in node_result:
bad = True
feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
else:
- if node_result['node-net-test']:
+ if node_result[constants.NV_NODENETTEST]:
bad = True
- nlist = utils.NiceSort(node_result['node-net-test'].keys())
+ nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
for node in nlist:
feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
- (node, node_result['node-net-test'][node]))
+ (node, node_result[constants.NV_NODENETTEST][node]))
- hyp_result = node_result.get('hypervisor', None)
+ hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
if isinstance(hyp_result, dict):
for hv_name, hv_result in hyp_result.iteritems():
if hv_result is not None:
# FIXME: verify OS list
# do local checksums
- file_names = []
+ master_files = [constants.CLUSTER_CONF_FILE]
+
+ file_names = ssconf.SimpleStore().GetFileList()
file_names.append(constants.SSL_CERT_FILE)
- file_names.append(constants.CLUSTER_CONF_FILE)
+ file_names.extend(master_files)
+
local_checksums = utils.FingerprintFiles(file_names)
feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
- all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
- all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
- all_vglist = self.rpc.call_vg_list(nodelist)
node_verify_param = {
- 'filelist': file_names,
- 'nodelist': nodelist,
- 'hypervisor': hypervisors,
- 'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
- for node in nodeinfo]
+ constants.NV_FILELIST: file_names,
+ constants.NV_NODELIST: nodelist,
+ constants.NV_HYPERVISOR: hypervisors,
+ constants.NV_NODENETTEST: [(node.name, node.primary_ip,
+ node.secondary_ip) for node in nodeinfo],
+ constants.NV_LVLIST: vg_name,
+ constants.NV_INSTANCELIST: hypervisors,
+ constants.NV_VGLIST: None,
+ constants.NV_VERSION: None,
+ constants.NV_HVINFO: self.cfg.GetHypervisorType(),
}
all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
self.cfg.GetClusterName())
- all_rversion = self.rpc.call_version(nodelist)
- all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
- self.cfg.GetHypervisorType())
cluster = self.cfg.GetClusterInfo()
- for node in nodelist:
- feedback_fn("* Verifying node %s" % node)
- result = self._VerifyNode(node, file_names, local_checksums,
- all_vglist[node], all_nvinfo[node],
- all_rversion[node], feedback_fn)
- bad = bad or result
+ master_node = self.cfg.GetMasterNode()
+ for node_i in nodeinfo:
+ node = node_i.name
+ nresult = all_nvinfo[node].data
+
+ if node == master_node:
+ ntype = "master"
+ elif node_i.master_candidate:
+ ntype = "master candidate"
+ else:
+ ntype = "regular"
+ feedback_fn("* Verifying node %s (%s)" % (node, ntype))
- # node_volume
- volumeinfo = all_volumeinfo[node]
+ if all_nvinfo[node].failed or not isinstance(nresult, dict):
+ feedback_fn(" - ERROR: connection to %s failed" % (node,))
+ bad = True
+ continue
+
+ result = self._VerifyNode(node_i, file_names, local_checksums,
+ nresult, feedback_fn, master_files)
+ bad = bad or result
- if isinstance(volumeinfo, basestring):
+ lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
+ if isinstance(lvdata, basestring):
feedback_fn(" - ERROR: LVM problem on node %s: %s" %
- (node, volumeinfo[-400:].encode('string_escape')))
+ (node, lvdata.encode('string_escape')))
bad = True
node_volume[node] = {}
- elif not isinstance(volumeinfo, dict):
- feedback_fn(" - ERROR: connection to %s failed" % (node,))
+ elif not isinstance(lvdata, dict):
+ feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
bad = True
continue
else:
- node_volume[node] = volumeinfo
+ node_volume[node] = lvdata
# node_instance
- nodeinstance = all_instanceinfo[node]
- if type(nodeinstance) != list:
- feedback_fn(" - ERROR: connection to %s failed" % (node,))
+ idata = nresult.get(constants.NV_INSTANCELIST, None)
+ if not isinstance(idata, list):
+ feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
+ (node,))
bad = True
continue
- node_instance[node] = nodeinstance
+ node_instance[node] = idata
# node_info
- nodeinfo = all_ninfo[node]
+ nodeinfo = nresult.get(constants.NV_HVINFO, None)
if not isinstance(nodeinfo, dict):
- feedback_fn(" - ERROR: connection to %s failed" % (node,))
+ feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
bad = True
continue
try:
node_info[node] = {
"mfree": int(nodeinfo['memory_free']),
- "dfree": int(nodeinfo['vg_free']),
+ "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
"pinst": [],
"sinst": [],
# dictionary holding all instances this node is secondary for,
return not bad
def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
- """Analize the post-hooks' result, handle it, and send some
+ """Analize the post-hooks' result
+
+ This method analyses the hook result, handles it, and sends some
nicely-formatted feedback back to the user.
- Args:
- phase: the hooks phase that has just been run
- hooks_results: the results of the multi-node hooks rpc call
- feedback_fn: function to send feedback back to the caller
- lu_result: previous Exec result
+ @param phase: one of L{constants.HOOKS_PHASE_POST} or
+ L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
+ @param hooks_results: the results of the multi-node hooks rpc call
+ @param feedback_fn: function used to send feedback back to the caller
+ @param lu_result: previous Exec result
+ @return: the new Exec result, based on the previous result
+ and hook results
"""
# We only really run POST phase hooks, and are only interested in
for node_name in hooks_results:
show_node_header = True
res = hooks_results[node_name]
- if res is False or not isinstance(res, list):
- feedback_fn(" Communication failure")
+ if res.failed or res.data is False or not isinstance(res.data, list):
+ feedback_fn(" Communication failure in hooks execution")
lu_result = 1
continue
- for script, hkr, output in res:
+ for script, hkr, output in res.data:
if hkr == constants.HKR_FAIL:
# The node header is only shown once, if there are
# failing hooks on that node
for node in nodes:
# node_volume
lvs = node_lvs[node]
-
+ if lvs.failed:
+ self.LogWarning("Connection to node %s failed: %s" %
+ (node, lvs.data))
+ continue
+ lvs = lvs.data
if isinstance(lvs, basestring):
logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
res_nlvm[node] = lvs
# shutdown the master IP
master = self.cfg.GetMasterNode()
- if not self.rpc.call_node_stop_master(master, False):
+ result = self.rpc.call_node_stop_master(master, False)
+ if result.failed or not result.data:
raise errors.OpExecError("Could not disable the master role")
try:
- # modify the sstore
- # TODO: sstore
- ss.SetKey(ss.SS_MASTER_IP, ip)
- ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
-
- # Distribute updated ss config to all nodes
- myself = self.cfg.GetNodeInfo(master)
- dist_nodes = self.cfg.GetNodeList()
- if myself.name in dist_nodes:
- dist_nodes.remove(myself.name)
-
- logging.debug("Copying updated ssconf data to all nodes")
- for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
- fname = ss.KeyToFilename(keyname)
- result = self.rpc.call_upload_file(dist_nodes, fname)
- for to_node in dist_nodes:
- if not result[to_node]:
- self.LogWarning("Copy of file %s to node %s failed",
- fname, to_node)
+ cluster = self.cfg.GetClusterInfo()
+ cluster.cluster_name = clustername
+ cluster.master_ip = ip
+ self.cfg.Update(cluster)
+
+ # update the known hosts file
+ ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
+ node_list = self.cfg.GetNodeList()
+ try:
+ node_list.remove(master)
+ except ValueError:
+ pass
+ result = self.rpc.call_upload_file(node_list,
+ constants.SSH_KNOWN_HOSTS_FILE)
+ for to_node, to_result in result.iteritems():
+ if to_result.failed or not to_result.data:
+ logging.error("Copy of file %s to node %s failed", fname, to_node)
+
finally:
- if not self.rpc.call_node_start_master(master, False):
+ result = self.rpc.call_node_start_master(master, False)
+ if result.failed or not result.data:
self.LogWarning("Could not re-enable the master role on"
" the master, please restart manually.")
def _RecursiveCheckIfLVMBased(disk):
"""Check if the given disk or its children are lvm-based.
- Args:
- disk: ganeti.objects.Disk object
-
- Returns:
- boolean indicating whether a LD_LV dev_type was found or not
+ @type disk: L{objects.Disk}
+ @param disk: the disk to check
+ @rtype: boolean
+ @return: boolean indicating whether a LD_LV dev_type was found or not
"""
if disk.children:
_OP_REQP = []
REQ_BGL = False
+ def CheckParameters(self):
+ """Check parameters
+
+ """
+ if not hasattr(self.op, "candidate_pool_size"):
+ self.op.candidate_pool_size = None
+ if self.op.candidate_pool_size is not None:
+ try:
+ self.op.candidate_pool_size = int(self.op.candidate_pool_size)
+ except ValueError, err:
+ raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
+ str(err))
+ if self.op.candidate_pool_size < 1:
+ raise errors.OpPrereqError("At least one master candidate needed")
+
def ExpandNames(self):
# FIXME: in the future maybe other cluster params won't require checking on
# all nodes to be modified.
if self.op.vg_name:
vglist = self.rpc.call_vg_list(node_list)
for node in node_list:
- vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
+ if vglist[node].failed:
+ # ignoring down node
+ self.LogWarning("Node %s unreachable/error, ignoring" % node)
+ continue
+ vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
+ self.op.vg_name,
constants.MIN_VG_SIZE)
if vgstatus:
raise errors.OpPrereqError("Error on node '%s': %s" %
(node, vgstatus))
self.cluster = cluster = self.cfg.GetClusterInfo()
- # beparams changes do not need validation (we can't validate?),
- # but we still process here
+ # validate beparams changes
if self.op.beparams:
+ utils.CheckBEParams(self.op.beparams)
self.new_beparams = cluster.FillDict(
cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
if self.op.beparams:
self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
+ if self.op.candidate_pool_size is not None:
+ self.cluster.candidate_pool_size = self.op.candidate_pool_size
+
self.cfg.Update(self.cluster)
+ # we want to update nodes after the cluster so that if any errors
+ # happen, we have recorded and saved the cluster info
+ if self.op.candidate_pool_size is not None:
+ node_info = self.cfg.GetAllNodesInfo().values()
+ num_candidates = len([node for node in node_info
+ if node.master_candidate])
+ num_nodes = len(node_info)
+ if num_candidates < self.op.candidate_pool_size:
+ random.shuffle(node_info)
+ for node in node_info:
+ if num_candidates >= self.op.candidate_pool_size:
+ break
+ if node.master_candidate:
+ continue
+ node.master_candidate = True
+ self.LogInfo("Promoting node %s to master candidate", node.name)
+ self.cfg.Update(node)
+ self.context.ReaddNode(node)
+ num_candidates += 1
+ elif num_candidates > self.op.candidate_pool_size:
+ self.LogInfo("Note: more nodes are candidates (%d) than the new value"
+ " of candidate_pool_size (%d)" %
+ (num_candidates, self.op.candidate_pool_size))
+
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
"""Sleep and poll for an instance's disk to sync.
done = True
cumul_degraded = False
rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
- if not rstats:
+ if rstats.failed or not rstats.data:
lu.LogWarning("Can't get any data from node %s", node)
retries += 1
if retries >= 10:
" aborting." % node)
time.sleep(6)
continue
+ rstats = rstats.data
retries = 0
for i in range(len(rstats)):
mstat = rstats[i]
result = True
if on_primary or dev.AssembleOnSecondary():
rstats = lu.rpc.call_blockdev_find(node, dev)
- if not rstats:
+ if rstats.failed or not rstats.data:
logging.warning("Node %s: disk degraded, not found or node down", node)
result = False
else:
- result = result and (not rstats[idx])
+ result = result and (not rstats.data[idx])
if dev.children:
for child in dev.children:
result = result and _CheckDiskConsistency(lu, child, node, on_primary)
"""
_OP_REQP = ["output_fields", "names"]
REQ_BGL = False
+ _FIELDS_STATIC = utils.FieldSet()
+ _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
def ExpandNames(self):
if self.op.names:
raise errors.OpPrereqError("Selective OS query not supported")
- self.dynamic_fields = frozenset(["name", "valid", "node_status"])
- _CheckOutputFields(static=[],
- dynamic=self.dynamic_fields,
+ _CheckOutputFields(static=self._FIELDS_STATIC,
+ dynamic=self._FIELDS_DYNAMIC,
selected=self.op.output_fields)
# Lock all nodes, in shared mode
def _DiagnoseByOS(node_list, rlist):
"""Remaps a per-node return list into an a per-os per-node dictionary
- Args:
- node_list: a list with the names of all nodes
- rlist: a map with node names as keys and OS objects as values
+ @param node_list: a list with the names of all nodes
+ @param rlist: a map with node names as keys and OS objects as values
+
+ @rtype: dict
+ @return: a dictionary with osnames as keys and as value another map, with
+ nodes as keys and list of OS objects as values, e.g.::
- Returns:
- map: a map with osnames as keys and as value another map, with
- nodes as
- keys and list of OS objects as values
- e.g. {"debian-etch": {"node1": [<object>,...],
- "node2": [<object>,]}
- }
+ {"debian-etch": {"node1": [<object>,...],
+ "node2": [<object>,]}
+ }
"""
all_os = {}
for node_name, nr in rlist.iteritems():
- if not nr:
+ if nr.failed or not nr.data:
continue
- for os_obj in nr:
+ for os_obj in nr.data:
if os_obj.name not in all_os:
# build a list of nodes for this os containing empty lists
# for each node in node_list
self.rpc.call_node_leave_cluster(node.name)
+ # Promote nodes to master candidate as needed
+ cp_size = self.cfg.GetClusterInfo().candidate_pool_size
+ node_info = self.cfg.GetAllNodesInfo().values()
+ num_candidates = len([n for n in node_info
+ if n.master_candidate])
+ num_nodes = len(node_info)
+ random.shuffle(node_info)
+ for node in node_info:
+ if num_candidates >= cp_size or num_candidates >= num_nodes:
+ break
+ if node.master_candidate:
+ continue
+ node.master_candidate = True
+ self.LogInfo("Promoting node %s to master candidate", node.name)
+ self.cfg.Update(node)
+ self.context.ReaddNode(node)
+ num_candidates += 1
+
class LUQueryNodes(NoHooksLU):
"""Logical unit for querying nodes.
"""
_OP_REQP = ["output_fields", "names"]
REQ_BGL = False
+ _FIELDS_DYNAMIC = utils.FieldSet(
+ "dtotal", "dfree",
+ "mtotal", "mnode", "mfree",
+ "bootid",
+ "ctotal",
+ )
+
+ _FIELDS_STATIC = utils.FieldSet(
+ "name", "pinst_cnt", "sinst_cnt",
+ "pinst_list", "sinst_list",
+ "pip", "sip", "tags",
+ "serial_no",
+ "master_candidate",
+ "master",
+ )
def ExpandNames(self):
- self.dynamic_fields = frozenset([
- "dtotal", "dfree",
- "mtotal", "mnode", "mfree",
- "bootid",
- "ctotal",
- ])
-
- self.static_fields = frozenset([
- "name", "pinst_cnt", "sinst_cnt",
- "pinst_list", "sinst_list",
- "pip", "sip", "tags",
- "serial_no",
- ])
-
- _CheckOutputFields(static=self.static_fields,
- dynamic=self.dynamic_fields,
+ _CheckOutputFields(static=self._FIELDS_STATIC,
+ dynamic=self._FIELDS_DYNAMIC,
selected=self.op.output_fields)
self.needed_locks = {}
else:
self.wanted = locking.ALL_SET
- self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
+ self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
if self.do_locking:
# if we don't request only static fields, we need to lock the nodes
self.needed_locks[locking.LEVEL_NODE] = self.wanted
# begin data gathering
- if self.dynamic_fields.intersection(self.op.output_fields):
+ if self.do_locking:
live_data = {}
node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
self.cfg.GetHypervisorType())
for name in nodenames:
- nodeinfo = node_data.get(name, None)
- if nodeinfo:
+ nodeinfo = node_data[name]
+ if not nodeinfo.failed and nodeinfo.data:
+ nodeinfo = nodeinfo.data
+ fn = utils.TryConvert
live_data[name] = {
- "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
- "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
- "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
- "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
- "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
- "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
- "bootid": nodeinfo['bootid'],
+ "mtotal": fn(int, nodeinfo.get('memory_total', None)),
+ "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
+ "mfree": fn(int, nodeinfo.get('memory_free', None)),
+ "dtotal": fn(int, nodeinfo.get('vg_size', None)),
+ "dfree": fn(int, nodeinfo.get('vg_free', None)),
+ "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
+ "bootid": nodeinfo.get('bootid', None),
}
else:
live_data[name] = {}
if secnode in node_to_secondary:
node_to_secondary[secnode].add(inst.name)
+ master_node = self.cfg.GetMasterNode()
+
# end data gathering
output = []
val = list(node.GetTags())
elif field == "serial_no":
val = node.serial_no
- elif field in self.dynamic_fields:
+ elif field == "master_candidate":
+ val = node.master_candidate
+ elif field == "master":
+ val = node.name == master_node
+ elif self._FIELDS_DYNAMIC.Matches(field):
val = live_data[node.name].get(field, None)
else:
raise errors.ParameterError(field)
"""
_OP_REQP = ["nodes", "output_fields"]
REQ_BGL = False
+ _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
+ _FIELDS_STATIC = utils.FieldSet("node")
def ExpandNames(self):
- _CheckOutputFields(static=["node"],
- dynamic=["phys", "vg", "name", "size", "instance"],
+ _CheckOutputFields(static=self._FIELDS_STATIC,
+ dynamic=self._FIELDS_DYNAMIC,
selected=self.op.output_fields)
self.needed_locks = {}
output = []
for node in nodenames:
- if node not in volumes or not volumes[node]:
+ if node not in volumes or volumes[node].failed or not volumes[node].data:
continue
- node_vols = volumes[node][:]
+ node_vols = volumes[node].data[:]
node_vols.sort(key=lambda vol: vol['dev'])
for vol in node_vols:
raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
" based ping to noded port")
+ cp_size = self.cfg.GetClusterInfo().candidate_pool_size
+ node_info = self.cfg.GetAllNodesInfo().values()
+ num_candidates = len([n for n in node_info
+ if n.master_candidate])
+ master_candidate = num_candidates < cp_size
+
self.new_node = objects.Node(name=node,
primary_ip=primary_ip,
- secondary_ip=secondary_ip)
+ secondary_ip=secondary_ip,
+ master_candidate=master_candidate)
def Exec(self, feedback_fn):
"""Adds the new node to the cluster.
# check connectivity
result = self.rpc.call_version([node])[node]
- if result:
- if constants.PROTOCOL_VERSION == result:
+ result.Raise()
+ if result.data:
+ if constants.PROTOCOL_VERSION == result.data:
logging.info("Communication to node %s fine, sw version %s match",
- node, result)
+ node, result.data)
else:
raise errors.OpExecError("Version mismatch master version %s,"
" node version %s" %
- (constants.PROTOCOL_VERSION, result))
+ (constants.PROTOCOL_VERSION, result.data))
else:
raise errors.OpExecError("Cannot get version from the new node")
keyarray[2],
keyarray[3], keyarray[4], keyarray[5])
- if not result:
+ if result.failed or not result.data:
raise errors.OpExecError("Cannot transfer ssh keys to the new node")
# Add node to our /etc/hosts, and add key to known_hosts
utils.AddHostToEtcHosts(new_node.name)
if new_node.secondary_ip != new_node.primary_ip:
- if not self.rpc.call_node_has_ip_address(new_node.name,
- new_node.secondary_ip):
+ result = self.rpc.call_node_has_ip_address(new_node.name,
+ new_node.secondary_ip)
+ if result.failed or not result.data:
raise errors.OpExecError("Node claims it doesn't have the secondary ip"
" you gave (%s). Please fix and re-run this"
" command." % new_node.secondary_ip)
result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
self.cfg.GetClusterName())
for verifier in node_verify_list:
- if not result[verifier]:
+ if result[verifier].failed or not result[verifier].data:
raise errors.OpExecError("Cannot communicate with %s's node daemon"
" for remote verification" % verifier)
- if result[verifier]['nodelist']:
- for failed in result[verifier]['nodelist']:
+ if result[verifier].data['nodelist']:
+ for failed in result[verifier].data['nodelist']:
feedback_fn("ssh/hostname verification failed %s -> %s" %
(verifier, result[verifier].data['nodelist'][failed]))
raise errors.OpExecError("ssh/hostname verification failed.")
logging.debug("Copying hosts and known_hosts to all nodes")
for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
result = self.rpc.call_upload_file(dist_nodes, fname)
- for to_node in dist_nodes:
- if not result[to_node]:
+ for to_node, to_result in result.iteritems():
+ if to_result.failed or not to_result.data:
logging.error("Copy of file %s to node %s failed", fname, to_node)
to_copy = []
to_copy.append(constants.VNC_PASSWORD_FILE)
for fname in to_copy:
result = self.rpc.call_upload_file([node], fname)
- if not result[node]:
+ if result[node].failed or not result[node].data:
logging.error("Could not copy file %s to node %s", fname, node)
if self.op.readd:
self.context.AddNode(new_node)
+class LUSetNodeParams(LogicalUnit):
+ """Modifies the parameters of a node.
+
+ """
+ HPATH = "node-modify"
+ HTYPE = constants.HTYPE_NODE
+ _OP_REQP = ["node_name"]
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ node_name = self.cfg.ExpandNodeName(self.op.node_name)
+ if node_name is None:
+ raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
+ self.op.node_name = node_name
+ if not hasattr(self.op, 'master_candidate'):
+ raise errors.OpPrereqError("Please pass at least one modification")
+ self.op.master_candidate = bool(self.op.master_candidate)
+
+ def ExpandNames(self):
+ self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ This runs on the master node.
+
+ """
+ env = {
+ "OP_TARGET": self.op.node_name,
+ "MASTER_CANDIDATE": str(self.op.master_candidate),
+ }
+ nl = [self.cfg.GetMasterNode(),
+ self.op.node_name]
+ return env, nl, nl
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ This only checks the instance list against the existing names.
+
+ """
+ force = self.force = self.op.force
+
+ if self.op.master_candidate == False:
+ if self.op.node_name == self.cfg.GetMasterNode():
+ raise errors.OpPrereqError("The master node has to be a"
+ " master candidate")
+ cp_size = self.cfg.GetClusterInfo().candidate_pool_size
+ node_info = self.cfg.GetAllNodesInfo().values()
+ num_candidates = len([node for node in node_info
+ if node.master_candidate])
+ if num_candidates <= cp_size:
+ msg = ("Not enough master candidates (desired"
+ " %d, new value will be %d)" % (cp_size, num_candidates-1))
+ if force:
+ self.LogWarning(msg)
+ else:
+ raise errors.OpPrereqError(msg)
+
+ return
+
+ def Exec(self, feedback_fn):
+ """Modifies a node.
+
+ """
+ node = self.cfg.GetNodeInfo(self.op.node_name)
+
+ result = []
+
+ if self.op.master_candidate is not None:
+ node.master_candidate = self.op.master_candidate
+ result.append(("master_candidate", str(self.op.master_candidate)))
+
+ # this will trigger configuration file update, if needed
+ self.cfg.Update(node)
+ # this will trigger job queue propagation or cleanup
+ if self.op.node_name != self.cfg.GetMasterNode():
+ self.context.ReaddNode(node)
+
+ return result
+
+
class LUQueryClusterInfo(NoHooksLU):
"""Query cluster configuration.
"""
_OP_REQP = []
- REQ_MASTER = False
REQ_BGL = False
def ExpandNames(self):
"enabled_hypervisors": cluster.enabled_hypervisors,
"hvparams": cluster.hvparams,
"beparams": cluster.beparams,
+ "candidate_pool_size": cluster.candidate_pool_size,
}
return result
"""
_OP_REQP = []
REQ_BGL = False
+ _FIELDS_DYNAMIC = utils.FieldSet()
+ _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
def ExpandNames(self):
self.needed_locks = {}
- static_fields = ["cluster_name", "master_node", "drain_flag"]
- _CheckOutputFields(static=static_fields,
- dynamic=[],
+ _CheckOutputFields(static=self._FIELDS_STATIC,
+ dynamic=self._FIELDS_DYNAMIC,
selected=self.op.output_fields)
def CheckPrereq(self):
This sets up the block devices on all nodes.
- Args:
- instance: a ganeti.objects.Instance object
- ignore_secondaries: if true, errors on secondary nodes won't result
- in an error return from the function
+ @type lu: L{LogicalUnit}
+ @param lu: the logical unit on whose behalf we execute
+ @type instance: L{objects.Instance}
+ @param instance: the instance for whose disks we assemble
+ @type ignore_secondaries: boolean
+ @param ignore_secondaries: if true, errors on secondary nodes
+ won't result in an error return from the function
+ @return: False if the operation failed, otherwise a list of
+ (host, instance_visible_name, node_visible_name)
+ with the mapping from node devices to instance devices
- Returns:
- false if the operation failed
- list of (host, instance_visible_name, node_visible_name) if the operation
- suceeded with the mapping from node devices to instance devices
"""
device_info = []
disks_ok = True
for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
lu.cfg.SetDiskID(node_disk, node)
result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
- if not result:
+ if result.failed or not result.data:
lu.proc.LogWarning("Could not prepare block device %s on node %s"
" (is_primary=False, pass=1)",
inst_disk.iv_name, node)
continue
lu.cfg.SetDiskID(node_disk, node)
result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
- if not result:
+ if result.failed or not result.data:
lu.proc.LogWarning("Could not prepare block device %s on node %s"
" (is_primary=True, pass=2)",
inst_disk.iv_name, node)
ins_l = lu.rpc.call_instance_list([instance.primary_node],
[instance.hypervisor])
ins_l = ins_l[instance.primary_node]
- if not type(ins_l) is list:
+ if ins_l.failed or not isinstance(ins_l.data, list):
raise errors.OpExecError("Can't contact node '%s'" %
instance.primary_node)
- if instance.name in ins_l:
+ if instance.name in ins_l.data:
raise errors.OpExecError("Instance is running, can't shutdown"
" block devices.")
for disk in instance.disks:
for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
lu.cfg.SetDiskID(top_disk, node)
- if not lu.rpc.call_blockdev_shutdown(node, top_disk):
+ result = lu.rpc.call_blockdev_shutdown(node, top_disk)
+ if result.failed or not result.data:
logging.error("Could not shutdown block device %s on node %s",
disk.iv_name, node)
if not ignore_primary or node != instance.primary_node:
"""
nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
- if not nodeinfo or not isinstance(nodeinfo, dict):
- raise errors.OpPrereqError("Could not contact node %s for resource"
- " information" % (node,))
-
- free_mem = nodeinfo[node].get('memory_free')
+ nodeinfo[node].Raise()
+ free_mem = nodeinfo[node].data.get('memory_free')
if not isinstance(free_mem, int):
raise errors.OpPrereqError("Can't compute free memory on node %s, result"
" was '%s'" % (node, free_mem))
def ExpandNames(self):
self._ExpandAndLockInstance()
- self.needed_locks[locking.LEVEL_NODE] = []
- self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
-
- def DeclareLocks(self, level):
- if level == locking.LEVEL_NODE:
- self._LockInstancesNodes()
def BuildHooksEnv(self):
"""Build hooks env.
_StartInstanceDisks(self, instance, force)
- if not self.rpc.call_instance_start(node_current, instance, extra_args):
+ result = self.rpc.call_instance_start(node_current, instance, extra_args)
+ if result.failed or not result.data:
_ShutdownInstanceDisks(self, instance)
raise errors.OpExecError("Could not start instance")
constants.INSTANCE_REBOOT_HARD,
constants.INSTANCE_REBOOT_FULL))
self._ExpandAndLockInstance()
- self.needed_locks[locking.LEVEL_NODE] = []
- self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
-
- def DeclareLocks(self, level):
- if level == locking.LEVEL_NODE:
- primary_only = not constants.INSTANCE_REBOOT_FULL
- self._LockInstancesNodes(primary_only=primary_only)
def BuildHooksEnv(self):
"""Build hooks env.
if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
constants.INSTANCE_REBOOT_HARD]:
- if not self.rpc.call_instance_reboot(node_current, instance,
- reboot_type, extra_args):
+ result = self.rpc.call_instance_reboot(node_current, instance,
+ reboot_type, extra_args)
+ if result.failed or not result.data:
raise errors.OpExecError("Could not reboot instance")
else:
if not self.rpc.call_instance_shutdown(node_current, instance):
raise errors.OpExecError("could not shutdown instance for full reboot")
_ShutdownInstanceDisks(self, instance)
_StartInstanceDisks(self, instance, ignore_secondaries)
- if not self.rpc.call_instance_start(node_current, instance, extra_args):
+ result = self.rpc.call_instance_start(node_current, instance, extra_args)
+ if result.failed or not result.data:
_ShutdownInstanceDisks(self, instance)
raise errors.OpExecError("Could not start instance for full reboot")
def ExpandNames(self):
self._ExpandAndLockInstance()
- self.needed_locks[locking.LEVEL_NODE] = []
- self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
-
- def DeclareLocks(self, level):
- if level == locking.LEVEL_NODE:
- self._LockInstancesNodes()
def BuildHooksEnv(self):
"""Build hooks env.
instance = self.instance
node_current = instance.primary_node
self.cfg.MarkInstanceDown(instance.name)
- if not self.rpc.call_instance_shutdown(node_current, instance):
+ result = self.rpc.call_instance_shutdown(node_current, instance)
+ if result.failed or not result.data:
self.proc.LogWarning("Could not shutdown instance")
_ShutdownInstanceDisks(self, instance)
def ExpandNames(self):
self._ExpandAndLockInstance()
- self.needed_locks[locking.LEVEL_NODE] = []
- self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
-
- def DeclareLocks(self, level):
- if level == locking.LEVEL_NODE:
- self._LockInstancesNodes()
def BuildHooksEnv(self):
"""Build hooks env.
remote_info = self.rpc.call_instance_info(instance.primary_node,
instance.name,
instance.hypervisor)
- if remote_info:
+ if remote_info.failed or remote_info.data:
raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
(self.op.instance_name,
instance.primary_node))
if pnode is None:
raise errors.OpPrereqError("Primary node '%s' is unknown" %
self.op.pnode)
- os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
- if not os_obj:
+ result = self.rpc.call_os_get(pnode.name, self.op.os_type)
+ result.Raise()
+ if not isinstance(result.data, objects.OS):
raise errors.OpPrereqError("OS '%s' not in supported OS list for"
" primary node" % self.op.os_type)
_StartInstanceDisks(self, inst, None)
try:
feedback_fn("Running the instance OS create scripts...")
- if not self.rpc.call_instance_os_add(inst.primary_node, inst):
+ result = self.rpc.call_instance_os_add(inst.primary_node, inst)
+ result.Raise()
+ if not result.data:
raise errors.OpExecError("Could not install OS for instance %s"
" on node %s" %
(inst.name, inst.primary_node))
remote_info = self.rpc.call_instance_info(instance.primary_node,
instance.name,
instance.hypervisor)
- if remote_info:
+ remote_info.Raise()
+ if remote_info.data:
raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
(self.op.instance_name,
instance.primary_node))
self.cfg.RenameInstance(inst.name, self.op.new_name)
# Change the instance lock. This is definitely safe while we hold the BGL
- self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
+ self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
# re-read the instance from the configuration after rename
result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
old_file_storage_dir,
new_file_storage_dir)
-
- if not result:
+ result.Raise()
+ if not result.data:
raise errors.OpExecError("Could not connect to node '%s' to rename"
" directory '%s' to '%s' (but the instance"
" has been renamed in Ganeti)" % (
inst.primary_node, old_file_storage_dir,
new_file_storage_dir))
- if not result[0]:
+ if not result.data[0]:
raise errors.OpExecError("Could not rename directory '%s' to '%s'"
" (but the instance has been renamed in"
" Ganeti)" % (old_file_storage_dir,
_StartInstanceDisks(self, inst, None)
try:
- if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
- old_name):
+ result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
+ old_name)
+ if result.failed or not result.data:
msg = ("Could not run OS rename script for instance %s on node %s"
" (but the instance has been renamed in Ganeti)" %
(inst.name, inst.primary_node))
logging.info("Shutting down instance %s on node %s",
instance.name, instance.primary_node)
- if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
+ result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
+ if result.failed or not result.data:
if self.op.ignore_failures:
feedback_fn("Warning: can't shutdown instance")
else:
"""
_OP_REQP = ["output_fields", "names"]
REQ_BGL = False
+ _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
+ "admin_state", "admin_ram",
+ "disk_template", "ip", "mac", "bridge",
+ "sda_size", "sdb_size", "vcpus", "tags",
+ "network_port", "beparams",
+ "(disk).(size)/([0-9]+)",
+ "(disk).(sizes)",
+ "(nic).(mac|ip|bridge)/([0-9]+)",
+ "(nic).(macs|ips|bridges)",
+ "(disk|nic).(count)",
+ "serial_no", "hypervisor", "hvparams",] +
+ ["hv/%s" % name
+ for name in constants.HVS_PARAMETERS] +
+ ["be/%s" % name
+ for name in constants.BES_PARAMETERS])
+ _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
+
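# Note (illustrative, not part of this change): the parenthesised entries
# above are regex-style patterns, so query fields such as "disk.size/0" or
# "nic.macs" are accepted; the groups captured by _FIELDS_STATIC.Matches()
# drive the per-field lookup in Exec below, e.g.:
#
#   st = self._FIELDS_STATIC.Matches("disk.size/0")
#   st.groups()    # assumed to yield ('disk', 'size', '0')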
def ExpandNames(self):
- self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
- hvp = ["hv/%s" % name for name in constants.HVS_PARAMETERS]
- bep = ["be/%s" % name for name in constants.BES_PARAMETERS]
- self.static_fields = frozenset([
- "name", "os", "pnode", "snodes",
- "admin_state", "admin_ram",
- "disk_template", "ip", "mac", "bridge",
- "sda_size", "sdb_size", "vcpus", "tags",
- "network_port", "beparams",
- "serial_no", "hypervisor", "hvparams",
- ] + hvp + bep)
-
- _CheckOutputFields(static=self.static_fields,
- dynamic=self.dynamic_fields,
+ _CheckOutputFields(static=self._FIELDS_STATIC,
+ dynamic=self._FIELDS_DYNAMIC,
selected=self.op.output_fields)
self.needed_locks = {}
else:
self.wanted = locking.ALL_SET
- self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
+ self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
if self.do_locking:
self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
self.needed_locks[locking.LEVEL_NODE] = []
hv_list = list(set([inst.hypervisor for inst in instance_list]))
bad_nodes = []
- if self.dynamic_fields.intersection(self.op.output_fields):
+ if self.do_locking:
live_data = {}
node_data = self.rpc.call_all_instances_info(nodes, hv_list)
for name in nodes:
result = node_data[name]
- if result:
- live_data.update(result)
- elif result == False:
+ if result.failed:
bad_nodes.append(name)
- # else no instance is alive
+ else:
+ if result.data:
+ live_data.update(result.data)
+ # else no instance is alive
else:
live_data = dict([(name, {}) for name in instance_names])
i_hv = self.cfg.GetClusterInfo().FillHV(instance)
i_be = self.cfg.GetClusterInfo().FillBE(instance)
for field in self.op.output_fields:
+ st_match = self._FIELDS_STATIC.Matches(field)
if field == "name":
val = instance.name
elif field == "os":
elif field == "mac":
val = instance.nics[0].mac
elif field == "sda_size" or field == "sdb_size":
- disk = instance.FindDisk(field[:3])
- if disk is None:
+ idx = ord(field[2]) - ord('a')
+ try:
+ val = instance.FindDisk(idx).size
+ except errors.OpPrereqError:
val = None
- else:
- val = disk.size
elif field == "tags":
val = list(instance.GetTags())
elif field == "serial_no":
elif (field.startswith(BEPREFIX) and
field[len(BEPREFIX):] in constants.BES_PARAMETERS):
val = i_be.get(field[len(BEPREFIX):], None)
+ elif st_match and st_match.groups():
+ # matches a variable list
+ st_groups = st_match.groups()
+ if st_groups and st_groups[0] == "disk":
+ if st_groups[1] == "count":
+ val = len(instance.disks)
+ elif st_groups[1] == "sizes":
+ val = [disk.size for disk in instance.disks]
+ elif st_groups[1] == "size":
+ try:
+ val = instance.FindDisk(st_groups[2]).size
+ except errors.OpPrereqError:
+ val = None
+ else:
+ assert False, "Unhandled disk parameter"
+ elif st_groups[0] == "nic":
+ if st_groups[1] == "count":
+ val = len(instance.nics)
+ elif st_groups[1] == "macs":
+ val = [nic.mac for nic in instance.nics]
+ elif st_groups[1] == "ips":
+ val = [nic.ip for nic in instance.nics]
+ elif st_groups[1] == "bridges":
+ val = [nic.bridge for nic in instance.nics]
+ else:
+ # index-based item
+ nic_idx = int(st_groups[2])
+ if nic_idx >= len(instance.nics):
+ val = None
+ else:
+ if st_groups[1] == "mac":
+ val = instance.nics[nic_idx].mac
+ elif st_groups[1] == "ip":
+ val = instance.nics[nic_idx].ip
+ elif st_groups[1] == "bridge":
+ val = instance.nics[nic_idx].bridge
+ else:
+ assert False, "Unhandled NIC parameter"
+ else:
+ assert False, "Unhandled variable parameter"
else:
raise errors.ParameterError(field)
iout.append(val)
# check bridge existance
brlist = [nic.bridge for nic in instance.nics]
- if not self.rpc.call_bridges_exist(target_node, brlist):
+ result = self.rpc.call_bridges_exist(target_node, brlist)
+ result.Raise()
+ if not result.data:
raise errors.OpPrereqError("One or more target bridges %s does not"
" exist on destination node '%s'" %
(brlist, target_node))
logging.info("Shutting down instance %s on node %s",
instance.name, source_node)
- if not self.rpc.call_instance_shutdown(source_node, instance):
+ result = self.rpc.call_instance_shutdown(source_node, instance)
+ if result.failed or not result.data:
if self.op.ignore_consistency:
self.proc.LogWarning("Could not shutdown instance %s on node %s."
" Proceeding"
raise errors.OpExecError("Can't activate the instance's disks")
feedback_fn("* starting the instance on the target node")
- if not self.rpc.call_instance_start(target_node, instance, None):
+ result = self.rpc.call_instance_start(target_node, instance, None)
+ if result.failed or not result.data:
_ShutdownInstanceDisks(self, instance)
raise errors.OpExecError("Could not start instance %s on node %s." %
(instance.name, target_node))
lu.cfg.SetDiskID(device, node)
new_id = lu.rpc.call_blockdev_create(node, device, device.size,
instance.name, True, info)
- if not new_id:
+ if new_id.failed or not new_id.data:
return False
if device.physical_id is None:
device.physical_id = new_id
lu.cfg.SetDiskID(device, node)
new_id = lu.rpc.call_blockdev_create(node, device, device.size,
instance.name, False, info)
- if not new_id:
+ if new_id.failed or not new_id.data:
return False
if device.physical_id is None:
device.physical_id = new_id
def _GenerateDiskTemplate(lu, template_name,
instance_name, primary_node,
- secondary_nodes, disk_sz, swap_sz,
- file_storage_dir, file_driver):
+ secondary_nodes, disk_info,
+ file_storage_dir, file_driver,
+ base_index):
"""Generate the entire disk layout for a given template type.
"""
#TODO: compute space requirements
vgname = lu.cfg.GetVGName()
+ disk_count = len(disk_info)
+ disks = []
if template_name == constants.DT_DISKLESS:
- disks = []
+ pass
elif template_name == constants.DT_PLAIN:
if len(secondary_nodes) != 0:
raise errors.ProgrammerError("Wrong template configuration")
- names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
- sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
- logical_id=(vgname, names[0]),
- iv_name = "sda")
- sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
- logical_id=(vgname, names[1]),
- iv_name = "sdb")
- disks = [sda_dev, sdb_dev]
+ names = _GenerateUniqueNames(lu, [".disk%d" % i
+ for i in range(disk_count)])
+ for idx, disk in enumerate(disk_info):
+ disk_index = idx + base_index
+ disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
+ logical_id=(vgname, names[idx]),
+ iv_name="disk/%d" % disk_index)
+ disks.append(disk_dev)
elif template_name == constants.DT_DRBD8:
if len(secondary_nodes) != 1:
raise errors.ProgrammerError("Wrong template configuration")
remote_node = secondary_nodes[0]
- (minor_pa, minor_pb,
- minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
- [primary_node, primary_node, remote_node, remote_node], instance_name)
-
- names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
- ".sdb_data", ".sdb_meta"])
- drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
- disk_sz, names[0:2], "sda",
- minor_pa, minor_sa)
- drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
- swap_sz, names[2:4], "sdb",
- minor_pb, minor_sb)
- disks = [drbd_sda_dev, drbd_sdb_dev]
+ minors = lu.cfg.AllocateDRBDMinor(
+ [primary_node, remote_node] * len(disk_info), instance_name)
+
+ names = _GenerateUniqueNames(lu,
+ [".disk%d_%s" % (i, s)
+ for i in range(disk_count)
+ for s in ("data", "meta")
+ ])
+ for idx, disk in enumerate(disk_info):
+ disk_index = idx + base_index
+ disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
+ disk["size"], names[idx*2:idx*2+2],
+ "disk/%d" % disk_index,
+ minors[idx*2], minors[idx*2+1])
+ disks.append(disk_dev)
elif template_name == constants.DT_FILE:
if len(secondary_nodes) != 0:
raise errors.ProgrammerError("Wrong template configuration")
- file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
- iv_name="sda", logical_id=(file_driver,
- "%s/sda" % file_storage_dir))
- file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
- iv_name="sdb", logical_id=(file_driver,
- "%s/sdb" % file_storage_dir))
- disks = [file_sda_dev, file_sdb_dev]
+ for idx, disk in enumerate(disk_info):
+ disk_index = idx + base_index
+ disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
+ iv_name="disk/%d" % disk_index,
+ logical_id=(file_driver,
+ "%s/disk%d" % (file_storage_dir,
+ idx)))
+ disks.append(disk_dev)
else:
raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
return disks
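# Illustrative sketch (not part of this change) of the new calling
# convention, using hypothetical names: disk_info is a list of
# {"size": ...} dicts and the generated devices are named disk/<index>,
# counting from base_index:
#
#   disks = _GenerateDiskTemplate(lu, constants.DT_PLAIN,
#                                 "inst1.example.com", "node1.example.com",
#                                 [], [{"size": 1024}, {"size": 2048}],
#                                 None, None, 0)
#   [d.iv_name for d in disks]    # -> ["disk/0", "disk/1"]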
This abstracts away some work from AddInstance.
- Args:
- instance: the instance object
-
- Returns:
- True or False showing the success of the creation process
+ @type lu: L{LogicalUnit}
+ @param lu: the logical unit on whose behalf we execute
+ @type instance: L{objects.Instance}
+ @param instance: the instance whose disks we should create
+ @rtype: boolean
+ @return: the success of the creation
"""
info = _GetInstanceInfoText(instance)
result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
file_storage_dir)
- if not result:
+ if result.failed or not result.data:
logging.error("Could not connect to node '%s'", instance.primary_node)
return False
- if not result[0]:
+ if not result.data[0]:
logging.error("Failed to create directory '%s'", file_storage_dir)
return False
+ # Note: this needs to be kept in sync with adding of disks in
+ # LUSetInstanceParams
for device in instance.disks:
logging.info("Creating volume %s for instance %s",
device.iv_name, instance.name)
be removed, the removal will continue with the other ones (compare
with `_CreateDisks()`).
- Args:
- instance: the instance object
-
- Returns:
- True or False showing the success of the removal proces
+ @type lu: L{LogicalUnit}
+ @param lu: the logical unit on whose behalf we execute
+ @type instance: L{objects.Instance}
+ @param instance: the instance whose disks we should remove
+ @rtype: boolean
+ @return: the success of the removal
"""
logging.info("Removing block devices for instance %s", instance.name)
for device in instance.disks:
for node, disk in device.ComputeNodeTree(instance.primary_node):
lu.cfg.SetDiskID(disk, node)
- if not lu.rpc.call_blockdev_remove(node, disk):
+ result = lu.rpc.call_blockdev_remove(node, disk)
+ if result.failed or not result.data:
lu.proc.LogWarning("Could not remove block device %s on node %s,"
" continuing anyway", device.iv_name, node)
result = False
if instance.disk_template == constants.DT_FILE:
file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
- if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
- file_storage_dir):
+ result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
+ file_storage_dir)
+ if result.failed or not result.data:
logging.error("Could not remove directory '%s'", file_storage_dir)
result = False
return result
-def _ComputeDiskSize(disk_template, disk_size, swap_size):
+def _ComputeDiskSize(disk_template, disks):
"""Compute disk size requirements in the volume group
- This is currently hard-coded for the two-drive layout.
-
"""
-  # Required free disk space as a function of disk and swap space
+  # Required free disk space as a function of the disk list
req_size_dict = {
constants.DT_DISKLESS: None,
- constants.DT_PLAIN: disk_size + swap_size,
- # 256 MB are added for drbd metadata, 128MB for each drbd device
- constants.DT_DRBD8: disk_size + swap_size + 256,
+ constants.DT_PLAIN: sum(d["size"] for d in disks),
+ # 128 MB are added for drbd metadata for each disk
+ constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
constants.DT_FILE: None,
}
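+  # for example, a DRBD8 instance with disks of 1024 and 256 MiB needs
+  # (1024 + 128) + (256 + 128) = 1536 MiB in the volume group, while the
+  # same disks as plain LVs need only 1280 MiB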
hvname,
hvparams)
for node in nodenames:
- info = hvinfo.get(node, None)
- if not info or not isinstance(info, (tuple, list)):
+ info = hvinfo[node]
+ info.Raise()
+ if not info.data or not isinstance(info.data, (tuple, list)):
raise errors.OpPrereqError("Cannot get current information"
- " from node '%s' (%s)" % (node, info))
- if not info[0]:
+ " from node '%s' (%s)" % (node, info.data))
+ if not info.data[0]:
raise errors.OpPrereqError("Hypervisor parameter validation failed:"
- " %s" % info[1])
+ " %s" % info.data[1])
class LUCreateInstance(LogicalUnit):
"""
HPATH = "instance-add"
HTYPE = constants.HTYPE_INSTANCE
- _OP_REQP = ["instance_name", "disk_size",
- "disk_template", "swap_size", "mode", "start",
- "wait_for_sync", "ip_check", "mac",
+ _OP_REQP = ["instance_name", "disks", "disk_template",
+ "mode", "start",
+ "wait_for_sync", "ip_check", "nics",
"hvparams", "beparams"]
REQ_BGL = False
hv_type.CheckParameterSyntax(filled_hvp)
# fill and remember the beparams dict
+ utils.CheckBEParams(self.op.beparams)
self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
self.op.beparams)
self.add_locks[locking.LEVEL_INSTANCE] = instance_name
- # ip validity checks
- ip = getattr(self.op, "ip", None)
- if ip is None or ip.lower() == "none":
- inst_ip = None
- elif ip.lower() == constants.VALUE_AUTO:
- inst_ip = hostname1.ip
- else:
- if not utils.IsValidIP(ip):
- raise errors.OpPrereqError("given IP address '%s' doesn't look"
- " like a valid IP" % ip)
- inst_ip = ip
- self.inst_ip = self.op.ip = inst_ip
+ # NIC buildup
+ self.nics = []
+ for nic in self.op.nics:
+ # ip validity checks
+ ip = nic.get("ip", None)
+ if ip is None or ip.lower() == "none":
+ nic_ip = None
+ elif ip.lower() == constants.VALUE_AUTO:
+ nic_ip = hostname1.ip
+ else:
+ if not utils.IsValidIP(ip):
+ raise errors.OpPrereqError("Given IP address '%s' doesn't look"
+ " like a valid IP" % ip)
+ nic_ip = ip
+
+ # MAC address verification
+ mac = nic.get("mac", constants.VALUE_AUTO)
+ if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
+ if not utils.IsValidMac(mac.lower()):
+ raise errors.OpPrereqError("Invalid MAC address specified: %s" %
+ mac)
+ # bridge verification
+ bridge = nic.get("bridge", self.cfg.GetDefBridge())
+ self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
+
+ # disk checks/pre-build
+ self.disks = []
+ for disk in self.op.disks:
+ mode = disk.get("mode", constants.DISK_RDWR)
+ if mode not in constants.DISK_ACCESS_SET:
+ raise errors.OpPrereqError("Invalid disk access mode '%s'" %
+ mode)
+ size = disk.get("size", None)
+ if size is None:
+ raise errors.OpPrereqError("Missing disk size")
+ try:
+ size = int(size)
+ except ValueError:
+ raise errors.OpPrereqError("Invalid disk size '%s'" % size)
+ self.disks.append({"size": size, "mode": mode})
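+    # from here on self.disks is a list of {"size": int, "mode": str} dicts,
+    # the format expected by _ComputeDiskSize, _GenerateDiskTemplate and the
+    # IAllocator request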
+
# used in CheckPrereq for ip ping check
self.check_ip = hostname1.ip
- # MAC address verification
- if self.op.mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
- if not utils.IsValidMac(self.op.mac.lower()):
- raise errors.OpPrereqError("invalid MAC address specified: %s" %
- self.op.mac)
-
# file storage checks
if (self.op.file_driver and
not self.op.file_driver in constants.FILE_DRIVER):
src_node = getattr(self.op, "src_node", None)
src_path = getattr(self.op, "src_path", None)
- if src_node is None or src_path is None:
- raise errors.OpPrereqError("Importing an instance requires source"
- " node and path options")
+ if src_path is None:
+ self.op.src_path = src_path = self.op.instance_name
- if not os.path.isabs(src_path):
- raise errors.OpPrereqError("The source path must be absolute")
-
- self.op.src_node = src_node = self._ExpandNode(src_node)
- if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
- self.needed_locks[locking.LEVEL_NODE].append(src_node)
+ if src_node is None:
+ self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+ self.op.src_node = None
+ if os.path.isabs(src_path):
+ raise errors.OpPrereqError("Importing an instance from an absolute"
+ " path requires a source node option.")
+ else:
+ self.op.src_node = src_node = self._ExpandNode(src_node)
+ if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
+ self.needed_locks[locking.LEVEL_NODE].append(src_node)
+ if not os.path.isabs(src_path):
+ self.op.src_path = src_path = \
+ os.path.join(constants.EXPORT_DIR, src_path)
else: # INSTANCE_CREATE
if getattr(self.op, "os_type", None) is None:
"""Run the allocator based on input opcode.
"""
- disks = [{"size": self.op.disk_size, "mode": "w"},
- {"size": self.op.swap_size, "mode": "w"}]
- nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
- "bridge": self.op.bridge}]
+ nics = [n.ToDict() for n in self.nics]
ial = IAllocator(self,
mode=constants.IALLOCATOR_MODE_ALLOC,
name=self.op.instance_name,
os=self.op.os_type,
vcpus=self.be_full[constants.BE_VCPUS],
mem_size=self.be_full[constants.BE_MEMORY],
- disks=disks,
+ disks=self.disks,
nics=nics,
+ hypervisor=self.op.hypervisor,
)
ial.Run(self.op.iallocator)
"""
env = {
"INSTANCE_DISK_TEMPLATE": self.op.disk_template,
- "INSTANCE_DISK_SIZE": self.op.disk_size,
- "INSTANCE_SWAP_SIZE": self.op.swap_size,
+ "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
"INSTANCE_ADD_MODE": self.op.mode,
}
if self.op.mode == constants.INSTANCE_IMPORT:
os_type=self.op.os_type,
memory=self.be_full[constants.BE_MEMORY],
vcpus=self.be_full[constants.BE_VCPUS],
- nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
+ nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
))
nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
src_node = self.op.src_node
src_path = self.op.src_path
- export_info = self.rpc.call_export_info(src_node, src_path)
-
- if not export_info:
+ if src_node is None:
+ exp_list = self.rpc.call_export_list(
+ self.acquired_locks[locking.LEVEL_NODE])
+ found = False
+ for node in exp_list:
+ if not exp_list[node].failed and src_path in exp_list[node].data:
+ found = True
+ self.op.src_node = src_node = node
+ self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
+ src_path)
+ break
+ if not found:
+ raise errors.OpPrereqError("No export found for relative path %s" %
+ src_path)
+
+ result = self.rpc.call_export_info(src_node, src_path)
+ result.Raise()
+ if not result.data:
raise errors.OpPrereqError("No export found in dir %s" % src_path)
+ export_info = result.data
if not export_info.has_section(constants.INISECT_EXP):
raise errors.ProgrammerError("Corrupted export config")
(ei_version, constants.EXPORT_VERSION))
# Check that the new instance doesn't have less disks than the export
- # TODO: substitute "2" with the actual number of disks requested
- instance_disks = 2
+ instance_disks = len(self.disks)
export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
if instance_disks < export_disks:
raise errors.OpPrereqError("Not enough disks to import."
" (instance: %d, export: %d)" %
- (2, export_disks))
+ (instance_disks, export_disks))
self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
disk_images = []
self.src_images = disk_images
- if self.op.mac == constants.VALUE_AUTO:
- old_name = export_info.get(constants.INISECT_INS, 'name')
- if self.op.instance_name == old_name:
- # FIXME: adjust every nic, when we'll be able to create instances
- # with more than one
- if int(export_info.get(constants.INISECT_INS, 'nic_count')) >= 1:
- self.op.mac = export_info.get(constants.INISECT_INS, 'nic_0_mac')
+ old_name = export_info.get(constants.INISECT_INS, 'name')
+ # FIXME: int() here could throw a ValueError on broken exports
+ exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
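+    # when the instance is imported under its original name, NICs left on
+    # automatic MAC generation re-use the MAC addresses saved in the export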
+ if self.op.instance_name == old_name:
+ for idx, nic in enumerate(self.nics):
+        if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx + 1:
+ nic_mac_ini = 'nic%d_mac' % idx
+ nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
# ip ping checks (we use the same ip that was resolved in ExpandNames)
-
if self.op.start and not self.op.ip_check:
raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
" adding an instance in start mode")
raise errors.OpPrereqError("IP %s of instance %s already in use" %
(self.check_ip, self.op.instance_name))
- # bridge verification
- bridge = getattr(self.op, "bridge", None)
- if bridge is None:
- self.op.bridge = self.cfg.GetDefBridge()
- else:
- self.op.bridge = bridge
-
#### allocator run
if self.op.iallocator is not None:
nodenames = [pnode.name] + self.secondaries
req_size = _ComputeDiskSize(self.op.disk_template,
- self.op.disk_size, self.op.swap_size)
+ self.disks)
# Check lv size requirements
if req_size is not None:
nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
self.op.hypervisor)
for node in nodenames:
- info = nodeinfo.get(node, None)
+ info = nodeinfo[node]
+ info.Raise()
+ info = info.data
if not info:
raise errors.OpPrereqError("Cannot get current information"
" from node '%s'" % node)
_CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
# os verification
- os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
- if not os_obj:
+ result = self.rpc.call_os_get(pnode.name, self.op.os_type)
+ result.Raise()
+ if not isinstance(result.data, objects.OS):
raise errors.OpPrereqError("OS '%s' not in supported os list for"
" primary node" % self.op.os_type)
# bridge check on primary node
- if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
- raise errors.OpPrereqError("target bridge '%s' does not exist on"
- " destination node '%s'" %
- (self.op.bridge, pnode.name))
+ bridges = [n.bridge for n in self.nics]
+ result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
+ result.Raise()
+ if not result.data:
+ raise errors.OpPrereqError("One of the target bridges '%s' does not"
+ " exist on destination node '%s'" %
+ (",".join(bridges), pnode.name))
# memory check on primary node
if self.op.start:
instance = self.op.instance_name
pnode_name = self.pnode.name
- if self.op.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
- mac_address = self.cfg.GenerateMAC()
- else:
- mac_address = self.op.mac
-
- nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
- if self.inst_ip is not None:
- nic.ip = self.inst_ip
+ for nic in self.nics:
+ if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
+ nic.mac = self.cfg.GenerateMAC()
ht_kind = self.op.hypervisor
if ht_kind in constants.HTS_REQ_PORT:
disks = _GenerateDiskTemplate(self,
self.op.disk_template,
instance, pnode_name,
- self.secondaries, self.op.disk_size,
- self.op.swap_size,
+ self.secondaries,
+ self.disks,
file_storage_dir,
- self.op.file_driver)
+ self.op.file_driver,
+ 0)
iobj = objects.Instance(name=instance, os=self.op.os_type,
primary_node=pnode_name,
- nics=[nic], disks=disks,
+ nics=self.nics, disks=disks,
disk_template=self.op.disk_template,
status=self.instance_status,
network_port=network_port,
del self.remove_locks[locking.LEVEL_INSTANCE]
# Remove the temp. assignements for the instance's drbds
self.cfg.ReleaseDRBDMinors(instance)
+ # Unlock all the nodes
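+    # (for an import the source node stays locked, since its export
+    # directory is still read below when the disk images are imported)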
+ if self.op.mode == constants.INSTANCE_IMPORT:
+ nodes_keep = [self.op.src_node]
+ nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
+ if node != self.op.src_node]
+ self.context.glm.release(locking.LEVEL_NODE, nodes_release)
+ self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
+ else:
+ self.context.glm.release(locking.LEVEL_NODE)
+ del self.acquired_locks[locking.LEVEL_NODE]
if self.op.wait_for_sync:
disk_abort = not _WaitForSync(self, iobj)
if iobj.disk_template != constants.DT_DISKLESS:
if self.op.mode == constants.INSTANCE_CREATE:
feedback_fn("* running the instance OS create scripts...")
- if not self.rpc.call_instance_os_add(pnode_name, iobj):
- raise errors.OpExecError("could not add os for instance %s"
+ result = self.rpc.call_instance_os_add(pnode_name, iobj)
+ result.Raise()
+ if not result.data:
+ raise errors.OpExecError("Could not add os for instance %s"
" on node %s" %
(instance, pnode_name))
import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
src_node, src_images,
cluster_name)
- for idx, result in enumerate(import_result):
+ import_result.Raise()
+ for idx, result in enumerate(import_result.data):
if not result:
- self.LogWarning("Could not image %s for on instance %s, disk %d,"
- " on node %s" % (src_images[idx], instance, idx,
- pnode_name))
+ self.LogWarning("Could not import the image %s for instance"
+ " %s, disk %d, on node %s" %
+ (src_images[idx], instance, idx, pnode_name))
else:
# also checked in the prereq part
raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
if self.op.start:
logging.info("Starting instance %s on node %s", instance, pnode_name)
feedback_fn("* starting instance...")
- if not self.rpc.call_instance_start(pnode_name, iobj, None):
+ result = self.rpc.call_instance_start(pnode_name, iobj, None)
+ result.Raise()
+ if not result.data:
raise errors.OpExecError("Could not start instance")
node_insts = self.rpc.call_instance_list([node],
[instance.hypervisor])[node]
-    if node_insts is False:
-      raise errors.OpExecError("Can't connect to node %s." % node)
-    if instance.name not in node_insts:
+    node_insts.Raise()
+    if instance.name not in node_insts.data:
raise errors.OpExecError("Instance %s is not running." % instance.name)
logging.debug("Connecting to console of %s on %s", instance.name, node)
else:
raise errors.ProgrammerError("Unhandled disk replace mode")
- for name in self.op.disks:
- if instance.FindDisk(name) is None:
- raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
- (name, instance.name))
+ if not self.op.disks:
+ self.op.disks = range(len(instance.disks))
+
+ for disk_idx in self.op.disks:
+ instance.FindDisk(disk_idx)
def _ExecD8DiskOnly(self, feedback_fn):
"""Replace a disk on the primary or secondary for dbrd8.
The algorithm for replace is quite complicated:
- - for each disk to be replaced:
- - create new LVs on the target node with unique names
- - detach old LVs from the drbd device
- - rename old LVs to name_replaced.<time_t>
- - rename new LVs to old LVs
- - attach the new LVs (with the old names now) to the drbd device
- - wait for sync across all devices
- - for each modified disk:
- - remove old LVs (which have the name name_replaces.<time_t>)
+
+ 1. for each disk to be replaced:
+
+ 1. create new LVs on the target node with unique names
+ 1. detach old LVs from the drbd device
+ 1. rename old LVs to name_replaced.<time_t>
+ 1. rename new LVs to old LVs
+ 1. attach the new LVs (with the old names now) to the drbd device
+
+ 1. wait for sync across all devices
+
+ 1. for each modified disk:
+
+ 1. remove old LVs (which have the name name_replaces.<time_t>)
Failures are not very well handled.
if not results:
raise errors.OpExecError("Can't list volume groups on the nodes")
for node in oth_node, tgt_node:
- res = results.get(node, False)
- if not res or my_vg not in res:
+ res = results[node]
+ if res.failed or not res.data or my_vg not in res.data:
raise errors.OpExecError("Volume group '%s' not found on %s" %
(my_vg, node))
- for dev in instance.disks:
- if not dev.iv_name in self.op.disks:
+ for idx, dev in enumerate(instance.disks):
+ if idx not in self.op.disks:
continue
for node in tgt_node, oth_node:
- info("checking %s on %s" % (dev.iv_name, node))
+ info("checking disk/%d on %s" % (idx, node))
cfg.SetDiskID(dev, node)
if not self.rpc.call_blockdev_find(node, dev):
- raise errors.OpExecError("Can't find device %s on node %s" %
- (dev.iv_name, node))
+ raise errors.OpExecError("Can't find disk/%d on node %s" %
+ (idx, node))
# Step: check other node consistency
self.proc.LogStep(2, steps_total, "check peer consistency")
- for dev in instance.disks:
- if not dev.iv_name in self.op.disks:
+ for idx, dev in enumerate(instance.disks):
+ if idx not in self.op.disks:
continue
- info("checking %s consistency on %s" % (dev.iv_name, oth_node))
+ info("checking disk/%d consistency on %s" % (idx, oth_node))
if not _CheckDiskConsistency(self, dev, oth_node,
oth_node==instance.primary_node):
raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
# Step: create new storage
self.proc.LogStep(3, steps_total, "allocate new storage")
- for dev in instance.disks:
- if not dev.iv_name in self.op.disks:
+ for idx, dev in enumerate(instance.disks):
+ if idx not in self.op.disks:
continue
size = dev.size
cfg.SetDiskID(dev, tgt_node)
- lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
+ lv_names = [".disk%d_%s" % (idx, suf)
+ for suf in ["data", "meta"]]
names = _GenerateUniqueNames(self, lv_names)
lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
logical_id=(vgname, names[0]))
self.proc.LogStep(4, steps_total, "change drbd configuration")
for dev, old_lvs, new_lvs in iv_names.itervalues():
info("detaching %s drbd from local storage" % dev.iv_name)
- if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
+ result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
+ result.Raise()
+ if not result.data:
raise errors.OpExecError("Can't detach drbd from local storage on node"
" %s for device %s" % (tgt_node, dev.iv_name))
#dev.children = []
rlist = []
for to_ren in old_lvs:
find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
- if find_res is not None: # device exists
+ if not find_res.failed and find_res.data is not None: # device exists
rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
info("renaming the old LVs on the target node")
- if not self.rpc.call_blockdev_rename(tgt_node, rlist):
+ result = self.rpc.call_blockdev_rename(tgt_node, rlist)
+ result.Raise()
+ if not result.data:
raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
# now we rename the new LVs to the old LVs
info("renaming the new LVs on the target node")
rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
- if not self.rpc.call_blockdev_rename(tgt_node, rlist):
+ result = self.rpc.call_blockdev_rename(tgt_node, rlist)
+ result.Raise()
+ if not result.data:
raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
for old, new in zip(old_lvs, new_lvs):
# now that the new lvs have the old name, we can add them to the device
info("adding new mirror component on %s" % tgt_node)
- if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
+    result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
+ if result.failed or not result.data:
for new_lv in new_lvs:
- if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
+ result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
+ if result.failed or not result.data:
warning("Can't rollback device %s", hint="manually cleanup unused"
" logical volumes")
raise errors.OpExecError("Can't add local storage to drbd")
# so check manually all the devices
for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
cfg.SetDiskID(dev, instance.primary_node)
- is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
- if is_degr:
+ result = self.rpc.call_blockdev_find(instance.primary_node, dev)
+ if result.failed or result.data[5]:
raise errors.OpExecError("DRBD device %s is degraded!" % name)
# Step: remove old storage
info("remove logical volumes for %s" % name)
for lv in old_lvs:
cfg.SetDiskID(lv, tgt_node)
- if not self.rpc.call_blockdev_remove(tgt_node, lv):
+ result = self.rpc.call_blockdev_remove(tgt_node, lv)
+ if result.failed or not result.data:
warning("Can't remove old LV", hint="manually remove unused LVs")
continue
info("checking volume groups")
my_vg = cfg.GetVGName()
results = self.rpc.call_vg_list([pri_node, new_node])
- if not results:
- raise errors.OpExecError("Can't list volume groups on the nodes")
for node in pri_node, new_node:
- res = results.get(node, False)
- if not res or my_vg not in res:
+ res = results[node]
+ if res.failed or not res.data or my_vg not in res.data:
raise errors.OpExecError("Volume group '%s' not found on %s" %
(my_vg, node))
- for dev in instance.disks:
- if not dev.iv_name in self.op.disks:
+ for idx, dev in enumerate(instance.disks):
+ if idx not in self.op.disks:
continue
- info("checking %s on %s" % (dev.iv_name, pri_node))
+ info("checking disk/%d on %s" % (idx, pri_node))
cfg.SetDiskID(dev, pri_node)
- if not self.rpc.call_blockdev_find(pri_node, dev):
- raise errors.OpExecError("Can't find device %s on node %s" %
- (dev.iv_name, pri_node))
+ result = self.rpc.call_blockdev_find(pri_node, dev)
+ result.Raise()
+ if not result.data:
+ raise errors.OpExecError("Can't find disk/%d on node %s" %
+ (idx, pri_node))
# Step: check other node consistency
self.proc.LogStep(2, steps_total, "check peer consistency")
- for dev in instance.disks:
- if not dev.iv_name in self.op.disks:
+ for idx, dev in enumerate(instance.disks):
+ if idx not in self.op.disks:
continue
- info("checking %s consistency on %s" % (dev.iv_name, pri_node))
+ info("checking disk/%d consistency on %s" % (idx, pri_node))
if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
raise errors.OpExecError("Primary node (%s) has degraded storage,"
" unsafe to replace the secondary" %
# Step: create new storage
self.proc.LogStep(3, steps_total, "allocate new storage")
- for dev in instance.disks:
+ for idx, dev in enumerate(instance.disks):
size = dev.size
- info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
+ info("adding new local storage on %s for disk/%d" %
+ (new_node, idx))
# since we *always* want to create this LV, we use the
# _Create...OnPrimary (which forces the creation), even if we
# are talking about the secondary node
" node '%s'" %
(new_lv.logical_id[1], new_node))
-
-    # Step 4: dbrd minors and drbd setups changes
+    # Step 4: drbd minors and drbd setup changes
# after this, we must manually remove the drbd minors on both the
# error and the success paths
instance.name)
logging.debug("Allocated minors %s" % (minors,))
self.proc.LogStep(4, steps_total, "changing drbd configuration")
- for dev, new_minor in zip(instance.disks, minors):
+ for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
size = dev.size
- info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
+ info("activating a new drbd on %s for disk/%d" % (new_node, idx))
# create new devices on new_node
if pri_node == dev.logical_id[0]:
new_logical_id = (pri_node, new_node,
new_logical_id = (new_node, pri_node,
dev.logical_id[2], new_minor, dev.logical_id[4],
dev.logical_id[5])
- iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
+ iv_names[idx] = (dev, dev.children, new_logical_id)
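+      # iv_names maps the disk index to (device, its LV children, the new
+      # drbd logical_id), used below for the final sync check and for
+      # removing the old storage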
logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
new_logical_id)
new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
raise errors.OpExecError("Failed to create new DRBD on"
" node '%s'" % new_node)
- for dev in instance.disks:
+ for idx, dev in enumerate(instance.disks):
# we have new devices, shutdown the drbd on the old secondary
- info("shutting down drbd for %s on old node" % dev.iv_name)
+ info("shutting down drbd for disk/%d on old node" % idx)
cfg.SetDiskID(dev, old_node)
- if not self.rpc.call_blockdev_shutdown(old_node, dev):
- warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
+ result = self.rpc.call_blockdev_shutdown(old_node, dev)
+ if result.failed or not result.data:
+ warning("Failed to shutdown drbd for disk/%d on old node" % idx,
hint="Please cleanup this device manually as soon as possible")
info("detaching primary drbds from the network (=> standalone)")
done = 0
- for dev in instance.disks:
+ for idx, dev in enumerate(instance.disks):
cfg.SetDiskID(dev, pri_node)
# set the network part of the physical (unique in bdev terms) id
# to None, meaning detach from network
dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
# and 'find' the device, which will 'fix' it to match the
# standalone state
- if self.rpc.call_blockdev_find(pri_node, dev):
+ result = self.rpc.call_blockdev_find(pri_node, dev)
+ if not result.failed and result.data:
done += 1
else:
- warning("Failed to detach drbd %s from network, unusual case" %
- dev.iv_name)
+ warning("Failed to detach drbd disk/%d from network, unusual case" %
+ idx)
if not done:
# no detaches succeeded (very unlikely)
# and now perform the drbd attach
info("attaching primary drbds to new secondary (standalone => connected)")
failures = []
- for dev in instance.disks:
- info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
+ for idx, dev in enumerate(instance.disks):
+ info("attaching primary drbd for disk/%d to new secondary node" % idx)
# since the attach is smart, it's enough to 'find' the device,
# it will automatically activate the network, if the physical_id
# is correct
cfg.SetDiskID(dev, pri_node)
logging.debug("Disk to attach: %s", dev)
- if not self.rpc.call_blockdev_find(pri_node, dev):
- warning("can't attach drbd %s to new secondary!" % dev.iv_name,
+ result = self.rpc.call_blockdev_find(pri_node, dev)
+ if result.failed or not result.data:
+ warning("can't attach drbd disk/%d to new secondary!" % idx,
"please do a gnt-instance info to see the status of disks")
# this can fail as the old devices are degraded and _WaitForSync
_WaitForSync(self, instance, unlock=True)
# so check manually all the devices
- for name, (dev, old_lvs, _) in iv_names.iteritems():
+ for idx, (dev, old_lvs, _) in iv_names.iteritems():
cfg.SetDiskID(dev, pri_node)
- is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
- if is_degr:
- raise errors.OpExecError("DRBD device %s is degraded!" % name)
+ result = self.rpc.call_blockdev_find(pri_node, dev)
+ result.Raise()
+ if result.data[5]:
+ raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
self.proc.LogStep(6, steps_total, "removing old storage")
- for name, (dev, old_lvs, _) in iv_names.iteritems():
- info("remove logical volumes for %s" % name)
+ for idx, (dev, old_lvs, _) in iv_names.iteritems():
+ info("remove logical volumes for disk/%d" % idx)
for lv in old_lvs:
cfg.SetDiskID(lv, old_node)
- if not self.rpc.call_blockdev_remove(old_node, lv):
+ result = self.rpc.call_blockdev_remove(old_node, lv)
+ if result.failed or not result.data:
warning("Can't remove LV on old secondary",
hint="Cleanup stale volumes by hand")
raise errors.OpPrereqError("Instance's disk layout does not support"
" growing.")
- if instance.FindDisk(self.op.disk) is None:
- raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
- (self.op.disk, instance.name))
+ self.disk = instance.FindDisk(self.op.disk)
nodenames = [instance.primary_node] + list(instance.secondary_nodes)
nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
instance.hypervisor)
for node in nodenames:
- info = nodeinfo.get(node, None)
- if not info:
+ info = nodeinfo[node]
+ if info.failed or not info.data:
raise errors.OpPrereqError("Cannot get current information"
" from node '%s'" % node)
- vg_free = info.get('vg_free', None)
+ vg_free = info.data.get('vg_free', None)
if not isinstance(vg_free, int):
raise errors.OpPrereqError("Can't compute free disk space on"
" node %s" % node)
- if self.op.amount > info['vg_free']:
+ if self.op.amount > vg_free:
raise errors.OpPrereqError("Not enough disk space on target node %s:"
" %d MiB available, %d MiB required" %
- (node, info['vg_free'], self.op.amount))
+ (node, vg_free, self.op.amount))
def Exec(self, feedback_fn):
"""Execute disk grow.
"""
instance = self.instance
- disk = instance.FindDisk(self.op.disk)
+ disk = self.disk
for node in (instance.secondary_nodes + (instance.primary_node,)):
self.cfg.SetDiskID(disk, node)
result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
- if (not result or not isinstance(result, (list, tuple)) or
- len(result) != 2):
- raise errors.OpExecError("grow request failed to node %s" % node)
- elif not result[0]:
- raise errors.OpExecError("grow request failed to node %s: %s" %
- (node, result[1]))
+ result.Raise()
+ if (not result.data or not isinstance(result.data, (list, tuple)) or
+ len(result.data) != 2):
+ raise errors.OpExecError("Grow request failed to node %s" % node)
+ elif not result.data[0]:
+ raise errors.OpExecError("Grow request failed to node %s: %s" %
+ (node, result.data[1]))
disk.RecordGrow(self.op.amount)
self.cfg.Update(instance)
if self.op.wait_for_sync:
if not static:
self.cfg.SetDiskID(dev, instance.primary_node)
dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
+ dev_pstatus.Raise()
+ dev_pstatus = dev_pstatus.data
else:
dev_pstatus = None
if snode and not static:
self.cfg.SetDiskID(dev, snode)
dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
+ dev_sstatus.Raise()
+ dev_sstatus = dev_sstatus.data
else:
dev_sstatus = None
"pstatus": dev_pstatus,
"sstatus": dev_sstatus,
"children": dev_children,
+ "mode": dev.mode,
}
return data
remote_info = self.rpc.call_instance_info(instance.primary_node,
instance.name,
instance.hypervisor)
+ remote_info.Raise()
+ remote_info = remote_info.data
if remote_info and "state" in remote_info:
remote_state = "up"
else:
"""
HPATH = "instance-modify"
HTYPE = constants.HTYPE_INSTANCE
- _OP_REQP = ["instance_name", "hvparams"]
+ _OP_REQP = ["instance_name"]
REQ_BGL = False
+ def CheckArguments(self):
+ if not hasattr(self.op, 'nics'):
+ self.op.nics = []
+ if not hasattr(self.op, 'disks'):
+ self.op.disks = []
+ if not hasattr(self.op, 'beparams'):
+ self.op.beparams = {}
+ if not hasattr(self.op, 'hvparams'):
+ self.op.hvparams = {}
+ self.op.force = getattr(self.op, "force", False)
+ if not (self.op.nics or self.op.disks or
+ self.op.hvparams or self.op.beparams):
+ raise errors.OpPrereqError("No changes submitted")
+
+ utils.CheckBEParams(self.op.beparams)
+
+ # Disk validation
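+    # each entry of self.op.disks is an (op, params) pair, where op is
+    # constants.DDM_ADD, constants.DDM_REMOVE or the index of an existing
+    # disk, e.g. (constants.DDM_ADD, {"size": 1024}); self.op.nics follows
+    # the same structure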
+ disk_addremove = 0
+ for disk_op, disk_dict in self.op.disks:
+ if disk_op == constants.DDM_REMOVE:
+ disk_addremove += 1
+ continue
+ elif disk_op == constants.DDM_ADD:
+ disk_addremove += 1
+ else:
+ if not isinstance(disk_op, int):
+ raise errors.OpPrereqError("Invalid disk index")
+ if disk_op == constants.DDM_ADD:
+ mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
+ if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
+ raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
+ size = disk_dict.get('size', None)
+ if size is None:
+ raise errors.OpPrereqError("Required disk parameter size missing")
+ try:
+ size = int(size)
+ except ValueError, err:
+ raise errors.OpPrereqError("Invalid disk size parameter: %s" %
+ str(err))
+ disk_dict['size'] = size
+ else:
+ # modification of disk
+ if 'size' in disk_dict:
+ raise errors.OpPrereqError("Disk size change not possible, use"
+ " grow-disk")
+
+ if disk_addremove > 1:
+ raise errors.OpPrereqError("Only one disk add or remove operation"
+ " supported at a time")
+
+ # NIC validation
+ nic_addremove = 0
+ for nic_op, nic_dict in self.op.nics:
+ if nic_op == constants.DDM_REMOVE:
+ nic_addremove += 1
+ continue
+ elif nic_op == constants.DDM_ADD:
+ nic_addremove += 1
+ else:
+ if not isinstance(nic_op, int):
+ raise errors.OpPrereqError("Invalid nic index")
+
+ # nic_dict should be a dict
+ nic_ip = nic_dict.get('ip', None)
+ if nic_ip is not None:
+ if nic_ip.lower() == "none":
+ nic_dict['ip'] = None
+ else:
+ if not utils.IsValidIP(nic_ip):
+ raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
+ # we can only check None bridges and assign the default one
+ nic_bridge = nic_dict.get('bridge', None)
+ if nic_bridge is None:
+ nic_dict['bridge'] = self.cfg.GetDefBridge()
+ # but we can validate MACs
+ nic_mac = nic_dict.get('mac', None)
+ if nic_mac is not None:
+ if self.cfg.IsMacInUse(nic_mac):
+ raise errors.OpPrereqError("MAC address %s already in use"
+ " in cluster" % nic_mac)
+ if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
+ if not utils.IsValidMac(nic_mac):
+ raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
+ if nic_addremove > 1:
+ raise errors.OpPrereqError("Only one NIC add or remove operation"
+ " supported at a time")
+
def ExpandNames(self):
self._ExpandAndLockInstance()
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
-
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE:
self._LockInstancesNodes()
args['memory'] = self.be_new[constants.BE_MEMORY]
if constants.BE_VCPUS in self.be_new:
args['vcpus'] = self.be_new[constants.BE_VCPUS]
- if self.do_ip or self.do_bridge or self.mac:
- if self.do_ip:
- ip = self.ip
- else:
- ip = self.instance.nics[0].ip
- if self.bridge:
- bridge = self.bridge
- else:
- bridge = self.instance.nics[0].bridge
- if self.mac:
- mac = self.mac
- else:
- mac = self.instance.nics[0].mac
- args['nics'] = [(ip, bridge, mac)]
+ # FIXME: readd disk/nic changes
env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
nl = [self.cfg.GetMasterNode(),
self.instance.primary_node] + list(self.instance.secondary_nodes)
This only checks the instance list against the existing names.
"""
- # FIXME: all the parameters could be checked before, in ExpandNames, or in
- # a separate CheckArguments function, if we implement one, so the operation
- # can be aborted without waiting for any lock, should it have an error...
- self.ip = getattr(self.op, "ip", None)
- self.mac = getattr(self.op, "mac", None)
- self.bridge = getattr(self.op, "bridge", None)
- self.kernel_path = getattr(self.op, "kernel_path", None)
- self.initrd_path = getattr(self.op, "initrd_path", None)
- self.force = getattr(self.op, "force", None)
- all_parms = [self.ip, self.bridge, self.mac]
- if (all_parms.count(None) == len(all_parms) and
- not self.op.hvparams and
- not self.op.beparams):
- raise errors.OpPrereqError("No changes submitted")
- for item in (constants.BE_MEMORY, constants.BE_VCPUS):
- val = self.op.beparams.get(item, None)
- if val is not None:
- try:
- val = int(val)
- except ValueError, err:
- raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
- self.op.beparams[item] = val
- if self.ip is not None:
- self.do_ip = True
- if self.ip.lower() == "none":
- self.ip = None
- else:
- if not utils.IsValidIP(self.ip):
- raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
- else:
- self.do_ip = False
- self.do_bridge = (self.bridge is not None)
- if self.mac is not None:
- if self.cfg.IsMacInUse(self.mac):
- raise errors.OpPrereqError('MAC address %s already in use in cluster' %
- self.mac)
- if not utils.IsValidMac(self.mac):
- raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
+ force = self.force = self.op.force
# checking the new params on the primary/secondary nodes
if self.op.hvparams:
i_hvdict = copy.deepcopy(instance.hvparams)
for key, val in self.op.hvparams.iteritems():
- if val is None:
+ if val == constants.VALUE_DEFAULT:
try:
del i_hvdict[key]
except KeyError:
pass
+ elif val == constants.VALUE_NONE:
+ i_hvdict[key] = None
else:
i_hvdict[key] = val
cluster = self.cfg.GetClusterInfo()
if self.op.beparams:
i_bedict = copy.deepcopy(instance.beparams)
for key, val in self.op.beparams.iteritems():
- if val is None:
+ if val == constants.VALUE_DEFAULT:
try:
del i_bedict[key]
except KeyError:
self.be_new = be_new # the new actual values
self.be_inst = i_bedict # the new dict (without defaults)
else:
- self.hv_new = self.hv_inst = {}
+ self.be_new = self.be_inst = {}
self.warn = []
instance.hypervisor)
nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
instance.hypervisor)
-
- if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
+ if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
# Assume the primary node is unreachable and go ahead
self.warn.append("Can't get info from primary node %s" % pnode)
else:
- if instance_info:
- current_mem = instance_info['memory']
+ if not instance_info.failed and instance_info.data:
+ current_mem = instance_info.data['memory']
else:
# Assume instance not running
# (there is a slight race condition here, but it's not very probable,
# and we have no other way to check)
current_mem = 0
miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
- nodeinfo[pnode]['memory_free'])
+ nodeinfo[pnode].data['memory_free'])
if miss_mem > 0:
raise errors.OpPrereqError("This change will prevent the instance"
" from starting, due to %d MB of memory"
" missing on its primary node" % miss_mem)
if be_new[constants.BE_AUTO_BALANCE]:
- for node in instance.secondary_nodes:
- if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
+      for node in instance.secondary_nodes:
+        nres = nodeinfo[node]
+        if nres.failed or not isinstance(nres.data, dict):
self.warn.append("Can't get info from secondary node %s" % node)
- elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
+ elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
self.warn.append("Not enough memory to failover instance to"
" secondary node %s" % node)
+ # NIC processing
+ for nic_op, nic_dict in self.op.nics:
+ if nic_op == constants.DDM_REMOVE:
+ if not instance.nics:
+ raise errors.OpPrereqError("Instance has no NICs, cannot remove")
+ continue
+ if nic_op != constants.DDM_ADD:
+ # an existing nic
+ if nic_op < 0 or nic_op >= len(instance.nics):
+ raise errors.OpPrereqError("Invalid NIC index %s, valid values"
+ " are 0 to %d" %
+ (nic_op, len(instance.nics)))
+ nic_bridge = nic_dict.get('bridge', None)
+ if nic_bridge is not None:
+          result = self.rpc.call_bridges_exist(pnode, [nic_bridge])
+          if result.failed or not result.data:
+ msg = ("Bridge '%s' doesn't exist on one of"
+ " the instance nodes" % nic_bridge)
+ if self.force:
+ self.warn.append(msg)
+ else:
+ raise errors.OpPrereqError(msg)
+
+ # DISK processing
+ if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
+ raise errors.OpPrereqError("Disk operations not supported for"
+ " diskless instances")
+ for disk_op, disk_dict in self.op.disks:
+ if disk_op == constants.DDM_REMOVE:
+ if len(instance.disks) == 1:
+ raise errors.OpPrereqError("Cannot remove the last disk of"
+ " an instance")
+ ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
+ ins_l = ins_l[pnode]
+        if ins_l.failed or not isinstance(ins_l.data, list):
+          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
+        if instance.name in ins_l.data:
+ raise errors.OpPrereqError("Instance is running, can't remove"
+ " disks.")
+
+ if (disk_op == constants.DDM_ADD and
+          len(instance.disks) >= constants.MAX_DISKS):
+ raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
+ " add more" % constants.MAX_DISKS)
+ if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
+ # an existing disk
+ if disk_op < 0 or disk_op >= len(instance.disks):
+ raise errors.OpPrereqError("Invalid disk index %s, valid values"
+ " are 0 to %d" %
+ (disk_op, len(instance.disks)))
+
return
def Exec(self, feedback_fn):
"""Modifies an instance.
All parameters take effect only at the next restart of the instance.
+
"""
# Process here the warnings from CheckPrereq, as we don't have a
# feedback_fn there.
result = []
instance = self.instance
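+    # 'result' accumulates (parameter, new value) pairs for every change
+    # that is actually applied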
- if self.do_ip:
- instance.nics[0].ip = self.ip
- result.append(("ip", self.ip))
- if self.bridge:
- instance.nics[0].bridge = self.bridge
- result.append(("bridge", self.bridge))
- if self.mac:
- instance.nics[0].mac = self.mac
- result.append(("mac", self.mac))
+ # disk changes
+ for disk_op, disk_dict in self.op.disks:
+ if disk_op == constants.DDM_REMOVE:
+ # remove the last disk
+ device = instance.disks.pop()
+ device_idx = len(instance.disks)
+ for node, disk in device.ComputeNodeTree(instance.primary_node):
+ self.cfg.SetDiskID(disk, node)
+          rpc_result = self.rpc.call_blockdev_remove(node, disk)
+          if rpc_result.failed or not rpc_result.data:
+ self.proc.LogWarning("Could not remove disk/%d on node %s,"
+ " continuing anyway", device_idx, node)
+ result.append(("disk/%d" % device_idx, "remove"))
+ elif disk_op == constants.DDM_ADD:
+ # add a new disk
+ if instance.disk_template == constants.DT_FILE:
+ file_driver, file_path = instance.disks[0].logical_id
+ file_path = os.path.dirname(file_path)
+ else:
+ file_driver = file_path = None
+ disk_idx_base = len(instance.disks)
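+        # _GenerateDiskTemplate gets a single-element disk list plus the
+        # base index, so it returns exactly one disk named disk/<base index>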
+ new_disk = _GenerateDiskTemplate(self,
+ instance.disk_template,
+ instance, instance.primary_node,
+ instance.secondary_nodes,
+ [disk_dict],
+ file_path,
+ file_driver,
+ disk_idx_base)[0]
+ new_disk.mode = disk_dict['mode']
+ instance.disks.append(new_disk)
+ info = _GetInstanceInfoText(instance)
+
+ logging.info("Creating volume %s for instance %s",
+ new_disk.iv_name, instance.name)
+ # Note: this needs to be kept in sync with _CreateDisks
+ #HARDCODE
+ for secondary_node in instance.secondary_nodes:
+ if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
+ new_disk, False, info):
+ self.LogWarning("Failed to create volume %s (%s) on"
+ " secondary node %s!",
+ new_disk.iv_name, new_disk, secondary_node)
+ #HARDCODE
+ if not _CreateBlockDevOnPrimary(self, instance.primary_node,
+ instance, new_disk, info):
+ self.LogWarning("Failed to create volume %s on primary!",
+ new_disk.iv_name)
+ result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
+ (new_disk.size, new_disk.mode)))
+ else:
+ # change a given disk
+ instance.disks[disk_op].mode = disk_dict['mode']
+ result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
+ # NIC changes
+ for nic_op, nic_dict in self.op.nics:
+ if nic_op == constants.DDM_REMOVE:
+ # remove the last nic
+ del instance.nics[-1]
+ result.append(("nic.%d" % len(instance.nics), "remove"))
+ elif nic_op == constants.DDM_ADD:
+ # add a new nic
+ if 'mac' not in nic_dict:
+ mac = constants.VALUE_GENERATE
+ else:
+ mac = nic_dict['mac']
+ if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
+ mac = self.cfg.GenerateMAC()
+ new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
+ bridge=nic_dict.get('bridge', None))
+ instance.nics.append(new_nic)
+ result.append(("nic.%d" % (len(instance.nics) - 1),
+ "add:mac=%s,ip=%s,bridge=%s" %
+ (new_nic.mac, new_nic.ip, new_nic.bridge)))
+ else:
+ # change a given nic
+ for key in 'mac', 'ip', 'bridge':
+ if key in nic_dict:
+ setattr(instance.nics[nic_op], key, nic_dict[key])
+ result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
+
+ # hvparams changes
if self.op.hvparams:
instance.hvparams = self.hv_new
for key, val in self.op.hvparams.iteritems():
result.append(("hv/%s" % key, val))
+
+ # beparams changes
if self.op.beparams:
instance.beparams = self.be_inst
for key, val in self.op.beparams.iteritems():
def Exec(self, feedback_fn):
"""Compute the list of all the exported system images.
- Returns:
- a dictionary with the structure node->(export-list)
- where export-list is a list of the instances exported on
- that node.
+ @rtype: dict
+ @return: a dictionary with the structure node->(export-list)
+ where export-list is a list of the instances exported on
+ that node.
"""
- return self.rpc.call_export_list(self.nodes)
+ rpcresult = self.rpc.call_export_list(self.nodes)
+ result = {}
+ for node in rpcresult:
+ if rpcresult[node].failed:
+ result[node] = False
+ else:
+ result[node] = rpcresult[node].data
+
+ return result
class LUExportInstance(LogicalUnit):
self.dst_node = self.cfg.GetNodeInfo(
self.cfg.ExpandNodeName(self.op.target_node))
- assert self.dst_node is not None, \
- "Cannot retrieve locked node %s" % self.op.target_node
+ if self.dst_node is None:
+ # This is wrong node name, not a non-locked node
+ raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
# instance disk type verification
for disk in self.instance.disks:
src_node = instance.primary_node
if self.op.shutdown:
# shutdown the instance, but not the disks
- if not self.rpc.call_instance_shutdown(src_node, instance):
+ result = self.rpc.call_instance_shutdown(src_node, instance)
+ result.Raise()
+ if not result.data:
raise errors.OpExecError("Could not shutdown instance %s on node %s" %
(instance.name, src_node))
for disk in instance.disks:
# new_dev_name will be a snapshot of an lvm leaf of the one we passed
new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
-
- if not new_dev_name:
+ if new_dev_name.failed or not new_dev_name.data:
self.LogWarning("Could not snapshot block device %s on node %s",
disk.logical_id[1], src_node)
snap_disks.append(False)
else:
new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
- logical_id=(vgname, new_dev_name),
- physical_id=(vgname, new_dev_name),
+ logical_id=(vgname, new_dev_name.data),
+ physical_id=(vgname, new_dev_name.data),
iv_name=disk.iv_name)
snap_disks.append(new_dev)
finally:
if self.op.shutdown and instance.status == "up":
- if not self.rpc.call_instance_start(src_node, instance, None):
+ result = self.rpc.call_instance_start(src_node, instance, None)
+ if result.failed or not result.data:
_ShutdownInstanceDisks(self, instance)
raise errors.OpExecError("Could not start instance")
cluster_name = self.cfg.GetClusterName()
for idx, dev in enumerate(snap_disks):
if dev:
- if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
- instance, cluster_name, idx):
+ result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
+ instance, cluster_name, idx)
+ if result.failed or not result.data:
self.LogWarning("Could not export block device %s from node %s to"
" node %s", dev.logical_id[1], src_node,
dst_node.name)
- if not self.rpc.call_blockdev_remove(src_node, dev):
+ result = self.rpc.call_blockdev_remove(src_node, dev)
+ if result.failed or not result.data:
self.LogWarning("Could not remove snapshot block device %s from node"
" %s", dev.logical_id[1], src_node)
- if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
+ result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
+ if result.failed or not result.data:
self.LogWarning("Could not finalize export for instance %s on node %s",
instance.name, dst_node.name)
if nodelist:
exportlist = self.rpc.call_export_list(nodelist)
for node in exportlist:
- if instance.name in exportlist[node]:
+ if exportlist[node].failed:
+ continue
+ if instance.name in exportlist[node].data:
if not self.rpc.call_export_remove(node, instance.name):
self.LogWarning("Could not remove older export for instance %s"
" on node %s", instance.name, node)
locking.LEVEL_NODE])
found = False
for node in exportlist:
- if instance_name in exportlist[node]:
+ if exportlist[node].failed:
+ self.LogWarning("Failed to query node %s, continuing" % node)
+ continue
+ if instance_name in exportlist[node].data:
found = True
- if not self.rpc.call_export_remove(node, instance_name):
+ result = self.rpc.call_export_remove(node, instance_name)
+ if result.failed or not result.data:
logging.error("Could not remove export for instance %s"
" on node %s", instance_name, node)
if not result:
raise errors.OpExecError("Complete failure from rpc call")
for node, node_result in result.items():
- if not node_result:
+ node_result.Raise()
+ if not node_result.data:
raise errors.OpExecError("Failure during rpc call to node %s,"
- " result: %s" % (node, node_result))
+ " result: %s" % (node, node_result.data))
class IAllocator(object):
"""
_ALLO_KEYS = [
"mem_size", "disks", "disk_template",
- "os", "tags", "nics", "vcpus",
+ "os", "tags", "nics", "vcpus", "hypervisor",
]
_RELO_KEYS = [
"relocate_from",
"enable_hypervisors": list(cluster_info.enabled_hypervisors),
# we don't have job IDs
}
-
- i_list = []
- cluster = self.cfg.GetClusterInfo()
- for iname in cfg.GetInstanceList():
- i_obj = cfg.GetInstanceInfo(iname)
- i_list.append((i_obj, cluster.FillBE(i_obj)))
+ iinfo = cfg.GetAllInstancesInfo().values()
+ i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
# node data
node_results = {}
node_list = cfg.GetNodeList()
- # FIXME: here we have only one hypervisor information, but
- # instance can belong to different hypervisors
+
+ if self.mode == constants.IALLOCATOR_MODE_ALLOC:
+ hypervisor = self.hypervisor
+ elif self.mode == constants.IALLOCATOR_MODE_RELOC:
+ hypervisor = cfg.GetInstanceInfo(self.name).hypervisor
+
node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
- cfg.GetHypervisorType())
+ hypervisor)
+ node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
+ cluster_info.enabled_hypervisors)
for nname in node_list:
ninfo = cfg.GetNodeInfo(nname)
- if nname not in node_data or not isinstance(node_data[nname], dict):
+ node_data[nname].Raise()
+ if not isinstance(node_data[nname].data, dict):
raise errors.OpExecError("Can't get data for node %s" % nname)
- remote_info = node_data[nname]
+ remote_info = node_data[nname].data
for attr in ['memory_total', 'memory_free', 'memory_dom0',
'vg_size', 'vg_free', 'cpu_total']:
if attr not in remote_info:
for iinfo, beinfo in i_list:
if iinfo.primary_node == nname:
i_p_mem += beinfo[constants.BE_MEMORY]
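+          # reserve the memory the instance is entitled to but does not
+          # currently use (e.g. because it is stopped), so that 'memory_free'
+          # reflects what is really available for new allocations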
+ if iinfo.name not in node_iinfo[nname]:
+ i_used_mem = 0
+ else:
+ i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
+ i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
+ remote_info['memory_free'] -= max(0, i_mem_diff)
+
if iinfo.status == "up":
i_p_up_mem += beinfo[constants.BE_MEMORY]
if len(self.disks) != 2:
raise errors.OpExecError("Only two-disk configurations supported")
- disk_space = _ComputeDiskSize(self.disk_template,
- self.disks[0]["size"], self.disks[1]["size"])
+ disk_space = _ComputeDiskSize(self.disk_template, self.disks)
if self.disk_template in constants.DTS_NET_MIRROR:
self.required_nodes = 2
raise errors.OpPrereqError("Instance has not exactly one secondary node")
self.required_nodes = 1
-
- disk_space = _ComputeDiskSize(instance.disk_template,
- instance.disks[0].size,
- instance.disks[1].size)
+ disk_sizes = [{'size': disk.size} for disk in instance.disks]
+ disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
request = {
"type": "relocate",
data = self.in_text
result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
-    if not isinstance(result, (list, tuple)) or len(result) != 4:
+    result.Raise()
+    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
raise errors.OpExecError("Invalid result from master iallocator runner")
- rcode, stdout, stderr, fail = result
+ rcode, stdout, stderr, fail = result.data
if rcode == constants.IARUN_NOTFOUND:
raise errors.OpExecError("Can't find allocator '%s'" % name)
row["mode"] not in ['r', 'w']):
raise errors.OpPrereqError("Invalid contents of the"
" 'disks' parameter")
+ if self.op.hypervisor is None:
+ self.op.hypervisor = self.cfg.GetHypervisorType()
elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
if not hasattr(self.op, "name"):
raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
tags=self.op.tags,
nics=self.op.nics,
vcpus=self.op.vcpus,
+ hypervisor=self.op.hypervisor,
)
else:
ial = IAllocator(self,