import platform
import logging
import copy
+import random
from ganeti import ssh
from ganeti import utils
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer
+from ganeti import ssconf
class LogicalUnit(object):
- implement BuildHooksEnv
- redefine HPATH and HTYPE
- optionally redefine their run requirements:
- REQ_MASTER: the LU needs to run on the master node
REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
Note that all commands require root permissions.
HPATH = None
HTYPE = None
_OP_REQP = []
- REQ_MASTER = True
REQ_BGL = True
def __init__(self, processor, op, context, rpc):
if attr_val is None:
raise errors.OpPrereqError("Required parameter '%s' missing" %
attr_name)
-
- if not self.cfg.IsCluster():
- raise errors.OpPrereqError("Cluster not initialized yet,"
- " use 'gnt-cluster init' first.")
- if self.REQ_MASTER:
- master = self.cfg.GetMasterNode()
- if master != utils.HostInfo().name:
- raise errors.OpPrereqError("Commands must be run on the master"
- " node %s" % master)
+ self.CheckArguments()
def __GetSSH(self):
"""Returns the SshRunner object
ssh = property(fget=__GetSSH)
+  def CheckArguments(self):
+    """Validate the opcode arguments on a purely syntactic level.
+
+    This hook is invoked from the constructor. Subclasses override it
+    to verify the opcode parameters without performing any
+    cluster-related checks; the base implementation does nothing.
+
+    The same checks could live in ExpandNames and/or CheckPrereq, but
+    keeping them separate is better because:
+
+      - ExpandNames is left as a purely lock-related function
+      - CheckPrereq is only run after the locks have been acquired (and
+        possibly waited for)
+
+    Implementations are allowed to change the self.op attribute, so
+    that later methods no longer need to worry about missing
+    parameters.
+
+    """
+
def ExpandNames(self):
"""Expand names for this LU.
"""
# check bridges existance
brlist = [nic.bridge for nic in instance.nics]
- if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
- raise errors.OpPrereqError("one or more target bridges %s does not"
+ result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
+ result.Raise()
+ if not result.data:
+ raise errors.OpPrereqError("One or more target bridges %s does not"
" exist on destination node '%s'" %
(brlist, instance.primary_node))
"""
master = self.cfg.GetMasterNode()
- if not self.rpc.call_node_stop_master(master, False):
+ result = self.rpc.call_node_stop_master(master, False)
+ result.Raise()
+ if not result.data:
raise errors.OpExecError("Could not disable the master role")
priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
utils.CreateBackup(priv_key)
}
self.share_locks = dict(((i, 1) for i in locking.LEVELS))
- def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
- remote_version, feedback_fn):
+ def _VerifyNode(self, nodeinfo, file_list, local_cksum,
+ node_result, feedback_fn, master_files):
"""Run multiple tests against a node.
- Test list::
+ Test list:
- compares ganeti version
- checks vg existance and size > 20G
- checks config file checksum
- checks ssh to other nodes
- @type node: string
- @param node: the name of the node to check
+ @type nodeinfo: L{objects.Node}
+ @param nodeinfo: the node to check
@param file_list: required list of files
@param local_cksum: dictionary of local files and their checksums
- @type vglist: dict
- @param vglist: dictionary of volume group names and their size
@param node_result: the results from the node
- @param remote_version: the RPC version from the remote node
@param feedback_fn: function used to accumulate results
+ @param master_files: list of files that only masters should have
"""
+ node = nodeinfo.name
+
+ # main result, node_result should be a non-empty dict
+ if not node_result or not isinstance(node_result, dict):
+ feedback_fn(" - ERROR: unable to verify node %s." % (node,))
+ return True
+
# compares ganeti version
local_version = constants.PROTOCOL_VERSION
+ remote_version = node_result.get('version', None)
if not remote_version:
feedback_fn(" - ERROR: connection to %s failed" % (node))
return True
# checks vg existance and size > 20G
bad = False
+ vglist = node_result.get(constants.NV_VGLIST, None)
if not vglist:
feedback_fn(" - ERROR: unable to check volume groups on node %s." %
(node,))
feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
bad = True
- if not node_result:
- feedback_fn(" - ERROR: unable to verify node %s." % (node,))
- return True
-
# checks config file checksum
- # checks ssh to any
- if 'filelist' not in node_result:
+ remote_cksum = node_result.get(constants.NV_FILELIST, None)
+ if not isinstance(remote_cksum, dict):
bad = True
feedback_fn(" - ERROR: node hasn't returned file checksum data")
else:
- remote_cksum = node_result['filelist']
for file_name in file_list:
+ node_is_mc = nodeinfo.master_candidate
+ must_have_file = file_name not in master_files
if file_name not in remote_cksum:
- bad = True
- feedback_fn(" - ERROR: file '%s' missing" % file_name)
+ if node_is_mc or must_have_file:
+ bad = True
+ feedback_fn(" - ERROR: file '%s' missing" % file_name)
elif remote_cksum[file_name] != local_cksum[file_name]:
- bad = True
- feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
+ if node_is_mc or must_have_file:
+ bad = True
+ feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
+ else:
+ # not candidate and this is not a must-have file
+ bad = True
+ feedback_fn(" - ERROR: non master-candidate has old/wrong file"
+ " '%s'" % file_name)
+ else:
+ # all good, except non-master/non-must have combination
+ if not node_is_mc and not must_have_file:
+ feedback_fn(" - ERROR: file '%s' should not exist on non master"
+ " candidates" % file_name)
- if 'nodelist' not in node_result:
+ # checks ssh to any
+
+ if constants.NV_NODELIST not in node_result:
bad = True
feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
else:
- if node_result['nodelist']:
+ if node_result[constants.NV_NODELIST]:
bad = True
- for node in node_result['nodelist']:
+ for node in node_result[constants.NV_NODELIST]:
feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
- (node, node_result['nodelist'][node]))
- if 'node-net-test' not in node_result:
+ (node, node_result[constants.NV_NODELIST][node]))
+
+ if constants.NV_NODENETTEST not in node_result:
bad = True
feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
else:
- if node_result['node-net-test']:
+ if node_result[constants.NV_NODENETTEST]:
bad = True
- nlist = utils.NiceSort(node_result['node-net-test'].keys())
+ nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
for node in nlist:
feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
- (node, node_result['node-net-test'][node]))
+ (node, node_result[constants.NV_NODENETTEST][node]))
- hyp_result = node_result.get('hypervisor', None)
+ hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
if isinstance(hyp_result, dict):
for hv_name, hv_result in hyp_result.iteritems():
if hv_result is not None:
# FIXME: verify OS list
# do local checksums
- file_names = []
+ master_files = [constants.CLUSTER_CONF_FILE]
+
+ file_names = ssconf.SimpleStore().GetFileList()
file_names.append(constants.SSL_CERT_FILE)
- file_names.append(constants.CLUSTER_CONF_FILE)
+ file_names.extend(master_files)
+
local_checksums = utils.FingerprintFiles(file_names)
feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
- all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
- all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
- all_vglist = self.rpc.call_vg_list(nodelist)
node_verify_param = {
- 'filelist': file_names,
- 'nodelist': nodelist,
- 'hypervisor': hypervisors,
- 'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
- for node in nodeinfo]
+ constants.NV_FILELIST: file_names,
+ constants.NV_NODELIST: nodelist,
+ constants.NV_HYPERVISOR: hypervisors,
+ constants.NV_NODENETTEST: [(node.name, node.primary_ip,
+ node.secondary_ip) for node in nodeinfo],
+ constants.NV_LVLIST: vg_name,
+ constants.NV_INSTANCELIST: hypervisors,
+ constants.NV_VGLIST: None,
+ constants.NV_VERSION: None,
+ constants.NV_HVINFO: self.cfg.GetHypervisorType(),
}
all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
self.cfg.GetClusterName())
- all_rversion = self.rpc.call_version(nodelist)
- all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
- self.cfg.GetHypervisorType())
cluster = self.cfg.GetClusterInfo()
- for node in nodelist:
- feedback_fn("* Verifying node %s" % node)
- result = self._VerifyNode(node, file_names, local_checksums,
- all_vglist[node], all_nvinfo[node],
- all_rversion[node], feedback_fn)
- bad = bad or result
+ master_node = self.cfg.GetMasterNode()
+ for node_i in nodeinfo:
+ node = node_i.name
+ nresult = all_nvinfo[node].data
+
+ if node == master_node:
+ ntype = "master"
+ elif node_i.master_candidate:
+ ntype = "master candidate"
+ else:
+ ntype = "regular"
+ feedback_fn("* Verifying node %s (%s)" % (node, ntype))
- # node_volume
- volumeinfo = all_volumeinfo[node]
+ if all_nvinfo[node].failed or not isinstance(nresult, dict):
+ feedback_fn(" - ERROR: connection to %s failed" % (node,))
+ bad = True
+ continue
- if isinstance(volumeinfo, basestring):
+ result = self._VerifyNode(node_i, file_names, local_checksums,
+ nresult, feedback_fn, master_files)
+ bad = bad or result
+
+ lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
+ if isinstance(lvdata, basestring):
feedback_fn(" - ERROR: LVM problem on node %s: %s" %
- (node, volumeinfo[-400:].encode('string_escape')))
+ (node, lvdata.encode('string_escape')))
bad = True
node_volume[node] = {}
- elif not isinstance(volumeinfo, dict):
- feedback_fn(" - ERROR: connection to %s failed" % (node,))
+ elif not isinstance(lvdata, dict):
+ feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
bad = True
continue
else:
- node_volume[node] = volumeinfo
+ node_volume[node] = lvdata
# node_instance
- nodeinstance = all_instanceinfo[node]
- if type(nodeinstance) != list:
- feedback_fn(" - ERROR: connection to %s failed" % (node,))
+ idata = nresult.get(constants.NV_INSTANCELIST, None)
+ if not isinstance(idata, list):
+ feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
+ (node,))
bad = True
continue
- node_instance[node] = nodeinstance
+ node_instance[node] = idata
# node_info
- nodeinfo = all_ninfo[node]
+ nodeinfo = nresult.get(constants.NV_HVINFO, None)
if not isinstance(nodeinfo, dict):
- feedback_fn(" - ERROR: connection to %s failed" % (node,))
+ feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
bad = True
continue
try:
node_info[node] = {
"mfree": int(nodeinfo['memory_free']),
- "dfree": int(nodeinfo['vg_free']),
+ "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
"pinst": [],
"sinst": [],
# dictionary holding all instances this node is secondary for,
for node_name in hooks_results:
show_node_header = True
res = hooks_results[node_name]
- if res is False or not isinstance(res, list):
- feedback_fn(" Communication failure")
+ if res.failed or res.data is False or not isinstance(res.data, list):
+ feedback_fn(" Communication failure in hooks execution")
lu_result = 1
continue
- for script, hkr, output in res:
+ for script, hkr, output in res.data:
if hkr == constants.HKR_FAIL:
# The node header is only shown once, if there are
# failing hooks on that node
for node in nodes:
# node_volume
lvs = node_lvs[node]
-
+ if lvs.failed:
+ self.LogWarning("Connection to node %s failed: %s" %
+ (node, lvs.data))
+ continue
+ lvs = lvs.data
if isinstance(lvs, basestring):
logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
res_nlvm[node] = lvs
# shutdown the master IP
master = self.cfg.GetMasterNode()
- if not self.rpc.call_node_stop_master(master, False):
+ result = self.rpc.call_node_stop_master(master, False)
+ if result.failed or not result.data:
raise errors.OpExecError("Could not disable the master role")
try:
- # modify the sstore
- # TODO: sstore
- ss.SetKey(ss.SS_MASTER_IP, ip)
- ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
-
- # Distribute updated ss config to all nodes
- myself = self.cfg.GetNodeInfo(master)
- dist_nodes = self.cfg.GetNodeList()
- if myself.name in dist_nodes:
- dist_nodes.remove(myself.name)
-
- logging.debug("Copying updated ssconf data to all nodes")
- for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
- fname = ss.KeyToFilename(keyname)
- result = self.rpc.call_upload_file(dist_nodes, fname)
- for to_node in dist_nodes:
- if not result[to_node]:
- self.LogWarning("Copy of file %s to node %s failed",
- fname, to_node)
+ cluster = self.cfg.GetClusterInfo()
+ cluster.cluster_name = clustername
+ cluster.master_ip = ip
+ self.cfg.Update(cluster)
+
+ # update the known hosts file
+ ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
+ node_list = self.cfg.GetNodeList()
+ try:
+ node_list.remove(master)
+ except ValueError:
+ pass
+ result = self.rpc.call_upload_file(node_list,
+ constants.SSH_KNOWN_HOSTS_FILE)
+ for to_node, to_result in result.iteritems():
+ if to_result.failed or not to_result.data:
+ # name the file that was actually uploaded; the "fname" loop variable
+ # of the removed ssconf distribution code no longer exists here
+ logging.error("Copy of file %s to node %s failed",
+ constants.SSH_KNOWN_HOSTS_FILE, to_node)
+
finally:
- if not self.rpc.call_node_start_master(master, False):
+ result = self.rpc.call_node_start_master(master, False)
+ if result.failed or not result.data:
self.LogWarning("Could not re-enable the master role on"
" the master, please restart manually.")
_OP_REQP = []
REQ_BGL = False
+  def CheckArguments(self):
+    """Check and normalise the opcode arguments.
+
+    This is the hook the LogicalUnit constructor invokes (as
+    self.CheckArguments()). It makes sure that
+    self.op.candidate_pool_size always exists: it is None when the
+    opcode did not pass it, otherwise an integer greater than or
+    equal to one.
+
+    @raise errors.OpPrereqError: if candidate_pool_size cannot be
+        converted to an integer or is smaller than one
+
+    """
+    if not hasattr(self.op, "candidate_pool_size"):
+      self.op.candidate_pool_size = None
+    if self.op.candidate_pool_size is not None:
+      try:
+        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
+      except (TypeError, ValueError), err:
+        # int() raises TypeError for non-string, non-numeric values
+        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
+                                   str(err))
+      if self.op.candidate_pool_size < 1:
+        raise errors.OpPrereqError("At least one master candidate needed")
+
+  def CheckParameters(self):
+    """Backwards-compatible name for CheckArguments.
+
+    """
+    return self.CheckArguments()
+
def ExpandNames(self):
# FIXME: in the future maybe other cluster params won't require checking on
# all nodes to be modified.
if self.op.vg_name:
vglist = self.rpc.call_vg_list(node_list)
for node in node_list:
- vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
+ if vglist[node].failed:
+ # ignoring down node
+ self.LogWarning("Node %s unreachable/error, ignoring" % node)
+ continue
+ vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
+ self.op.vg_name,
constants.MIN_VG_SIZE)
if vgstatus:
raise errors.OpPrereqError("Error on node '%s': %s" %
(node, vgstatus))
self.cluster = cluster = self.cfg.GetClusterInfo()
- # beparams changes do not need validation (we can't validate?),
- # but we still process here
+ # validate beparams changes
if self.op.beparams:
+ utils.CheckBEParams(self.op.beparams)
self.new_beparams = cluster.FillDict(
cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
if self.op.beparams:
self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
+ if self.op.candidate_pool_size is not None:
+ self.cluster.candidate_pool_size = self.op.candidate_pool_size
+
self.cfg.Update(self.cluster)
+ # we want to update nodes after the cluster so that if any errors
+ # happen, we have recorded and saved the cluster info
+ if self.op.candidate_pool_size is not None:
+ node_info = self.cfg.GetAllNodesInfo().values()
+ num_candidates = len([node for node in node_info
+ if node.master_candidate])
+ num_nodes = len(node_info)
+ if num_candidates < self.op.candidate_pool_size:
+ random.shuffle(node_info)
+ for node in node_info:
+ if num_candidates >= self.op.candidate_pool_size:
+ break
+ if node.master_candidate:
+ continue
+ node.master_candidate = True
+ self.LogInfo("Promoting node %s to master candidate", node.name)
+ self.cfg.Update(node)
+ self.context.ReaddNode(node)
+ num_candidates += 1
+ elif num_candidates > self.op.candidate_pool_size:
+ self.LogInfo("Note: more nodes are candidates (%d) than the new value"
+ " of candidate_pool_size (%d)" %
+ (num_candidates, self.op.candidate_pool_size))
+
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
"""Sleep and poll for an instance's disk to sync.
done = True
cumul_degraded = False
rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
- if not rstats:
+ if rstats.failed or not rstats.data:
lu.LogWarning("Can't get any data from node %s", node)
retries += 1
if retries >= 10:
" aborting." % node)
time.sleep(6)
continue
+ rstats = rstats.data
retries = 0
for i in range(len(rstats)):
mstat = rstats[i]
result = True
if on_primary or dev.AssembleOnSecondary():
rstats = lu.rpc.call_blockdev_find(node, dev)
- if not rstats:
+ if rstats.failed or not rstats.data:
logging.warning("Node %s: disk degraded, not found or node down", node)
result = False
else:
- result = result and (not rstats[idx])
+ result = result and (not rstats.data[idx])
if dev.children:
for child in dev.children:
result = result and _CheckDiskConsistency(lu, child, node, on_primary)
"""
all_os = {}
for node_name, nr in rlist.iteritems():
- if not nr:
+ if nr.failed or not nr.data:
continue
- for os_obj in nr:
+ for os_obj in nr.data:
if os_obj.name not in all_os:
# build a list of nodes for this os containing empty lists
# for each node in node_list
self.rpc.call_node_leave_cluster(node.name)
+ # Promote nodes to master candidate as needed
+ cp_size = self.cfg.GetClusterInfo().candidate_pool_size
+ node_info = self.cfg.GetAllNodesInfo().values()
+ num_candidates = len([n for n in node_info
+ if n.master_candidate])
+ num_nodes = len(node_info)
+ random.shuffle(node_info)
+ for node in node_info:
+ if num_candidates >= cp_size or num_candidates >= num_nodes:
+ break
+ if node.master_candidate:
+ continue
+ node.master_candidate = True
+ self.LogInfo("Promoting node %s to master candidate", node.name)
+ self.cfg.Update(node)
+ self.context.ReaddNode(node)
+ num_candidates += 1
+
class LUQueryNodes(NoHooksLU):
"""Logical unit for querying nodes.
"pinst_list", "sinst_list",
"pip", "sip", "tags",
"serial_no",
+ "master_candidate",
+ "master",
)
def ExpandNames(self):
node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
self.cfg.GetHypervisorType())
for name in nodenames:
- nodeinfo = node_data.get(name, None)
- if nodeinfo:
+ nodeinfo = node_data[name]
+ if not nodeinfo.failed and nodeinfo.data:
+ nodeinfo = nodeinfo.data
+ fn = utils.TryConvert
live_data[name] = {
- "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
- "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
- "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
- "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
- "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
- "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
- "bootid": nodeinfo['bootid'],
+ "mtotal": fn(int, nodeinfo.get('memory_total', None)),
+ "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
+ "mfree": fn(int, nodeinfo.get('memory_free', None)),
+ "dtotal": fn(int, nodeinfo.get('vg_size', None)),
+ "dfree": fn(int, nodeinfo.get('vg_free', None)),
+ "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
+ "bootid": nodeinfo.get('bootid', None),
}
else:
live_data[name] = {}
if secnode in node_to_secondary:
node_to_secondary[secnode].add(inst.name)
+ master_node = self.cfg.GetMasterNode()
+
# end data gathering
output = []
val = list(node.GetTags())
elif field == "serial_no":
val = node.serial_no
+ elif field == "master_candidate":
+ val = node.master_candidate
+ elif field == "master":
+ val = node.name == master_node
elif self._FIELDS_DYNAMIC.Matches(field):
val = live_data[node.name].get(field, None)
else:
output = []
for node in nodenames:
- if node not in volumes or not volumes[node]:
+ if node not in volumes or volumes[node].failed or not volumes[node].data:
continue
- node_vols = volumes[node][:]
+ node_vols = volumes[node].data[:]
node_vols.sort(key=lambda vol: vol['dev'])
for vol in node_vols:
raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
" based ping to noded port")
+ cp_size = self.cfg.GetClusterInfo().candidate_pool_size
+ node_info = self.cfg.GetAllNodesInfo().values()
+ num_candidates = len([n for n in node_info
+ if n.master_candidate])
+ master_candidate = num_candidates < cp_size
+
self.new_node = objects.Node(name=node,
primary_ip=primary_ip,
- secondary_ip=secondary_ip)
+ secondary_ip=secondary_ip,
+ master_candidate=master_candidate)
def Exec(self, feedback_fn):
"""Adds the new node to the cluster.
# check connectivity
result = self.rpc.call_version([node])[node]
- if result:
- if constants.PROTOCOL_VERSION == result:
+ result.Raise()
+ if result.data:
+ if constants.PROTOCOL_VERSION == result.data:
logging.info("Communication to node %s fine, sw version %s match",
- node, result)
+ node, result.data)
else:
raise errors.OpExecError("Version mismatch master version %s,"
" node version %s" %
- (constants.PROTOCOL_VERSION, result))
+ (constants.PROTOCOL_VERSION, result.data))
else:
raise errors.OpExecError("Cannot get version from the new node")
keyarray[2],
keyarray[3], keyarray[4], keyarray[5])
- if not result:
+ if result.failed or not result.data:
raise errors.OpExecError("Cannot transfer ssh keys to the new node")
# Add node to our /etc/hosts, and add key to known_hosts
utils.AddHostToEtcHosts(new_node.name)
if new_node.secondary_ip != new_node.primary_ip:
- if not self.rpc.call_node_has_ip_address(new_node.name,
- new_node.secondary_ip):
+ result = self.rpc.call_node_has_ip_address(new_node.name,
+ new_node.secondary_ip)
+ if result.failed or not result.data:
raise errors.OpExecError("Node claims it doesn't have the secondary ip"
" you gave (%s). Please fix and re-run this"
" command." % new_node.secondary_ip)
result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
self.cfg.GetClusterName())
for verifier in node_verify_list:
- if not result[verifier]:
+ if result[verifier].failed or not result[verifier].data:
raise errors.OpExecError("Cannot communicate with %s's node daemon"
" for remote verification" % verifier)
- if result[verifier]['nodelist']:
- for failed in result[verifier]['nodelist']:
+ if result[verifier].data['nodelist']:
+ for failed in result[verifier].data['nodelist']:
+ # the per-node failure text is part of the payload (.data) as well
feedback_fn("ssh/hostname verification failed %s -> %s" %
- (verifier, result[verifier]['nodelist'][failed]))
+ (verifier, result[verifier].data['nodelist'][failed]))
raise errors.OpExecError("ssh/hostname verification failed.")
logging.debug("Copying hosts and known_hosts to all nodes")
for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
result = self.rpc.call_upload_file(dist_nodes, fname)
- for to_node in dist_nodes:
- if not result[to_node]:
+ for to_node, to_result in result.iteritems():
+ if to_result.failed or not to_result.data:
logging.error("Copy of file %s to node %s failed", fname, to_node)
to_copy = []
to_copy.append(constants.VNC_PASSWORD_FILE)
for fname in to_copy:
result = self.rpc.call_upload_file([node], fname)
- if not result[node]:
+ # test the payload (.data), as the other call_upload_file check in this
+ # change does, instead of the per-node result object itself
+ if result[node].failed or not result[node].data:
logging.error("Could not copy file %s to node %s", fname, node)
if self.op.readd:
self.context.AddNode(new_node)
+class LUSetNodeParams(LogicalUnit):
+  """Modifies the parameters of a node.
+
+  The only parameter handled here is the master_candidate flag.
+
+  """
+  HPATH = "node-modify"
+  HTYPE = constants.HTYPE_NODE
+  _OP_REQP = ["node_name"]
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    """Expand the node name and normalise the requested change.
+
+    Afterwards self.op.node_name holds the expanded node name and
+    self.op.master_candidate is a real boolean.
+
+    @raise errors.OpPrereqError: if the node name cannot be expanded
+        or no modification was requested
+
+    """
+    node_name = self.cfg.ExpandNodeName(self.op.node_name)
+    if node_name is None:
+      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
+    self.op.node_name = node_name
+    # a missing or None value means "no change requested"; passing it
+    # through bool() would silently turn it into a demotion
+    if getattr(self.op, "master_candidate", None) is None:
+      raise errors.OpPrereqError("Please pass at least one modification")
+    self.op.master_candidate = bool(self.op.master_candidate)
+
+  def ExpandNames(self):
+    """Declare the lock on the node being modified.
+
+    """
+    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    This runs on the master node.
+
+    @return: the environment dict plus the node list, which is used
+        twice in the returned tuple
+
+    """
+    env = {
+      "OP_TARGET": self.op.node_name,
+      "MASTER_CANDIDATE": str(self.op.master_candidate),
+      }
+    nl = [self.cfg.GetMasterNode(),
+          self.op.node_name]
+    return env, nl, nl
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    Demoting the master node is always refused. Demoting a node that
+    currently is a master candidate is refused, unless forced, when
+    the number of candidates does not exceed the cluster's
+    candidate_pool_size.
+
+    @raise errors.OpPrereqError: if the demotion is not allowed
+
+    """
+    # force is not part of _OP_REQP, so do not rely on it being set
+    force = self.force = getattr(self.op, "force", False)
+
+    if self.op.master_candidate:
+      # promotions have no prerequisites
+      return
+
+    if self.op.node_name == self.cfg.GetMasterNode():
+      raise errors.OpPrereqError("The master node has to be a"
+                                 " master candidate")
+
+    node = self.cfg.GetNodeInfo(self.op.node_name)
+    if not node.master_candidate:
+      # the node is not in the pool, so the candidate count is unchanged
+      return
+
+    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
+    node_info = self.cfg.GetAllNodesInfo().values()
+    num_candidates = len([n for n in node_info if n.master_candidate])
+    if num_candidates <= cp_size:
+      msg = ("Not enough master candidates (desired"
+             " %d, new value will be %d)" % (cp_size, num_candidates-1))
+      if force:
+        self.LogWarning(msg)
+      else:
+        raise errors.OpPrereqError(msg)
+
+  def Exec(self, feedback_fn):
+    """Modifies a node.
+
+    @return: list of (parameter name, new value as string) pairs
+
+    """
+    node = self.cfg.GetNodeInfo(self.op.node_name)
+
+    result = []
+
+    node.master_candidate = self.op.master_candidate
+    result.append(("master_candidate", str(self.op.master_candidate)))
+
+    # this will trigger configuration file update, if needed
+    self.cfg.Update(node)
+    # this will trigger job queue propagation or cleanup
+    if self.op.node_name != self.cfg.GetMasterNode():
+      self.context.ReaddNode(node)
+
+    return result
+
+
class LUQueryClusterInfo(NoHooksLU):
"""Query cluster configuration.
"""
_OP_REQP = []
- REQ_MASTER = False
REQ_BGL = False
def ExpandNames(self):
"enabled_hypervisors": cluster.enabled_hypervisors,
"hvparams": cluster.hvparams,
"beparams": cluster.beparams,
+ "candidate_pool_size": cluster.candidate_pool_size,
}
return result
for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
lu.cfg.SetDiskID(node_disk, node)
result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
- if not result:
+ # inspect the payload (.data) like the other RPC result checks in this
+ # change, not the result object itself
+ if result.failed or not result.data:
lu.proc.LogWarning("Could not prepare block device %s on node %s"
" (is_primary=False, pass=1)",
inst_disk.iv_name, node)
continue
lu.cfg.SetDiskID(node_disk, node)
result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
- if not result:
+ # inspect the payload (.data) like the other RPC result checks in this
+ # change, not the result object itself
+ if result.failed or not result.data:
lu.proc.LogWarning("Could not prepare block device %s on node %s"
" (is_primary=True, pass=2)",
inst_disk.iv_name, node)
ins_l = lu.rpc.call_instance_list([instance.primary_node],
[instance.hypervisor])
ins_l = ins_l[instance.primary_node]
- if not type(ins_l) is list:
+ if ins_l.failed or not isinstance(ins_l.data, list):
raise errors.OpExecError("Can't contact node '%s'" %
instance.primary_node)
- if instance.name in ins_l:
+ if instance.name in ins_l.data:
raise errors.OpExecError("Instance is running, can't shutdown"
" block devices.")
for disk in instance.disks:
for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
lu.cfg.SetDiskID(top_disk, node)
- if not lu.rpc.call_blockdev_shutdown(node, top_disk):
+ result = lu.rpc.call_blockdev_shutdown(node, top_disk)
+ if result.failed or not result.data:
logging.error("Could not shutdown block device %s on node %s",
disk.iv_name, node)
if not ignore_primary or node != instance.primary_node:
"""
nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
- if not nodeinfo or not isinstance(nodeinfo, dict):
- raise errors.OpPrereqError("Could not contact node %s for resource"
- " information" % (node,))
-
- free_mem = nodeinfo[node].get('memory_free')
+ nodeinfo[node].Raise()
+ free_mem = nodeinfo[node].data.get('memory_free')
if not isinstance(free_mem, int):
raise errors.OpPrereqError("Can't compute free memory on node %s, result"
" was '%s'" % (node, free_mem))
_StartInstanceDisks(self, instance, force)
- if not self.rpc.call_instance_start(node_current, instance, extra_args):
+ result = self.rpc.call_instance_start(node_current, instance, extra_args)
+ if result.failed or not result.data:
_ShutdownInstanceDisks(self, instance)
raise errors.OpExecError("Could not start instance")
if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
constants.INSTANCE_REBOOT_HARD]:
- if not self.rpc.call_instance_reboot(node_current, instance,
- reboot_type, extra_args):
+ result = self.rpc.call_instance_reboot(node_current, instance,
+ reboot_type, extra_args)
+ if result.failed or not result.data:
raise errors.OpExecError("Could not reboot instance")
else:
- if not self.rpc.call_instance_shutdown(node_current, instance):
+ # check the call the same way as the other call_instance_shutdown
+ # invocations converted by this change
+ result = self.rpc.call_instance_shutdown(node_current, instance)
+ if result.failed or not result.data:
raise errors.OpExecError("could not shutdown instance for full reboot")
_ShutdownInstanceDisks(self, instance)
_StartInstanceDisks(self, instance, ignore_secondaries)
- if not self.rpc.call_instance_start(node_current, instance, extra_args):
+ result = self.rpc.call_instance_start(node_current, instance, extra_args)
+ if result.failed or not result.data:
_ShutdownInstanceDisks(self, instance)
raise errors.OpExecError("Could not start instance for full reboot")
instance = self.instance
node_current = instance.primary_node
self.cfg.MarkInstanceDown(instance.name)
- if not self.rpc.call_instance_shutdown(node_current, instance):
+ result = self.rpc.call_instance_shutdown(node_current, instance)
+ if result.failed or not result.data:
self.proc.LogWarning("Could not shutdown instance")
_ShutdownInstanceDisks(self, instance)
remote_info = self.rpc.call_instance_info(instance.primary_node,
instance.name,
instance.hypervisor)
- if remote_info:
+ # a failed RPC must not be reported as "instance is running"; handle it
+ # via Raise() (presumably raising on failure - confirm), as the identical
+ # check elsewhere in this change does
+ remote_info.Raise()
+ if remote_info.data:
raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
(self.op.instance_name,
instance.primary_node))
if pnode is None:
raise errors.OpPrereqError("Primary node '%s' is unknown" %
self.op.pnode)
- os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
- if not os_obj:
+ result = self.rpc.call_os_get(pnode.name, self.op.os_type)
+ result.Raise()
+ if not isinstance(result.data, objects.OS):
raise errors.OpPrereqError("OS '%s' not in supported OS list for"
" primary node" % self.op.os_type)
_StartInstanceDisks(self, inst, None)
try:
feedback_fn("Running the instance OS create scripts...")
- if not self.rpc.call_instance_os_add(inst.primary_node, inst):
+ result = self.rpc.call_instance_os_add(inst.primary_node, inst)
+ result.Raise()
+ if not result.data:
raise errors.OpExecError("Could not install OS for instance %s"
" on node %s" %
(inst.name, inst.primary_node))
remote_info = self.rpc.call_instance_info(instance.primary_node,
instance.name,
instance.hypervisor)
- if remote_info:
+ remote_info.Raise()
+ if remote_info.data:
raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
(self.op.instance_name,
instance.primary_node))
result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
old_file_storage_dir,
new_file_storage_dir)
-
- if not result:
+ result.Raise()
+ if not result.data:
raise errors.OpExecError("Could not connect to node '%s' to rename"
" directory '%s' to '%s' (but the instance"
" has been renamed in Ganeti)" % (
inst.primary_node, old_file_storage_dir,
new_file_storage_dir))
- if not result[0]:
+ if not result.data[0]:
raise errors.OpExecError("Could not rename directory '%s' to '%s'"
" (but the instance has been renamed in"
" Ganeti)" % (old_file_storage_dir,
_StartInstanceDisks(self, inst, None)
try:
- if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
- old_name):
+ result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
+ old_name)
+ if result.failed or not result.data:
msg = ("Could not run OS rename script for instance %s on node %s"
" (but the instance has been renamed in Ganeti)" %
(inst.name, inst.primary_node))
logging.info("Shutting down instance %s on node %s",
instance.name, instance.primary_node)
- if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
+ result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
+ if result.failed or not result.data:
if self.op.ignore_failures:
feedback_fn("Warning: can't shutdown instance")
else:
node_data = self.rpc.call_all_instances_info(nodes, hv_list)
for name in nodes:
result = node_data[name]
- if result:
- live_data.update(result)
- elif result == False:
+ if result.failed:
bad_nodes.append(name)
- # else no instance is alive
+ else:
+ if result.data:
+ live_data.update(result.data)
+ # else no instance is alive
else:
live_data = dict([(name, {}) for name in instance_names])
# check bridge existance
brlist = [nic.bridge for nic in instance.nics]
- if not self.rpc.call_bridges_exist(target_node, brlist):
+ result = self.rpc.call_bridges_exist(target_node, brlist)
+ result.Raise()
+ if not result.data:
raise errors.OpPrereqError("One or more target bridges %s does not"
" exist on destination node '%s'" %
(brlist, target_node))
logging.info("Shutting down instance %s on node %s",
instance.name, source_node)
- if not self.rpc.call_instance_shutdown(source_node, instance):
+ result = self.rpc.call_instance_shutdown(source_node, instance)
+ if result.failed or not result.data:
if self.op.ignore_consistency:
self.proc.LogWarning("Could not shutdown instance %s on node %s."
" Proceeding"
raise errors.OpExecError("Can't activate the instance's disks")
feedback_fn("* starting the instance on the target node")
- if not self.rpc.call_instance_start(target_node, instance, None):
+ result = self.rpc.call_instance_start(target_node, instance, None)
+ if result.failed or not result.data:
_ShutdownInstanceDisks(self, instance)
raise errors.OpExecError("Could not start instance %s on node %s." %
(instance.name, target_node))
lu.cfg.SetDiskID(device, node)
new_id = lu.rpc.call_blockdev_create(node, device, device.size,
instance.name, True, info)
- if not new_id:
+ if new_id.failed or not new_id.data:
return False
if device.physical_id is None:
- device.physical_id = new_id
+ # store the id returned by the node (the payload), not the RPC result
+ # object wrapping it
+ device.physical_id = new_id.data
lu.cfg.SetDiskID(device, node)
new_id = lu.rpc.call_blockdev_create(node, device, device.size,
instance.name, False, info)
- if not new_id:
+ if new_id.failed or not new_id.data:
return False
if device.physical_id is None:
- device.physical_id = new_id
+ # store the id returned by the node (the payload), not the RPC result
+ # object wrapping it
+ device.physical_id = new_id.data
def _GenerateDiskTemplate(lu, template_name,
instance_name, primary_node,
secondary_nodes, disk_info,
- file_storage_dir, file_driver):
+ file_storage_dir, file_driver,
+ base_index):
"""Generate the entire disk layout for a given template type.
"""
names = _GenerateUniqueNames(lu, [".disk%d" % i
for i in range(disk_count)])
for idx, disk in enumerate(disk_info):
+ disk_index = idx + base_index
disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
logical_id=(vgname, names[idx]),
- iv_name = "disk/%d" % idx)
+ iv_name="disk/%d" % disk_index)
disks.append(disk_dev)
elif template_name == constants.DT_DRBD8:
if len(secondary_nodes) != 1:
for s in ("data", "meta")
])
for idx, disk in enumerate(disk_info):
+ disk_index = idx + base_index
disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
disk["size"], names[idx*2:idx*2+2],
- "disk/%d" % idx,
+ "disk/%d" % disk_index,
minors[idx*2], minors[idx*2+1])
disks.append(disk_dev)
elif template_name == constants.DT_FILE:
raise errors.ProgrammerError("Wrong template configuration")
for idx, disk in enumerate(disk_info):
-
+ disk_index = idx + base_index
disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
- iv_name="disk/%d" % idx,
+ iv_name="disk/%d" % disk_index,
logical_id=(file_driver,
"%s/disk%d" % (file_storage_dir,
idx)))
result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
file_storage_dir)
- if not result:
+ if result.failed or not result.data:
logging.error("Could not connect to node '%s'", instance.primary_node)
return False
- if not result[0]:
+ if not result.data[0]:
logging.error("Failed to create directory '%s'", file_storage_dir)
return False
+ # Note: this needs to be kept in sync with adding of disks in
+ # LUSetInstanceParams
for device in instance.disks:
logging.info("Creating volume %s for instance %s",
device.iv_name, instance.name)
for device in instance.disks:
for node, disk in device.ComputeNodeTree(instance.primary_node):
lu.cfg.SetDiskID(disk, node)
- if not lu.rpc.call_blockdev_remove(node, disk):
+ # Keep the RPC answer in its own name: "result" is set to False on
+ # failure below, so reusing it here would overwrite an earlier failure
+ # with the answer of the next call.
+ rpc_result = lu.rpc.call_blockdev_remove(node, disk)
+ if rpc_result.failed or not rpc_result.data:
lu.proc.LogWarning("Could not remove block device %s on node %s,"
" continuing anyway", device.iv_name, node)
result = False
if instance.disk_template == constants.DT_FILE:
file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
- if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
- file_storage_dir):
+ # same as above: do not clobber the "result" flag with the RPC answer
+ rpc_result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
+ file_storage_dir)
+ if rpc_result.failed or not rpc_result.data:
logging.error("Could not remove directory '%s'", file_storage_dir)
result = False
hvname,
hvparams)
for node in nodenames:
- info = hvinfo.get(node, None)
- if not info or not isinstance(info, (tuple, list)):
+ info = hvinfo[node]
+ info.Raise()
+ if not info.data or not isinstance(info.data, (tuple, list)):
raise errors.OpPrereqError("Cannot get current information"
- " from node '%s' (%s)" % (node, info))
- if not info[0]:
+ " from node '%s' (%s)" % (node, info.data))
+ if not info.data[0]:
raise errors.OpPrereqError("Hypervisor parameter validation failed:"
- " %s" % info[1])
+ " %s" % info.data[1])
class LUCreateInstance(LogicalUnit):
hv_type.CheckParameterSyntax(filled_hvp)
# fill and remember the beparams dict
+ utils.CheckBEParams(self.op.beparams)
self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
self.op.beparams)
src_node = getattr(self.op, "src_node", None)
src_path = getattr(self.op, "src_path", None)
- if src_node is None or src_path is None:
- raise errors.OpPrereqError("Importing an instance requires source"
- " node and path options")
-
- if not os.path.isabs(src_path):
- raise errors.OpPrereqError("The source path must be absolute")
+ if src_path is None:
+ self.op.src_path = src_path = self.op.instance_name
- self.op.src_node = src_node = self._ExpandNode(src_node)
- if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
- self.needed_locks[locking.LEVEL_NODE].append(src_node)
+ if src_node is None:
+ self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+ self.op.src_node = None
+ if os.path.isabs(src_path):
+ raise errors.OpPrereqError("Importing an instance from an absolute"
+ " path requires a source node option.")
+ else:
+ self.op.src_node = src_node = self._ExpandNode(src_node)
+ if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
+ self.needed_locks[locking.LEVEL_NODE].append(src_node)
+ if not os.path.isabs(src_path):
+ self.op.src_path = src_path = \
+ os.path.join(constants.EXPORT_DIR, src_path)
else: # INSTANCE_CREATE
if getattr(self.op, "os_type", None) is None:
src_node = self.op.src_node
src_path = self.op.src_path
- export_info = self.rpc.call_export_info(src_node, src_path)
-
- if not export_info:
+ if src_node is None:
+ exp_list = self.rpc.call_export_list(
+ self.acquired_locks[locking.LEVEL_NODE])
+ found = False
+ for node in exp_list:
+ if not exp_list[node].failed and src_path in exp_list[node].data:
+ found = True
+ self.op.src_node = src_node = node
+ self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
+ src_path)
+ break
+ if not found:
+ raise errors.OpPrereqError("No export found for relative path %s" %
+ src_path)
+
+ result = self.rpc.call_export_info(src_node, src_path)
+ result.Raise()
+ if not result.data:
raise errors.OpPrereqError("No export found in dir %s" % src_path)
+ export_info = result.data
if not export_info.has_section(constants.INISECT_EXP):
raise errors.ProgrammerError("Corrupted export config")
if instance_disks < export_disks:
raise errors.OpPrereqError("Not enough disks to import."
" (instance: %d, export: %d)" %
- (2, export_disks))
+ (instance_disks, export_disks))
self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
disk_images = []
self.src_images = disk_images
- if self.op.mac == constants.VALUE_AUTO:
- old_name = export_info.get(constants.INISECT_INS, 'name')
- if self.op.instance_name == old_name:
- # FIXME: adjust every nic, when we'll be able to create instances
- # with more than one
- if int(export_info.get(constants.INISECT_INS, 'nic_count')) >= 1:
- self.op.mac = export_info.get(constants.INISECT_INS, 'nic_0_mac')
+ old_name = export_info.get(constants.INISECT_INS, 'name')
+ # FIXME: int() here could throw a ValueError on broken exports
+ exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
+ if self.op.instance_name == old_name:
+ for idx, nic in enumerate(self.nics):
+ # the export only describes NICs 0 .. exp_nic_count - 1, so a NIC
+ # whose index equals the count has no MAC entry to reuse
+ if nic.mac == constants.VALUE_AUTO and exp_nic_count > idx:
+ nic_mac_ini = 'nic%d_mac' % idx
+ nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
# ip ping checks (we use the same ip that was resolved in ExpandNames)
-
if self.op.start and not self.op.ip_check:
raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
" adding an instance in start mode")
nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
self.op.hypervisor)
for node in nodenames:
- info = nodeinfo.get(node, None)
+ info = nodeinfo[node]
+ info.Raise()
+ info = info.data
if not info:
raise errors.OpPrereqError("Cannot get current information"
" from node '%s'" % node)
_CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
# os verification
- os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
- if not os_obj:
+ result = self.rpc.call_os_get(pnode.name, self.op.os_type)
+ result.Raise()
+ if not isinstance(result.data, objects.OS):
raise errors.OpPrereqError("OS '%s' not in supported os list for"
" primary node" % self.op.os_type)
# bridge check on primary node
bridges = [n.bridge for n in self.nics]
- if not self.rpc.call_bridges_exist(self.pnode.name, bridges):
- raise errors.OpPrereqError("one of the target bridges '%s' does not"
- " exist on"
- " destination node '%s'" %
+ result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
+ result.Raise()
+ if not result.data:
+ raise errors.OpPrereqError("One of the target bridges '%s' does not"
+ " exist on destination node '%s'" %
(",".join(bridges), pnode.name))
# memory check on primary node
self.secondaries,
self.disks,
file_storage_dir,
- self.op.file_driver)
+ self.op.file_driver,
+ 0)
iobj = objects.Instance(name=instance, os=self.op.os_type,
primary_node=pnode_name,
del self.remove_locks[locking.LEVEL_INSTANCE]
# Remove the temp. assignments for the instance's drbds
self.cfg.ReleaseDRBDMinors(instance)
+ # Unlock all the nodes
+ if self.op.mode == constants.INSTANCE_IMPORT:
+ nodes_keep = [self.op.src_node]
+ nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
+ if node != self.op.src_node]
+ self.context.glm.release(locking.LEVEL_NODE, nodes_release)
+ self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
+ else:
+ self.context.glm.release(locking.LEVEL_NODE)
+ del self.acquired_locks[locking.LEVEL_NODE]
if self.op.wait_for_sync:
disk_abort = not _WaitForSync(self, iobj)
if iobj.disk_template != constants.DT_DISKLESS:
if self.op.mode == constants.INSTANCE_CREATE:
feedback_fn("* running the instance OS create scripts...")
- if not self.rpc.call_instance_os_add(pnode_name, iobj):
- raise errors.OpExecError("could not add os for instance %s"
+ result = self.rpc.call_instance_os_add(pnode_name, iobj)
+ result.Raise()
+ if not result.data:
+ raise errors.OpExecError("Could not add os for instance %s"
" on node %s" %
(instance, pnode_name))
import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
src_node, src_images,
cluster_name)
- for idx, result in enumerate(import_result):
+ import_result.Raise()
+ for idx, result in enumerate(import_result.data):
if not result:
- self.LogWarning("Could not image %s for on instance %s, disk %d,"
- " on node %s" % (src_images[idx], instance, idx,
- pnode_name))
+ self.LogWarning("Could not import the image %s for instance"
+ " %s, disk %d, on node %s" %
+ (src_images[idx], instance, idx, pnode_name))
else:
# also checked in the prereq part
raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
if self.op.start:
logging.info("Starting instance %s on node %s", instance, pnode_name)
feedback_fn("* starting instance...")
- if not self.rpc.call_instance_start(pnode_name, iobj, None):
+ result = self.rpc.call_instance_start(pnode_name, iobj, None)
+ result.Raise()
+ if not result.data:
raise errors.OpExecError("Could not start instance")
node_insts = self.rpc.call_instance_list([node],
[instance.hypervisor])[node]
- if node_insts is False:
- raise errors.OpExecError("Can't connect to node %s." % node)
+ node_insts.Raise()
- if instance.name not in node_insts:
+ if instance.name not in node_insts.data:
raise errors.OpExecError("Instance %s is not running." % instance.name)
logging.debug("Connecting to console of %s on %s", instance.name, node)
if not results:
raise errors.OpExecError("Can't list volume groups on the nodes")
for node in oth_node, tgt_node:
- res = results.get(node, False)
- if not res or my_vg not in res:
+ res = results[node]
+ if res.failed or not res.data or my_vg not in res.data:
raise errors.OpExecError("Volume group '%s' not found on %s" %
(my_vg, node))
for idx, dev in enumerate(instance.disks):
self.proc.LogStep(4, steps_total, "change drbd configuration")
for dev, old_lvs, new_lvs in iv_names.itervalues():
info("detaching %s drbd from local storage" % dev.iv_name)
- if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
+ result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
+ result.Raise()
+ if not result.data:
raise errors.OpExecError("Can't detach drbd from local storage on node"
" %s for device %s" % (tgt_node, dev.iv_name))
#dev.children = []
rlist = []
for to_ren in old_lvs:
find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
- if find_res is not None: # device exists
+ if not find_res.failed and find_res.data is not None: # device exists
rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
info("renaming the old LVs on the target node")
- if not self.rpc.call_blockdev_rename(tgt_node, rlist):
+ result = self.rpc.call_blockdev_rename(tgt_node, rlist)
+ result.Raise()
+ if not result.data:
raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
# now we rename the new LVs to the old LVs
info("renaming the new LVs on the target node")
rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
- if not self.rpc.call_blockdev_rename(tgt_node, rlist):
+ result = self.rpc.call_blockdev_rename(tgt_node, rlist)
+ result.Raise()
+ if not result.data:
raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
for old, new in zip(old_lvs, new_lvs):
# now that the new lvs have the old name, we can add them to the device
info("adding new mirror component on %s" % tgt_node)
- if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
+ result =self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
+ if result.failed or not result.data:
for new_lv in new_lvs:
- if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
+ result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
+ if result.failed or not result.data:
warning("Can't rollback device %s", hint="manually cleanup unused"
" logical volumes")
raise errors.OpExecError("Can't add local storage to drbd")
# so check manually all the devices
for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
cfg.SetDiskID(dev, instance.primary_node)
- is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
- if is_degr:
+ result = self.rpc.call_blockdev_find(instance.primary_node, dev)
+ if result.failed or result.data[5]:
raise errors.OpExecError("DRBD device %s is degraded!" % name)
# Step: remove old storage
info("remove logical volumes for %s" % name)
for lv in old_lvs:
cfg.SetDiskID(lv, tgt_node)
- if not self.rpc.call_blockdev_remove(tgt_node, lv):
+ result = self.rpc.call_blockdev_remove(tgt_node, lv)
+ if result.failed or not result.data:
warning("Can't remove old LV", hint="manually remove unused LVs")
continue
info("checking volume groups")
my_vg = cfg.GetVGName()
results = self.rpc.call_vg_list([pri_node, new_node])
- if not results:
- raise errors.OpExecError("Can't list volume groups on the nodes")
for node in pri_node, new_node:
- res = results.get(node, False)
- if not res or my_vg not in res:
+ res = results[node]
+ if res.failed or not res.data or my_vg not in res.data:
raise errors.OpExecError("Volume group '%s' not found on %s" %
(my_vg, node))
for idx, dev in enumerate(instance.disks):
continue
info("checking disk/%d on %s" % (idx, pri_node))
cfg.SetDiskID(dev, pri_node)
- if not self.rpc.call_blockdev_find(pri_node, dev):
+ result = self.rpc.call_blockdev_find(pri_node, dev)
+ result.Raise()
+ if not result.data:
raise errors.OpExecError("Can't find disk/%d on node %s" %
(idx, pri_node))
# we have new devices, shutdown the drbd on the old secondary
info("shutting down drbd for disk/%d on old node" % idx)
cfg.SetDiskID(dev, old_node)
- if not self.rpc.call_blockdev_shutdown(old_node, dev):
+ result = self.rpc.call_blockdev_shutdown(old_node, dev)
+ if result.failed or not result.data:
warning("Failed to shutdown drbd for disk/%d on old node" % idx,
hint="Please cleanup this device manually as soon as possible")
dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
# and 'find' the device, which will 'fix' it to match the
# standalone state
- if self.rpc.call_blockdev_find(pri_node, dev):
+ result = self.rpc.call_blockdev_find(pri_node, dev)
+ if not result.failed and result.data:
done += 1
else:
warning("Failed to detach drbd disk/%d from network, unusual case" %
# is correct
cfg.SetDiskID(dev, pri_node)
logging.debug("Disk to attach: %s", dev)
- if not self.rpc.call_blockdev_find(pri_node, dev):
+ result = self.rpc.call_blockdev_find(pri_node, dev)
+ if result.failed or not result.data:
warning("can't attach drbd disk/%d to new secondary!" % idx,
"please do a gnt-instance info to see the status of disks")
# so check manually all the devices
for idx, (dev, old_lvs, _) in iv_names.iteritems():
cfg.SetDiskID(dev, pri_node)
- is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
- if is_degr:
+ result = self.rpc.call_blockdev_find(pri_node, dev)
+ result.Raise()
+ if result.data[5]:
raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
self.proc.LogStep(6, steps_total, "removing old storage")
info("remove logical volumes for disk/%d" % idx)
for lv in old_lvs:
cfg.SetDiskID(lv, old_node)
- if not self.rpc.call_blockdev_remove(old_node, lv):
+ result = self.rpc.call_blockdev_remove(old_node, lv)
+ if result.failed or not result.data:
warning("Can't remove LV on old secondary",
hint="Cleanup stale volumes by hand")
nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
instance.hypervisor)
for node in nodenames:
- info = nodeinfo.get(node, None)
- if not info:
+ info = nodeinfo[node]
+ if info.failed or not info.data:
raise errors.OpPrereqError("Cannot get current information"
" from node '%s'" % node)
- vg_free = info.get('vg_free', None)
+ vg_free = info.data.get('vg_free', None)
if not isinstance(vg_free, int):
raise errors.OpPrereqError("Can't compute free disk space on"
" node %s" % node)
- if self.op.amount > info['vg_free']:
+ if self.op.amount > vg_free:
raise errors.OpPrereqError("Not enough disk space on target node %s:"
" %d MiB available, %d MiB required" %
- (node, info['vg_free'], self.op.amount))
+ (node, vg_free, self.op.amount))
def Exec(self, feedback_fn):
"""Execute disk grow.
for node in (instance.secondary_nodes + (instance.primary_node,)):
self.cfg.SetDiskID(disk, node)
result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
- if (not result or not isinstance(result, (list, tuple)) or
- len(result) != 2):
- raise errors.OpExecError("grow request failed to node %s" % node)
- elif not result[0]:
- raise errors.OpExecError("grow request failed to node %s: %s" %
- (node, result[1]))
+ result.Raise()
+ if (not result.data or not isinstance(result.data, (list, tuple)) or
+ len(result.data) != 2):
+ raise errors.OpExecError("Grow request failed to node %s" % node)
+ elif not result.data[0]:
+ raise errors.OpExecError("Grow request failed to node %s: %s" %
+ (node, result.data[1]))
disk.RecordGrow(self.op.amount)
self.cfg.Update(instance)
if self.op.wait_for_sync:
if not static:
self.cfg.SetDiskID(dev, instance.primary_node)
dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
+ dev_pstatus.Raise()
+ dev_pstatus = dev_pstatus.data
else:
dev_pstatus = None
if snode and not static:
self.cfg.SetDiskID(dev, snode)
dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
+ dev_sstatus.Raise()
+ dev_sstatus = dev_sstatus.data
else:
dev_sstatus = None
"pstatus": dev_pstatus,
"sstatus": dev_sstatus,
"children": dev_children,
+ "mode": dev.mode,
}
return data
remote_info = self.rpc.call_instance_info(instance.primary_node,
instance.name,
instance.hypervisor)
+ remote_info.Raise()
+ remote_info = remote_info.data
if remote_info and "state" in remote_info:
remote_state = "up"
else:
"""
HPATH = "instance-modify"
HTYPE = constants.HTYPE_INSTANCE
- _OP_REQP = ["instance_name", "hvparams"]
+ _OP_REQP = ["instance_name"]
REQ_BGL = False
+ def CheckArguments(self):
+   """Check the opcode arguments and fill in defaults.
+
+   Missing nics, disks, beparams and hvparams attributes are set to
+   empty containers and force to False, so that later methods can rely
+   on them. Disk and NIC changes are given as (operation, parameters)
+   pairs, where the operation is constants.DDM_ADD,
+   constants.DDM_REMOVE or the integer index of an existing item.
+
+   Raises errors.OpPrereqError if no change was requested, if a disk
+   or NIC parameter is invalid, or if more than one add/remove
+   operation of the same kind is requested.
+
+   """
+   if not hasattr(self.op, 'nics'):
+     self.op.nics = []
+   if not hasattr(self.op, 'disks'):
+     self.op.disks = []
+   if not hasattr(self.op, 'beparams'):
+     self.op.beparams = {}
+   if not hasattr(self.op, 'hvparams'):
+     self.op.hvparams = {}
+   self.op.force = getattr(self.op, "force", False)
+   if not (self.op.nics or self.op.disks or
+           self.op.hvparams or self.op.beparams):
+     raise errors.OpPrereqError("No changes submitted")
+
+   utils.CheckBEParams(self.op.beparams)
+
+   # Disk validation
+   disk_modes = (constants.DISK_RDONLY, constants.DISK_RDWR)
+   disk_addremove = 0
+   for disk_op, disk_dict in self.op.disks:
+     if disk_op == constants.DDM_REMOVE:
+       disk_addremove += 1
+       continue
+     elif disk_op == constants.DDM_ADD:
+       disk_addremove += 1
+     else:
+       if not isinstance(disk_op, int):
+         raise errors.OpPrereqError("Invalid disk index")
+     if disk_op == constants.DDM_ADD:
+       mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
+       if mode not in disk_modes:
+         raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
+       size = disk_dict.get('size', None)
+       if size is None:
+         raise errors.OpPrereqError("Required disk parameter size missing")
+       try:
+         size = int(size)
+       except ValueError, err:
+         raise errors.OpPrereqError("Invalid disk size parameter: %s" %
+                                    str(err))
+       disk_dict['size'] = size
+     else:
+       # modification of disk
+       if 'size' in disk_dict:
+         raise errors.OpPrereqError("Disk size change not possible, use"
+                                    " grow-disk")
+       # a changed access mode is stored as given, so it is checked here
+       # with the same rule as for new disks
+       mode = disk_dict.get('mode', None)
+       if mode is not None and mode not in disk_modes:
+         raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
+
+   if disk_addremove > 1:
+     raise errors.OpPrereqError("Only one disk add or remove operation"
+                                " supported at a time")
+
+   # NIC validation
+   nic_addremove = 0
+   for nic_op, nic_dict in self.op.nics:
+     if nic_op == constants.DDM_REMOVE:
+       nic_addremove += 1
+       continue
+     elif nic_op == constants.DDM_ADD:
+       nic_addremove += 1
+     else:
+       if not isinstance(nic_op, int):
+         raise errors.OpPrereqError("Invalid nic index")
+
+     # nic_dict should be a dict
+     nic_ip = nic_dict.get('ip', None)
+     if nic_ip is not None:
+       if nic_ip.lower() == "none":
+         nic_dict['ip'] = None
+       else:
+         if not utils.IsValidIP(nic_ip):
+           raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
+     # A missing bridge gets the default one only for a new NIC. When an
+     # existing NIC is modified the key must stay absent, because Exec
+     # copies every key present in nic_dict onto the NIC and would
+     # otherwise reset its bridge to the default.
+     if nic_op == constants.DDM_ADD or 'bridge' in nic_dict:
+       if nic_dict.get('bridge', None) is None:
+         nic_dict['bridge'] = self.cfg.GetDefBridge()
+     # Explicit MACs are checked for syntax first and only then looked up
+     # in the configuration; "auto"/"generate" are placeholders and are
+     # neither validated nor looked up.
+     nic_mac = nic_dict.get('mac', None)
+     if (nic_mac is not None and
+         nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE)):
+       if not utils.IsValidMac(nic_mac):
+         raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
+       if self.cfg.IsMacInUse(nic_mac):
+         raise errors.OpPrereqError("MAC address %s already in use"
+                                    " in cluster" % nic_mac)
+   if nic_addremove > 1:
+     raise errors.OpPrereqError("Only one NIC add or remove operation"
+                                " supported at a time")
+
def ExpandNames(self):
self._ExpandAndLockInstance()
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
-
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE:
self._LockInstancesNodes()
args['memory'] = self.be_new[constants.BE_MEMORY]
if constants.BE_VCPUS in self.be_new:
args['vcpus'] = self.be_new[constants.BE_VCPUS]
- if self.do_ip or self.do_bridge or self.mac:
- if self.do_ip:
- ip = self.ip
- else:
- ip = self.instance.nics[0].ip
- if self.bridge:
- bridge = self.bridge
- else:
- bridge = self.instance.nics[0].bridge
- if self.mac:
- mac = self.mac
- else:
- mac = self.instance.nics[0].mac
- args['nics'] = [(ip, bridge, mac)]
+ # FIXME: readd disk/nic changes
env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
nl = [self.cfg.GetMasterNode(),
self.instance.primary_node] + list(self.instance.secondary_nodes)
This only checks the instance list against the existing names.
"""
- # FIXME: all the parameters could be checked before, in ExpandNames, or in
- # a separate CheckArguments function, if we implement one, so the operation
- # can be aborted without waiting for any lock, should it have an error...
- self.ip = getattr(self.op, "ip", None)
- self.mac = getattr(self.op, "mac", None)
- self.bridge = getattr(self.op, "bridge", None)
- self.kernel_path = getattr(self.op, "kernel_path", None)
- self.initrd_path = getattr(self.op, "initrd_path", None)
- self.force = getattr(self.op, "force", None)
- all_parms = [self.ip, self.bridge, self.mac]
- if (all_parms.count(None) == len(all_parms) and
- not self.op.hvparams and
- not self.op.beparams):
- raise errors.OpPrereqError("No changes submitted")
- for item in (constants.BE_MEMORY, constants.BE_VCPUS):
- val = self.op.beparams.get(item, None)
- if val is not None:
- try:
- val = int(val)
- except ValueError, err:
- raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
- self.op.beparams[item] = val
- if self.ip is not None:
- self.do_ip = True
- if self.ip.lower() == "none":
- self.ip = None
- else:
- if not utils.IsValidIP(self.ip):
- raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
- else:
- self.do_ip = False
- self.do_bridge = (self.bridge is not None)
- if self.mac is not None:
- if self.cfg.IsMacInUse(self.mac):
- raise errors.OpPrereqError('MAC address %s already in use in cluster' %
- self.mac)
- if not utils.IsValidMac(self.mac):
- raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
+ force = self.force = self.op.force
# checking the new params on the primary/secondary nodes
if self.op.hvparams:
i_hvdict = copy.deepcopy(instance.hvparams)
for key, val in self.op.hvparams.iteritems():
- if val is None:
+ if val == constants.VALUE_DEFAULT:
try:
del i_hvdict[key]
except KeyError:
pass
+ elif val == constants.VALUE_NONE:
+ i_hvdict[key] = None
else:
i_hvdict[key] = val
cluster = self.cfg.GetClusterInfo()
if self.op.beparams:
i_bedict = copy.deepcopy(instance.beparams)
for key, val in self.op.beparams.iteritems():
- if val is None:
+ if val == constants.VALUE_DEFAULT:
try:
del i_bedict[key]
except KeyError:
self.be_new = be_new # the new actual values
self.be_inst = i_bedict # the new dict (without defaults)
else:
- self.hv_new = self.hv_inst = {}
+ self.be_new = self.be_inst = {}
self.warn = []
instance.hypervisor)
nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
instance.hypervisor)
-
- if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
+ if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
# Assume the primary node is unreachable and go ahead
self.warn.append("Can't get info from primary node %s" % pnode)
else:
- if instance_info:
- current_mem = instance_info['memory']
+ if not instance_info.failed and instance_info.data:
+ current_mem = instance_info.data['memory']
else:
# Assume instance not running
# (there is a slight race condition here, but it's not very probable,
# and we have no other way to check)
current_mem = 0
miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
- nodeinfo[pnode]['memory_free'])
+ nodeinfo[pnode].data['memory_free'])
if miss_mem > 0:
raise errors.OpPrereqError("This change will prevent the instance"
" from starting, due to %d MB of memory"
" missing on its primary node" % miss_mem)
if be_new[constants.BE_AUTO_BALANCE]:
- for node in instance.secondary_nodes:
- if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
+ # secondary_nodes is a plain sequence of node names and has no
+ # iteritems(); the per-node RPC answer is looked up in nodeinfo
+ for node in instance.secondary_nodes:
+ nres = nodeinfo.get(node, None)
+ if nres is None or nres.failed or not isinstance(nres.data, dict):
self.warn.append("Can't get info from secondary node %s" % node)
- elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
+ elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
self.warn.append("Not enough memory to failover instance to"
" secondary node %s" % node)
+ # NIC processing
+ for nic_op, nic_dict in self.op.nics:
+   if nic_op == constants.DDM_REMOVE:
+     if not instance.nics:
+       raise errors.OpPrereqError("Instance has no NICs, cannot remove")
+     continue
+   if nic_op != constants.DDM_ADD:
+     # an existing nic
+     if nic_op < 0 or nic_op >= len(instance.nics):
+       raise errors.OpPrereqError("Invalid NIC index %s, valid values"
+                                  " are 0 to %d" %
+                                  (nic_op, len(instance.nics) - 1))
+   nic_bridge = nic_dict.get('bridge', None)
+   if nic_bridge is not None:
+     # as at the other call sites in this change, the RPC answer is a
+     # result object: check its failure flag and data, not its truth value
+     result = self.rpc.call_bridges_exist(pnode, [nic_bridge])
+     if result.failed or not result.data:
+       msg = ("Bridge '%s' doesn't exist on one of"
+              " the instance nodes" % nic_bridge)
+       if self.force:
+         self.warn.append(msg)
+       else:
+         raise errors.OpPrereqError(msg)
+
+ # DISK processing
+ if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
+   raise errors.OpPrereqError("Disk operations not supported for"
+                              " diskless instances")
+ for disk_op, disk_dict in self.op.disks:
+   if disk_op == constants.DDM_REMOVE:
+     if len(instance.disks) == 1:
+       raise errors.OpPrereqError("Cannot remove the last disk of"
+                                  " an instance")
+     ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
+     ins_l = ins_l[pnode]
+     # the instance list is the data of the per-node result object
+     if ins_l.failed or not isinstance(ins_l.data, list):
+       raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
+     if instance.name in ins_l.data:
+       raise errors.OpPrereqError("Instance is running, can't remove"
+                                  " disks.")
+
+   # the limit applies to the number of disks, not of NICs
+   if (disk_op == constants.DDM_ADD and
+       len(instance.disks) >= constants.MAX_DISKS):
+     raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
+                                " add more" % constants.MAX_DISKS)
+   if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
+     # an existing disk
+     if disk_op < 0 or disk_op >= len(instance.disks):
+       raise errors.OpPrereqError("Invalid disk index %s, valid values"
+                                  " are 0 to %d" %
+                                  (disk_op, len(instance.disks) - 1))
+
return
def Exec(self, feedback_fn):
"""Modifies an instance.
All parameters take effect only at the next restart of the instance.
+
"""
# Process here the warnings from CheckPrereq, as we don't have a
# feedback_fn there.
result = []
instance = self.instance
- if self.do_ip:
- instance.nics[0].ip = self.ip
- result.append(("ip", self.ip))
- if self.bridge:
- instance.nics[0].bridge = self.bridge
- result.append(("bridge", self.bridge))
- if self.mac:
- instance.nics[0].mac = self.mac
- result.append(("mac", self.mac))
+ # disk changes
+ for disk_op, disk_dict in self.op.disks:
+   if disk_op == constants.DDM_REMOVE:
+     # remove the last disk
+     device = instance.disks.pop()
+     device_idx = len(instance.disks)
+     for node, disk in device.ComputeNodeTree(instance.primary_node):
+       self.cfg.SetDiskID(disk, node)
+       # "result" is the list of changes built by this method, so the RPC
+       # answer gets a name of its own instead of overwriting it
+       rpc_result = self.rpc.call_blockdev_remove(node, disk)
+       if rpc_result.failed or not rpc_result.data:
+         self.proc.LogWarning("Could not remove disk/%d on node %s,"
+                              " continuing anyway", device_idx, node)
+     result.append(("disk/%d" % device_idx, "remove"))
+   elif disk_op == constants.DDM_ADD:
+     # add a new disk
+     if instance.disk_template == constants.DT_FILE:
+       file_driver, file_path = instance.disks[0].logical_id
+       file_path = os.path.dirname(file_path)
+     else:
+       file_driver = file_path = None
+     disk_idx_base = len(instance.disks)
+     # _GenerateDiskTemplate takes the instance name as third argument,
+     # not the instance object
+     new_disk = _GenerateDiskTemplate(self,
+                                      instance.disk_template,
+                                      instance.name, instance.primary_node,
+                                      instance.secondary_nodes,
+                                      [disk_dict],
+                                      file_path,
+                                      file_driver,
+                                      disk_idx_base)[0]
+     new_disk.mode = disk_dict['mode']
+     instance.disks.append(new_disk)
+     info = _GetInstanceInfoText(instance)
+
+     logging.info("Creating volume %s for instance %s",
+                  new_disk.iv_name, instance.name)
+     # Note: this needs to be kept in sync with _CreateDisks
+     #HARDCODE
+     for secondary_node in instance.secondary_nodes:
+       if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
+                                         new_disk, False, info):
+         self.LogWarning("Failed to create volume %s (%s) on"
+                         " secondary node %s!",
+                         new_disk.iv_name, new_disk, secondary_node)
+     #HARDCODE
+     if not _CreateBlockDevOnPrimary(self, instance.primary_node,
+                                     instance, new_disk, info):
+       self.LogWarning("Failed to create volume %s on primary!",
+                       new_disk.iv_name)
+     result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
+                    (new_disk.size, new_disk.mode)))
+   else:
+     # change a given disk; the mode is the only changeable parameter
+     # and may be absent from the request
+     if 'mode' in disk_dict:
+       instance.disks[disk_op].mode = disk_dict['mode']
+       result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
+ # NIC changes
+ for nic_op, nic_dict in self.op.nics:
+   if nic_op == constants.DDM_REMOVE:
+     # remove the last nic
+     del instance.nics[-1]
+     result.append(("nic.%d" % len(instance.nics), "remove"))
+   elif nic_op == constants.DDM_ADD:
+     # add a new nic
+     if 'mac' not in nic_dict:
+       mac = constants.VALUE_GENERATE
+     else:
+       mac = nic_dict['mac']
+     if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
+       mac = self.cfg.GenerateMAC()
+     new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
+                           bridge=nic_dict.get('bridge', None))
+     instance.nics.append(new_nic)
+     result.append(("nic.%d" % (len(instance.nics) - 1),
+                    "add:mac=%s,ip=%s,bridge=%s" %
+                    (new_nic.mac, new_nic.ip, new_nic.bridge)))
+   else:
+     # change a given nic
+     for key in 'mac', 'ip', 'bridge':
+       if key not in nic_dict:
+         continue
+       value = nic_dict[key]
+       # as when adding a NIC, "auto"/"generate" stand for a freshly
+       # generated address and must not be stored literally
+       if key == 'mac' and value in (constants.VALUE_AUTO,
+                                     constants.VALUE_GENERATE):
+         value = self.cfg.GenerateMAC()
+       setattr(instance.nics[nic_op], key, value)
+       result.append(("nic.%s/%d" % (key, nic_op), value))
+
+ # hvparams changes
if self.op.hvparams:
instance.hvparams = self.hv_new
for key, val in self.op.hvparams.iteritems():
result.append(("hv/%s" % key, val))
+
+ # beparams changes
if self.op.beparams:
instance.beparams = self.be_inst
for key, val in self.op.beparams.iteritems():
that node.
"""
- return self.rpc.call_export_list(self.nodes)
+ rpcresult = self.rpc.call_export_list(self.nodes)
+ result = {}
+ for node in rpcresult:
+ if rpcresult[node].failed:
+ result[node] = False
+ else:
+ result[node] = rpcresult[node].data
+
+ return result
class LUExportInstance(LogicalUnit):
self.dst_node = self.cfg.GetNodeInfo(
self.cfg.ExpandNodeName(self.op.target_node))
- assert self.dst_node is not None, \
- "Cannot retrieve locked node %s" % self.op.target_node
+ if self.dst_node is None:
+ # This is a wrong node name, not a non-locked node
+ raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
# instance disk type verification
for disk in self.instance.disks:
src_node = instance.primary_node
if self.op.shutdown:
# shutdown the instance, but not the disks
- if not self.rpc.call_instance_shutdown(src_node, instance):
+ result = self.rpc.call_instance_shutdown(src_node, instance)
+ result.Raise()
+ if not result.data:
raise errors.OpExecError("Could not shutdown instance %s on node %s" %
(instance.name, src_node))
for disk in instance.disks:
# new_dev_name will be a snapshot of an lvm leaf of the one we passed
new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
-
- if not new_dev_name:
+ if new_dev_name.failed or not new_dev_name.data:
self.LogWarning("Could not snapshot block device %s on node %s",
disk.logical_id[1], src_node)
snap_disks.append(False)
else:
new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
- logical_id=(vgname, new_dev_name),
- physical_id=(vgname, new_dev_name),
+ logical_id=(vgname, new_dev_name.data),
+ physical_id=(vgname, new_dev_name.data),
iv_name=disk.iv_name)
snap_disks.append(new_dev)
finally:
if self.op.shutdown and instance.status == "up":
- if not self.rpc.call_instance_start(src_node, instance, None):
+ result = self.rpc.call_instance_start(src_node, instance, None)
+ if result.failed or not result.data:
_ShutdownInstanceDisks(self, instance)
raise errors.OpExecError("Could not start instance")
cluster_name = self.cfg.GetClusterName()
for idx, dev in enumerate(snap_disks):
if dev:
- if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
- instance, cluster_name, idx):
+ result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
+ instance, cluster_name, idx)
+ if result.failed or not result.data:
self.LogWarning("Could not export block device %s from node %s to"
" node %s", dev.logical_id[1], src_node,
dst_node.name)
- if not self.rpc.call_blockdev_remove(src_node, dev):
+ result = self.rpc.call_blockdev_remove(src_node, dev)
+ if result.failed or not result.data:
self.LogWarning("Could not remove snapshot block device %s from node"
" %s", dev.logical_id[1], src_node)
- if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
+ result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
+ if result.failed or not result.data:
self.LogWarning("Could not finalize export for instance %s on node %s",
instance.name, dst_node.name)
if nodelist:
exportlist = self.rpc.call_export_list(nodelist)
for node in exportlist:
- if instance.name in exportlist[node]:
+ if exportlist[node].failed:
+ continue
+ if instance.name in exportlist[node].data:
if not self.rpc.call_export_remove(node, instance.name):
self.LogWarning("Could not remove older export for instance %s"
" on node %s", instance.name, node)
locking.LEVEL_NODE])
found = False
for node in exportlist:
- if instance_name in exportlist[node]:
+ if exportlist[node].failed:
+ self.LogWarning("Failed to query node %s, continuing" % node)
+ continue
+ if instance_name in exportlist[node].data:
found = True
- if not self.rpc.call_export_remove(node, instance_name):
+ result = self.rpc.call_export_remove(node, instance_name)
+ if result.failed or not result.data:
logging.error("Could not remove export for instance %s"
" on node %s", instance_name, node)
if not result:
raise errors.OpExecError("Complete failure from rpc call")
for node, node_result in result.items():
- if not node_result:
+ node_result.Raise()
+ if not node_result.data:
raise errors.OpExecError("Failure during rpc call to node %s,"
- " result: %s" % (node, node_result))
+ " result: %s" % (node, node_result.data))
class IAllocator(object):
node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
hypervisor)
+ node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
+ cluster_info.enabled_hypervisors)
for nname in node_list:
ninfo = cfg.GetNodeInfo(nname)
- if nname not in node_data or not isinstance(node_data[nname], dict):
+ node_data[nname].Raise()
+ if not isinstance(node_data[nname].data, dict):
raise errors.OpExecError("Can't get data for node %s" % nname)
- remote_info = node_data[nname]
+ remote_info = node_data[nname].data
for attr in ['memory_total', 'memory_free', 'memory_dom0',
'vg_size', 'vg_free', 'cpu_total']:
if attr not in remote_info:
for iinfo, beinfo in i_list:
if iinfo.primary_node == nname:
i_p_mem += beinfo[constants.BE_MEMORY]
+ if iinfo.name not in node_iinfo[nname]:
+ i_used_mem = 0
+ else:
+ i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
+ i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
+ remote_info['memory_free'] -= max(0, i_mem_diff)
+
if iinfo.status == "up":
i_p_up_mem += beinfo[constants.BE_MEMORY]
data = self.in_text
result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
+ result.Raise()
- if not isinstance(result, (list, tuple)) or len(result) != 4:
+ if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
raise errors.OpExecError("Invalid result from master iallocator runner")
- rcode, stdout, stderr, fail = result
+ rcode, stdout, stderr, fail = result.data
if rcode == constants.IARUN_NOTFOUND:
raise errors.OpExecError("Can't find allocator '%s'" % name)