from ganeti import locking
from ganeti import constants
from ganeti import objects
-from ganeti import serializer
from ganeti import ssconf
from ganeti import uidpool
from ganeti import compat
from ganeti import ht
from ganeti import rpc
from ganeti import runtime
+from ganeti import pathutils
+from ganeti import vcluster
+from ganeti import network
+from ganeti.masterd import iallocator
import ganeti.masterd.instance # pylint: disable=W0611
-#: Size of DRBD meta block device
-DRBD_META_SIZE = 128
-
# States of instance
INSTANCE_DOWN = [constants.ADMINST_DOWN]
INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
self.owned_locks = context.glm.list_owned
self.context = context
self.rpc = rpc_runner
- # Dicts used to declare locking needs to mcpu
+
+ # Dictionaries used to declare locking needs to mcpu
self.needed_locks = None
self.share_locks = dict.fromkeys(locking.LEVELS, 0)
+ self.opportunistic_locks = dict.fromkeys(locking.LEVELS, False)
+
self.add_locks = {}
self.remove_locks = {}
+
# Used to force good behavior when calling helper functions
self.recalculate_locks = {}
+
# logging
self.Log = processor.Log # pylint: disable=C0103
self.LogWarning = processor.LogWarning # pylint: disable=C0103
return dict.fromkeys(locking.LEVELS, 1)
-def _MakeLegacyNodeInfo(data):
- """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
-
- Converts the data into a single dictionary. This is fine for most use cases,
- but some require information from more than one volume group or hypervisor.
-
- """
- (bootid, (vg_info, ), (hv_info, )) = data
-
- return utils.JoinDisjointDicts(utils.JoinDisjointDicts(vg_info, hv_info), {
- "bootid": bootid,
- })
-
-
def _AnnotateDiskParams(instance, devs, cfg):
"""Little helper wrapper to the rpc annotation method.
"Instance %s has no node in group %s" % (name, cur_group_uuid)
-def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
+def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups,
+ primary_only=False):
"""Checks if the owned node groups are still correct for an instance.
@type cfg: L{config.ConfigWriter}
@param instance_name: Instance name
@type owned_groups: set or frozenset
@param owned_groups: List of currently owned node groups
+ @type primary_only: boolean
+ @param primary_only: Whether to check node groups for only the primary node
"""
- inst_groups = cfg.GetInstanceNodeGroups(instance_name)
+ inst_groups = cfg.GetInstanceNodeGroups(instance_name, primary_only)
if not owned_groups.issuperset(inst_groups):
raise errors.OpPrereqError("Instance %s's node groups changed since"
try:
hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
except Exception, err: # pylint: disable=W0703
- lu.LogWarning("Errors occurred running hooks on %s: %s" % (node_name, err))
+ lu.LogWarning("Errors occurred running hooks on %s: %s",
+ node_name, err)
def _CheckOutputFields(static, dynamic, selected):
"""Reads the cluster domain secret.
"""
- return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
+ return utils.ReadOneLineFile(pathutils.CLUSTER_DOMAIN_SECRET_FILE,
strict=True)
"""
if msg is None:
- msg = "can't use instance from outside %s states" % ", ".join(req_states)
+ msg = ("can't use instance from outside %s states" %
+ utils.CommaJoin(req_states))
if instance.admin_state not in req_states:
raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
(instance.name, instance.admin_state, msg),
disk_sizes, spindle_use)
-def _ComputeIPolicyInstanceSpecViolation(ipolicy, instance_spec,
- _compute_fn=_ComputeIPolicySpecViolation):
+def _ComputeIPolicyInstanceSpecViolation(
+ ipolicy, instance_spec, _compute_fn=_ComputeIPolicySpecViolation):
"""Compute if instance specs meets the specs of ipolicy.
@type ipolicy: dict
return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
+def _BuildNetworkHookEnv(name, subnet, gateway, network6, gateway6,
+ network_type, mac_prefix, tags):
+ """Builds network related env variables for hooks
+
+ This builds the hook environment from individual variables.
+
+ @type name: string
+ @param name: the name of the network
+ @type subnet: string
+ @param subnet: the ipv4 subnet
+ @type gateway: string
+ @param gateway: the ipv4 gateway
+ @type network6: string
+ @param network6: the ipv6 subnet
+ @type gateway6: string
+ @param gateway6: the ipv6 gateway
+ @type network_type: string
+ @param network_type: the type of the network
+ @type mac_prefix: string
+ @param mac_prefix: the mac_prefix
+ @type tags: list
+ @param tags: the tags of the network
+
+ """
+ env = {}
+ if name:
+ env["NETWORK_NAME"] = name
+ if subnet:
+ env["NETWORK_SUBNET"] = subnet
+ if gateway:
+ env["NETWORK_GATEWAY"] = gateway
+ if network6:
+ env["NETWORK_SUBNET6"] = network6
+ if gateway6:
+ env["NETWORK_GATEWAY6"] = gateway6
+ if mac_prefix:
+ env["NETWORK_MAC_PREFIX"] = mac_prefix
+ if network_type:
+ env["NETWORK_TYPE"] = network_type
+ if tags:
+ env["NETWORK_TAGS"] = " ".join(tags)
+
+ return env
+
+
+def _BuildNetworkHookEnvByObject(net):
+ """Builds network related env varliables for hooks
+
+ @type net: L{objects.Network}
+ @param net: the network object
+
+ """
+ args = {
+ "name": net.name,
+ "subnet": net.network,
+ "gateway": net.gateway,
+ "network6": net.network6,
+ "gateway6": net.gateway6,
+ "network_type": net.network_type,
+ "mac_prefix": net.mac_prefix,
+ "tags": net.tags,
+ }
+
+ return _BuildNetworkHookEnv(**args) # pylint: disable=W0142
+
+
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
minmem, maxmem, vcpus, nics, disk_template, disks,
bep, hvp, hypervisor_name, tags):
@type vcpus: string
@param vcpus: the count of VCPUs the instance has
@type nics: list
- @param nics: list of tuples (ip, mac, mode, link) representing
+ @param nics: list of tuples (ip, mac, mode, link, net, netinfo) representing
the NICs the instance has
@type disk_template: string
@param disk_template: the disk template of the instance
}
if nics:
nic_count = len(nics)
- for idx, (ip, mac, mode, link) in enumerate(nics):
+ for idx, (ip, mac, mode, link, net, netinfo) in enumerate(nics):
if ip is None:
ip = ""
env["INSTANCE_NIC%d_IP" % idx] = ip
env["INSTANCE_NIC%d_MAC" % idx] = mac
env["INSTANCE_NIC%d_MODE" % idx] = mode
env["INSTANCE_NIC%d_LINK" % idx] = link
+ if net:
+ env["INSTANCE_NIC%d_NETWORK" % idx] = net
+ if netinfo:
+ nobj = objects.Network.FromDict(netinfo)
+ if nobj.network:
+ env["INSTANCE_NIC%d_NETWORK_SUBNET" % idx] = nobj.network
+ if nobj.gateway:
+ env["INSTANCE_NIC%d_NETWORK_GATEWAY" % idx] = nobj.gateway
+ if nobj.network6:
+ env["INSTANCE_NIC%d_NETWORK_SUBNET6" % idx] = nobj.network6
+ if nobj.gateway6:
+ env["INSTANCE_NIC%d_NETWORK_GATEWAY6" % idx] = nobj.gateway6
+ if nobj.mac_prefix:
+ env["INSTANCE_NIC%d_NETWORK_MAC_PREFIX" % idx] = nobj.mac_prefix
+ if nobj.network_type:
+ env["INSTANCE_NIC%d_NETWORK_TYPE" % idx] = nobj.network_type
+ if nobj.tags:
+ env["INSTANCE_NIC%d_NETWORK_TAGS" % idx] = " ".join(nobj.tags)
if mode == constants.NIC_MODE_BRIDGED:
env["INSTANCE_NIC%d_BRIDGE" % idx] = link
else:
return env
+def _NICToTuple(lu, nic):
+ """Build a tupple of nic information.
+
+ @type lu: L{LogicalUnit}
+ @param lu: the logical unit on whose behalf we execute
+ @type nic: L{objects.NIC}
+ @param nic: nic to convert to hooks tuple
+
+ """
+ ip = nic.ip
+ mac = nic.mac
+ cluster = lu.cfg.GetClusterInfo()
+ filled_params = cluster.SimpleFillNIC(nic.nicparams)
+ mode = filled_params[constants.NIC_MODE]
+ link = filled_params[constants.NIC_LINK]
+ net = nic.network
+ netinfo = None
+ if net:
+ net_uuid = lu.cfg.LookupNetwork(net)
+ if net_uuid:
+ nobj = lu.cfg.GetNetwork(net_uuid)
+ netinfo = objects.Network.ToDict(nobj)
+ return (ip, mac, mode, link, net, netinfo)
+
+
def _NICListToTuple(lu, nics):
"""Build a list of nic information tuples.
"""
hooks_nics = []
- cluster = lu.cfg.GetClusterInfo()
for nic in nics:
- ip = nic.ip
- mac = nic.mac
- filled_params = cluster.SimpleFillNIC(nic.nicparams)
- mode = filled_params[constants.NIC_MODE]
- link = filled_params[constants.NIC_LINK]
- hooks_nics.append((ip, mac, mode, link))
+ hooks_nics.append(_NICToTuple(lu, nic))
return hooks_nics
return mc_now < mc_should
-def _CalculateGroupIPolicy(cluster, group):
- """Calculate instance policy for group.
-
- """
- return cluster.SimpleFillIPolicy(group.ipolicy)
-
-
def _ComputeViolatingInstances(ipolicy, instances):
"""Computes a set of instances who violates given ipolicy.
cluster-wide iallocator if appropriate.
Check that at most one of (iallocator, node) is specified. If none is
- specified, then the LU's opcode's iallocator slot is filled with the
- cluster-wide default iallocator.
+ specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
+ then the LU's opcode's iallocator slot is filled with the cluster-wide
+ default iallocator.
@type iallocator_slot: string
@param iallocator_slot: the name of the opcode iallocator slot
"""
node = getattr(lu.op, node_slot, None)
- iallocator = getattr(lu.op, iallocator_slot, None)
+ ialloc = getattr(lu.op, iallocator_slot, None)
+ if node == []:
+ node = None
- if node is not None and iallocator is not None:
+ if node is not None and ialloc is not None:
raise errors.OpPrereqError("Do not specify both, iallocator and node",
errors.ECODE_INVAL)
- elif node is None and iallocator is None:
+ elif ((node is None and ialloc is None) or
+ ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
default_iallocator = lu.cfg.GetDefaultIAllocator()
if default_iallocator:
setattr(lu.op, iallocator_slot, default_iallocator)
" cluster-wide default iallocator found;"
" please specify either an iallocator or a"
" node, or set a cluster-wide default"
- " iallocator")
+ " iallocator", errors.ECODE_INVAL)
-def _GetDefaultIAllocator(cfg, iallocator):
+def _GetDefaultIAllocator(cfg, ialloc):
"""Decides on which iallocator to use.
@type cfg: L{config.ConfigWriter}
@param cfg: Cluster configuration object
- @type iallocator: string or None
- @param iallocator: Iallocator specified in opcode
+ @type ialloc: string or None
+ @param ialloc: Iallocator specified in opcode
@rtype: string
@return: Iallocator name
"""
- if not iallocator:
+ if not ialloc:
# Use default iallocator
- iallocator = cfg.GetDefaultIAllocator()
+ ialloc = cfg.GetDefaultIAllocator()
- if not iallocator:
+ if not ialloc:
raise errors.OpPrereqError("No iallocator was specified, neither in the"
" opcode nor as a cluster-wide default",
errors.ECODE_INVAL)
- return iallocator
+ return ialloc
def _CheckHostnameSane(lu, name):
# Verify global configuration
jobs.append([
- opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors)
+ opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
])
# Always depend on global verification
depends_fn = lambda: [(-len(jobs), [])]
- jobs.extend([opcodes.OpClusterVerifyGroup(group_name=group,
- ignore_errors=self.op.ignore_errors,
- depends=depends_fn())]
- for group in groups)
+ jobs.extend(
+ [opcodes.OpClusterVerifyGroup(group_name=group,
+ ignore_errors=self.op.ignore_errors,
+ depends=depends_fn())]
+ for group in groups)
# Fix up all parameters
for op in itertools.chain(*jobs): # pylint: disable=W0142
feedback_fn("* Verifying cluster certificate files")
- for cert_filename in constants.ALL_CERT_FILES:
+ for cert_filename in pathutils.ALL_CERT_FILES:
(errcode, msg) = _VerifyCertificate(cert_filename)
self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
locking.LEVEL_INSTANCE: inst_names,
locking.LEVEL_NODEGROUP: [self.group_uuid],
locking.LEVEL_NODE: [],
+
+ # This opcode is run by the watcher every five minutes and acquires all nodes
+ # for a group. It doesn't run for a long time, so it's better to acquire
+ # the node allocation lock as well.
+ locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
}
self.share_locks = _ShareAll()
node_vol_should = {}
instanceconfig.MapLVsByNode(node_vol_should)
- ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(), self.group_info)
+ cluster = self.cfg.GetClusterInfo()
+ ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
+ self.group_info)
err = _ComputeIPolicyInstanceViolation(ipolicy, instanceconfig)
- _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err))
+ _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err),
+ code=self.ETYPE_WARNING)
for node in node_vol_should:
n_img = node_image[node]
if nresult.fail_msg or not nresult.payload:
node_files = None
else:
- node_files = nresult.payload.get(constants.NV_FILELIST, None)
+ fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
+ node_files = dict((vcluster.LocalizeVirtualPath(key), value)
+ for (key, value) in fingerprints.items())
+ del fingerprints
test = not (node_files and isinstance(node_files, dict))
errorif(test, constants.CV_ENODEFILECHECK, node.name,
if drbd_helper:
helper_result = nresult.get(constants.NV_DRBDHELPER, None)
- test = (helper_result == None)
+ test = (helper_result is None)
_ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
"no drbd usermode helper returned")
if helper_result:
"OSes present on reference node %s but missing on this node: %s",
base.name, utils.CommaJoin(missing))
+ def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
+ """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
+
+ @type ninfo: L{objects.Node}
+ @param ninfo: the node to check
+ @param nresult: the remote results for the node
+ @type is_master: bool
+ @param is_master: Whether node is the master node
+
+ """
+ node = ninfo.name
+
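+ # NV_FILE_STORAGE_PATHS is only requested from the master node (see where
+ # node_verify_param is populated), so non-master nodes must not return it.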
+ if (is_master and
+ (constants.ENABLE_FILE_STORAGE or
+ constants.ENABLE_SHARED_FILE_STORAGE)):
+ try:
+ fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
+ except KeyError:
+ # This should never happen
+ self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
+ "Node did not return forbidden file storage paths")
+ else:
+ self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
+ "Found forbidden file storage paths: %s",
+ utils.CommaJoin(fspaths))
+ else:
+ self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
+ constants.CV_ENODEFILESTORAGEPATHS, node,
+ "Node should not have returned forbidden file storage"
+ " paths")
+
def _VerifyOob(self, ninfo, nresult):
"""Verifies out of band functionality of a node.
"""
env = {
- "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
+ "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
}
env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
user_scripts = []
if self.cfg.GetUseExternalMipScript():
- user_scripts.append(constants.EXTERNAL_MASTER_SETUP_SCRIPT)
+ user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
node_verify_param = {
constants.NV_FILELIST:
- utils.UniqueSequence(filename
- for files in filemap
- for filename in files),
+ map(vcluster.MakeVirtualPath,
+ utils.UniqueSequence(filename
+ for files in filemap
+ for filename in files)),
constants.NV_NODELIST:
self._SelectSshCheckNodes(node_data_list, self.group_uuid,
self.all_node_info.values()),
node_verify_param[constants.NV_DRBDLIST] = None
node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
+ if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
+ # Load file storage paths only from master node
+ node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node
+
# bridge checks
# FIXME: this needs to be changed per node-group, not cluster-wide
bridges = set()
self._VerifyNodeNetwork(node_i, nresult)
self._VerifyNodeUserScripts(node_i, nresult)
self._VerifyOob(node_i, nresult)
+ self._VerifyFileStoragePaths(node_i, nresult,
+ node == master_node)
if nimg.vm_capable:
self._VerifyNodeLVM(node_i, nresult, vg_name)
locking.LEVEL_INSTANCE: [],
locking.LEVEL_NODEGROUP: [],
locking.LEVEL_NODE: [],
+
+ # This opcode acquires all node locks in a group. LUClusterVerifyDisks
+ # starts one instance of this opcode for every group, which means all
+ # nodes will be locked for a short amount of time, so it's better to
+ # acquire the node allocation lock as well.
+ locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
}
def DeclareLocks(self, level):
res_instances = set()
res_missing = {}
- nv_dict = _MapInstanceDisksToNodes([inst
- for inst in self.instances.values()
- if inst.admin_state == constants.ADMINST_UP])
+ nv_dict = _MapInstanceDisksToNodes(
+ [inst for inst in self.instances.values()
+ if inst.admin_state == constants.ADMINST_UP])
if nv_dict:
nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
def ExpandNames(self):
if self.op.instances:
self.wanted_names = _GetWantedInstances(self, self.op.instances)
+ # Not getting the node allocation lock, as only a specific set of
+ # instances (and their nodes) is going to be locked
self.needed_locks = {
locking.LEVEL_NODE_RES: [],
locking.LEVEL_INSTANCE: self.wanted_names,
self.needed_locks = {
locking.LEVEL_NODE_RES: locking.ALL_SET,
locking.LEVEL_INSTANCE: locking.ALL_SET,
+
+ # This opcode acquires the node locks for all instances
+ locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
}
+
self.share_locks = {
locking.LEVEL_NODE_RES: 1,
locking.LEVEL_INSTANCE: 0,
+ locking.LEVEL_NODE_ALLOC: 1,
}
def DeclareLocks(self, level):
self.cfg.Update(cluster, feedback_fn)
# update the known hosts file
- ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
+ ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
node_list = self.cfg.GetOnlineNodeList()
try:
node_list.remove(master_params.name)
except ValueError:
pass
- _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
+ _UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
finally:
master_params.ip = new_ip
result = self.rpc.call_node_activate_master_ip(master_params.name,
ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
except errors.ProgrammerError:
raise errors.OpPrereqError("Invalid primary ip family: %s." %
- ip_family)
+ ip_family, errors.ECODE_INVAL)
if not ipcls.ValidateNetmask(netmask):
raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
- (netmask))
+ (netmask), errors.ECODE_INVAL)
class LUClusterSetParams(LogicalUnit):
def ExpandNames(self):
# FIXME: in the future maybe other cluster params won't require checking on
# all nodes to be modified.
+ # FIXME: This opcode changes cluster-wide settings. Is acquiring all
+ # resource locks the right thing, or shouldn't it be the BGL instead?
self.needed_locks = {
locking.LEVEL_NODE: locking.ALL_SET,
locking.LEVEL_INSTANCE: locking.ALL_SET,
locking.LEVEL_NODEGROUP: locking.ALL_SET,
+ locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
}
- self.share_locks = {
- locking.LEVEL_NODE: 1,
- locking.LEVEL_INSTANCE: 1,
- locking.LEVEL_NODEGROUP: 1,
- }
+ self.share_locks = _ShareAll()
def BuildHooksEnv(self):
"""Build hooks env.
if compat.any(node in group.members
for node in inst.all_nodes)])
new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
- new = _ComputeNewInstanceViolations(_CalculateGroupIPolicy(cluster,
- group),
+ ipol = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group)
+ new = _ComputeNewInstanceViolations(ipol,
new_ipolicy, instances)
if new:
violations.update(new)
" address" % (instance.name, nic_idx))
if nic_errors:
raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
- "\n".join(nic_errors))
+ "\n".join(nic_errors), errors.ECODE_INVAL)
# hypervisor list/parameters
self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
if msg:
msg = ("Copy of file %s to node %s failed: %s" %
(fname, to_node, msg))
- lu.proc.LogWarning(msg)
+ lu.LogWarning(msg)
def _ComputeAncillaryFiles(cluster, redist):
"""
# Compute files for all nodes
files_all = set([
- constants.SSH_KNOWN_HOSTS_FILE,
- constants.CONFD_HMAC_KEY,
- constants.CLUSTER_DOMAIN_SECRET_FILE,
- constants.SPICE_CERT_FILE,
- constants.SPICE_CACERT_FILE,
- constants.RAPI_USERS_FILE,
+ pathutils.SSH_KNOWN_HOSTS_FILE,
+ pathutils.CONFD_HMAC_KEY,
+ pathutils.CLUSTER_DOMAIN_SECRET_FILE,
+ pathutils.SPICE_CERT_FILE,
+ pathutils.SPICE_CACERT_FILE,
+ pathutils.RAPI_USERS_FILE,
])
- if not redist:
- files_all.update(constants.ALL_CERT_FILES)
- files_all.update(ssconf.SimpleStore().GetFileList())
- else:
+ if redist:
# we need to ship at least the RAPI certificate
- files_all.add(constants.RAPI_CERT_FILE)
+ files_all.add(pathutils.RAPI_CERT_FILE)
+ else:
+ files_all.update(pathutils.ALL_CERT_FILES)
+ files_all.update(ssconf.SimpleStore().GetFileList())
if cluster.modify_etc_hosts:
- files_all.add(constants.ETC_HOSTS)
+ files_all.add(pathutils.ETC_HOSTS)
if cluster.use_external_mip_script:
- files_all.add(constants.EXTERNAL_MASTER_SETUP_SCRIPT)
+ files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
# Files which are optional, these must:
# - be present in one other category as well
# - either exist or not exist on all nodes of that category (mc, vm all)
files_opt = set([
- constants.RAPI_USERS_FILE,
+ pathutils.RAPI_USERS_FILE,
])
# Files which should only be on master candidates
files_mc = set()
if not redist:
- files_mc.add(constants.CLUSTER_CONF_FILE)
+ files_mc.add(pathutils.CLUSTER_CONF_FILE)
+
+ # File storage
+ if (not redist and
+ (constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE)):
+ files_all.add(pathutils.FILE_STORAGE_PATHS_FILE)
+ files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE)
# Files which should only be on VM-capable nodes
- files_vm = set(filename
+ files_vm = set(
+ filename
for hv_name in cluster.enabled_hypervisors
for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[0])
- files_opt |= set(filename
+ files_opt |= set(
+ filename
for hv_name in cluster.enabled_hypervisors
for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[1])
assert all_files_set.issuperset(files_opt), \
"Optional file not in a different required list"
+ # This one file should never ever be re-distributed via RPC
+ assert not (redist and
+ pathutils.FILE_STORAGE_PATHS_FILE in all_files_set)
+
return (files_all, files_opt, files_mc, files_vm)
_ComputeAncillaryFiles(cluster, True)
# Never re-distribute configuration file from here
- assert not (constants.CLUSTER_CONF_FILE in files_all or
- constants.CLUSTER_CONF_FILE in files_vm)
+ assert not (pathutils.CLUSTER_CONF_FILE in files_all or
+ pathutils.CLUSTER_CONF_FILE in files_vm)
assert not files_mc, "Master candidates not handled in this function"
filemap = [
def ExpandNames(self):
self.needed_locks = {
locking.LEVEL_NODE: locking.ALL_SET,
+ locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
}
- self.share_locks[locking.LEVEL_NODE] = 1
+ self.share_locks = _ShareAll()
def Exec(self, feedback_fn):
"""Redistribute the configuration.
disks = _ExpandCheckDisks(instance, disks)
if not oneshot:
- lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
+ lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
node = instance.primary_node
max_time = mstat.estimated_time
else:
rem_time = "no time estimate"
- lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
- (disks[i].iv_name, mstat.sync_percent, rem_time))
+ lu.LogInfo("- device %s: %5.2f%% done, %s",
+ disks[i].iv_name, mstat.sync_percent, rem_time)
# if we're done but degraded, let's do a few small retries, to
# make sure we see a stable and not transient situation; therefore
time.sleep(min(60, max_time))
if done:
- lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
+ lu.LogInfo("Instance %s's disks are in sync", instance.name)
+
return not cumul_degraded
locking.LEVEL_NODE: lock_names,
}
+ if not self.op.node_names:
+ # Acquire node allocation lock only if all nodes are affected
+ self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
+ self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
+
def CheckPrereq(self):
"""Check prerequisites.
type(result.payload))
if self.op.command in [
- constants.OOB_POWER_ON,
- constants.OOB_POWER_OFF,
- constants.OOB_POWER_CYCLE,
- ]:
+ constants.OOB_POWER_ON,
+ constants.OOB_POWER_OFF,
+ constants.OOB_POWER_CYCLE,
+ ]:
if result.payload is not None:
errs.append("%s is expected to not return payload but got '%s'" %
(self.op.command, result.payload))
if self.do_locking:
# If any non-static field is requested we need to lock the nodes
lu.needed_locks[locking.LEVEL_NODE] = self.wanted
+ lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
def DeclareLocks(self, lu, level):
pass
node_data = lu.rpc.call_node_info(toquery_nodes, [lu.cfg.GetVGName()],
[lu.cfg.GetHypervisorType()])
- live_data = dict((name, _MakeLegacyNodeInfo(nresult.payload))
+ live_data = dict((name, rpc.MakeLegacyNodeInfo(nresult.payload))
for (name, nresult) in node_data.items()
if not nresult.fail_msg and nresult.payload)
else:
def ExpandNames(self):
self.share_locks = _ShareAll()
- self.needed_locks = {}
- if not self.op.nodes:
- self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+ if self.op.nodes:
+ self.needed_locks = {
+ locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes),
+ }
else:
- self.needed_locks[locking.LEVEL_NODE] = \
- _GetWantedNodes(self, self.op.nodes)
+ self.needed_locks = {
+ locking.LEVEL_NODE: locking.ALL_SET,
+ locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
+ }
def Exec(self, feedback_fn):
"""Computes the list of nodes and their attributes.
def ExpandNames(self):
self.share_locks = _ShareAll()
- self.needed_locks = {}
if self.op.nodes:
- self.needed_locks[locking.LEVEL_NODE] = \
- _GetWantedNodes(self, self.op.nodes)
+ self.needed_locks = {
+ locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes),
+ }
else:
- self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+ self.needed_locks = {
+ locking.LEVEL_NODE: locking.ALL_SET,
+ locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
+ }
def Exec(self, feedback_fn):
"""Computes the list of nodes and their attributes.
live_data = {}
if query.IQ_DISKUSAGE in self.requested_data:
+ gmi = ganeti.masterd.instance
disk_usage = dict((inst.name,
- _ComputeDiskSize(inst.disk_template,
- [{constants.IDISK_SIZE: disk.size}
- for disk in inst.disks]))
+ gmi.ComputeDiskSize(inst.disk_template,
+ [{constants.IDISK_SIZE: disk.size}
+ for disk in inst.disks]))
for inst in instance_list)
else:
disk_usage = None
if not newbie_singlehomed:
# check reachability from my secondary ip to newbie's secondary ip
if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
- source=myself.secondary_ip):
+ source=myself.secondary_ip):
raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
" based ping to node daemon port",
errors.ECODE_ENVIRON)
errors.ECODE_INVAL)
# Boolean value that tells us whether we might be demoting from MC
- self.might_demote = (self.op.master_candidate == False or
- self.op.offline == True or
- self.op.drained == True or
- self.op.master_capable == False)
+ self.might_demote = (self.op.master_candidate is False or
+ self.op.offline is True or
+ self.op.drained is True or
+ self.op.master_capable is False)
if self.op.secondary_ip:
if not netutils.IP4Address.IsValid(self.op.secondary_ip):
def ExpandNames(self):
if self.lock_all:
- self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
+ self.needed_locks = {
+ locking.LEVEL_NODE: locking.ALL_SET,
+
+ # Block allocations when all nodes are locked
+ locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
+ }
else:
- self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
+ self.needed_locks = {
+ locking.LEVEL_NODE: self.op.node_name,
+ }
# Since modifying a node can have severe effects on currently running
# operations the resource lock is at least acquired in shared mode
self.needed_locks[locking.LEVEL_NODE_RES] = \
self.needed_locks[locking.LEVEL_NODE]
- # Get node resource and instance locks in shared mode; they are not used
- # for anything but read-only access
- self.share_locks[locking.LEVEL_NODE_RES] = 1
- self.share_locks[locking.LEVEL_INSTANCE] = 1
+ # Get all locks except nodes in shared mode; they are not used for anything
+ # but read-only access
+ self.share_locks = _ShareAll()
+ self.share_locks[locking.LEVEL_NODE] = 0
+ self.share_locks[locking.LEVEL_NODE_RES] = 0
+ self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
if self.lock_instances:
self.needed_locks[locking.LEVEL_INSTANCE] = \
" it a master candidate" % node.name,
errors.ECODE_STATE)
- if self.op.vm_capable == False:
+ if self.op.vm_capable is False:
(ipri, isec) = self.cfg.GetNodeInstances(self.op.node_name)
if ipri or isec:
raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
# Check for ineffective changes
for attr in self._FLAGS:
- if (getattr(self.op, attr) == False and getattr(node, attr) == False):
+ if (getattr(self.op, attr) is False and getattr(node, attr) is False):
self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
setattr(self.op, attr, None)
# TODO: We might query the real power state if it supports OOB
if _SupportsOob(self.cfg, node):
if self.op.offline is False and not (node.powered or
- self.op.powered == True):
+ self.op.powered is True):
raise errors.OpPrereqError(("Node %s needs to be turned on before its"
" offline status can be reset") %
- self.op.node_name)
+ self.op.node_name, errors.ECODE_STATE)
elif self.op.powered is not None:
raise errors.OpPrereqError(("Unable to change powered state for node %s"
" as it does not support out-of-band"
- " handling") % self.op.node_name)
+ " handling") % self.op.node_name,
+ errors.ECODE_STATE)
# If we're being deofflined/drained, we'll MC ourself if needed
- if (self.op.drained == False or self.op.offline == False or
+ if (self.op.drained is False or self.op.offline is False or
(self.op.master_capable and not node.master_capable)):
if _DecideSelfPromotion(self):
self.op.master_candidate = True
self.LogInfo("Auto-promoting node to master candidate")
# If we're no longer master capable, we'll demote ourselves from MC
- if self.op.master_capable == False and node.master_candidate:
+ if self.op.master_capable is False and node.master_candidate:
self.LogInfo("Demoting from master candidate")
self.op.master_candidate = False
" without using re-add. Please make sure the node"
" is healthy!")
+ # When changing the secondary ip, verify if this is a single-homed to
+ # multi-homed transition or vice versa, and apply the relevant
+ # restrictions.
if self.op.secondary_ip:
# Ok even without locking, because this can't be changed by any LU
master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
master_singlehomed = master.secondary_ip == master.primary_ip
- if master_singlehomed and self.op.secondary_ip:
- raise errors.OpPrereqError("Cannot change the secondary ip on a single"
- " homed cluster", errors.ECODE_INVAL)
+ if master_singlehomed and self.op.secondary_ip != node.primary_ip:
+ if self.op.force and node.name == master.name:
+ self.LogWarning("Transitioning from single-homed to multi-homed"
+ " cluster; all nodes will require a secondary IP"
+ " address")
+ else:
+ raise errors.OpPrereqError("Changing the secondary ip on a"
+ " single-homed cluster requires the"
+ " --force option to be passed, and the"
+ " target node to be the master",
+ errors.ECODE_INVAL)
+ elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
+ if self.op.force and node.name == master.name:
+ self.LogWarning("Transitioning from multi-homed to single-homed"
+ " cluster; secondary IP addresses will have to be"
+ " removed")
+ else:
+ raise errors.OpPrereqError("Cannot set the secondary IP to be the"
+ " same as the primary IP on a multi-homed"
+ " cluster, unless the --force option is"
+ " passed, and the target node is the"
+ " master", errors.ECODE_INVAL)
assert not (frozenset(affected_instances) -
self.owned_locks(locking.LEVEL_INSTANCE))
if node.offline:
if affected_instances:
- raise errors.OpPrereqError("Cannot change secondary IP address:"
- " offline node has instances (%s)"
- " configured to use it" %
- utils.CommaJoin(affected_instances.keys()))
+ msg = ("Cannot change secondary IP address: offline node has"
+ " instances (%s) configured to use it" %
+ utils.CommaJoin(affected_instances.keys()))
+ raise errors.OpPrereqError(msg, errors.ECODE_STATE)
else:
# On online nodes, check that no instances are running, and that
# the node has the new ip and we can reach it.
cluster = NotImplemented
if query.CQ_QUEUE_DRAINED in self.requested_data:
- drain_flag = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
+ drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
else:
drain_flag = NotImplemented
if query.CQ_WATCHER_PAUSE in self.requested_data:
- watcher_pause = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
+ watcher_pause = utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)
else:
watcher_pause = NotImplemented
if not disks_ok:
raise errors.OpExecError("Cannot activate block devices")
+ if self.op.wait_for_sync:
+ if not _WaitForSync(self, self.instance):
+ raise errors.OpExecError("Some disks of the instance are degraded!")
+
return disks_info
if msg:
is_offline_secondary = (node in instance.secondary_nodes and
result.offline)
- lu.proc.LogWarning("Could not prepare block device %s on node %s"
- " (is_primary=False, pass=1): %s",
- inst_disk.iv_name, node, msg)
+ lu.LogWarning("Could not prepare block device %s on node %s"
+ " (is_primary=False, pass=1): %s",
+ inst_disk.iv_name, node, msg)
if not (ignore_secondaries or is_offline_secondary):
disks_ok = False
True, idx)
msg = result.fail_msg
if msg:
- lu.proc.LogWarning("Could not prepare block device %s on node %s"
- " (is_primary=True, pass=2): %s",
- inst_disk.iv_name, node, msg)
+ lu.LogWarning("Could not prepare block device %s on node %s"
+ " (is_primary=True, pass=2): %s",
+ inst_disk.iv_name, node, msg)
disks_ok = False
else:
dev_path = result.payload
if not disks_ok:
_ShutdownInstanceDisks(lu, instance)
if force is not None and not force:
- lu.proc.LogWarning("", hint="If the message above refers to a"
- " secondary node,"
- " you can retry the operation using '--force'.")
+ lu.LogWarning("",
+ hint=("If the message above refers to a secondary node,"
+ " you can retry the operation using '--force'"))
raise errors.OpExecError("Disk consistency error")
self.primary_offline = self.cfg.GetNodeInfo(instance.primary_node).offline
if self.primary_offline and self.op.ignore_offline_nodes:
- self.proc.LogWarning("Ignoring offline primary node")
+ self.LogWarning("Ignoring offline primary node")
if self.op.hvparams or self.op.beparams:
- self.proc.LogWarning("Overridden parameters are ignored")
+ self.LogWarning("Overridden parameters are ignored")
else:
_CheckNodeOnline(self, instance.primary_node)
if self.primary_offline:
assert self.op.ignore_offline_nodes
- self.proc.LogInfo("Primary node offline, marked instance as started")
+ self.LogInfo("Primary node offline, marked instance as started")
else:
node_current = instance.primary_node
self.cfg.GetNodeInfo(self.instance.primary_node).offline
if self.primary_offline and self.op.ignore_offline_nodes:
- self.proc.LogWarning("Ignoring offline primary node")
+ self.LogWarning("Ignoring offline primary node")
else:
_CheckNodeOnline(self, self.instance.primary_node)
if self.primary_offline:
assert self.op.ignore_offline_nodes
- self.proc.LogInfo("Primary node offline, marked instance as stopped")
+ self.LogInfo("Primary node offline, marked instance as stopped")
else:
result = self.rpc.call_instance_shutdown(node_current, instance, timeout)
msg = result.fail_msg
if msg:
- self.proc.LogWarning("Could not shutdown instance: %s" % msg)
+ self.LogWarning("Could not shutdown instance: %s", msg)
_ShutdownInstanceDisks(self, instance)
constants.IDISK_METAVG,
]))
+ def _RunAllocator(self):
+ """Run the allocator based on input opcode.
+
+ """
+ be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
+
+ # FIXME
+ # The allocator should actually run in "relocate" mode, but current
+ # allocators don't support relocating all the nodes of an instance at
+ # the same time. As a workaround we use "allocate" mode, but this is
+ # suboptimal for two reasons:
+ # - The instance name passed to the allocator is present in the list of
+ # existing instances, so there could be a conflict within the
+ # internal structures of the allocator. This doesn't happen with the
+ # current allocators, but it's a liability.
+ # - The allocator counts the resources used by the instance twice: once
+ # because the instance exists already, and once because it tries to
+ # allocate a new instance.
+ # The allocator could choose some of the nodes on which the instance is
+ # running, but that's not a problem. If the instance nodes are broken,
+ # they should already be marked as drained or offline, and hence
+ # skipped by the allocator. If instance disks have been lost for other
+ # reasons, then recreating the disks on the same nodes should be fine.
+ disk_template = self.instance.disk_template
+ spindle_use = be_full[constants.BE_SPINDLE_USE]
+ req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
+ disk_template=disk_template,
+ tags=list(self.instance.GetTags()),
+ os=self.instance.os,
+ nics=[{}],
+ vcpus=be_full[constants.BE_VCPUS],
+ memory=be_full[constants.BE_MAXMEM],
+ spindle_use=spindle_use,
+ disks=[{constants.IDISK_SIZE: d.size,
+ constants.IDISK_MODE: d.mode}
+ for d in self.instance.disks],
+ hypervisor=self.instance.hypervisor)
+ ial = iallocator.IAllocator(self.cfg, self.rpc, req)
+
+ ial.Run(self.op.iallocator)
+
+ assert req.RequiredNodes() == len(self.instance.all_nodes)
+
+ if not ial.success:
+ raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
+ " %s" % (self.op.iallocator, ial.info),
+ errors.ECODE_NORES)
+
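+ # Adopt the allocator's choice as if the nodes had been specified directly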
+ self.op.nodes = ial.result
+ self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
+ self.op.instance_name, self.op.iallocator,
+ utils.CommaJoin(ial.result))
+
def CheckArguments(self):
- if self.op.disks and ht.TPositiveInt(self.op.disks[0]):
+ if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
# Normalize and convert deprecated list of disk indices
self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
" once: %s" % utils.CommaJoin(duplicates),
errors.ECODE_INVAL)
+ # We don't want _CheckIAllocatorOrNode selecting the default iallocator
+ # when neither iallocator nor nodes are specified
+ if self.op.iallocator or self.op.nodes:
+ _CheckIAllocatorOrNode(self, "iallocator", "nodes")
+
for (idx, params) in self.op.disks:
utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
unsupported = frozenset(params.keys()) - self._MODIFYABLE
def ExpandNames(self):
self._ExpandAndLockInstance()
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+
if self.op.nodes:
self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes]
self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
else:
self.needed_locks[locking.LEVEL_NODE] = []
+ if self.op.iallocator:
+ # iallocator will select a new node in the same group
+ self.needed_locks[locking.LEVEL_NODEGROUP] = []
+ self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
+
self.needed_locks[locking.LEVEL_NODE_RES] = []
def DeclareLocks(self, level):
- if level == locking.LEVEL_NODE:
- # if we replace the nodes, we only need to lock the old primary,
- # otherwise we need to lock all nodes for disk re-creation
- primary_only = bool(self.op.nodes)
- self._LockInstancesNodes(primary_only=primary_only)
+ if level == locking.LEVEL_NODEGROUP:
+ assert self.op.iallocator is not None
+ assert not self.op.nodes
+ assert not self.needed_locks[locking.LEVEL_NODEGROUP]
+ self.share_locks[locking.LEVEL_NODEGROUP] = 1
+ # Lock the primary group used by the instance optimistically; this
+ # requires going via the node before it's locked, so it has to be
+ # verified later on
+ self.needed_locks[locking.LEVEL_NODEGROUP] = \
+ self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
+
+ elif level == locking.LEVEL_NODE:
+ # If an allocator is used, then we lock all the nodes in the current
+ # instance group, as we don't know yet which ones will be selected;
+ # if we replace the nodes without using an allocator, locks are
+ # already declared in ExpandNames; otherwise, we need to lock all the
+ # instance nodes for disk re-creation
+ if self.op.iallocator:
+ assert not self.op.nodes
+ assert not self.needed_locks[locking.LEVEL_NODE]
+ assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
+
+ # Lock member nodes of the group of the primary node
+ for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
+ self.needed_locks[locking.LEVEL_NODE].extend(
+ self.cfg.GetNodeGroup(group_uuid).members)
+
+ assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
+ elif not self.op.nodes:
+ self._LockInstancesNodes(primary_only=False)
elif level == locking.LEVEL_NODE_RES:
# Copy node locks
self.needed_locks[locking.LEVEL_NODE_RES] = \
primary_node = self.op.nodes[0]
else:
primary_node = instance.primary_node
- _CheckNodeOnline(self, primary_node)
+ if not self.op.iallocator:
+ _CheckNodeOnline(self, primary_node)
if instance.disk_template == constants.DT_DISKLESS:
raise errors.OpPrereqError("Instance '%s' has no disks" %
self.op.instance_name, errors.ECODE_INVAL)
+ # Verify if node group locks are still correct
+ owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
+ if owned_groups:
+ # Node group locks are acquired only for the primary node (and only
+ # when the allocator is used)
+ _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
+ primary_only=True)
+
# if we replace nodes *and* the old primary is offline, we don't
- # check
- assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE)
- assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE_RES)
+ # check the instance state
old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
- if not (self.op.nodes and old_pnode.offline):
+ if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
_CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
msg="cannot recreate disks")
raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
errors.ECODE_INVAL)
- if (self.op.nodes and
+ if ((self.op.nodes or self.op.iallocator) and
sorted(self.disks.keys()) != range(len(instance.disks))):
raise errors.OpPrereqError("Can't recreate disks partially and"
" change the nodes at the same time",
self.instance = instance
+ if self.op.iallocator:
+ self._RunAllocator()
+ # Release unneeded node and node resource locks
+ _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
+ _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
+ _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
+
+ assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
+
def Exec(self, feedback_fn):
"""Recreate the disks.
if self.op.nodes:
self.cfg.Update(instance, feedback_fn)
+ # All touched nodes must be locked
+ mylocks = self.owned_locks(locking.LEVEL_NODE)
+ assert mylocks.issuperset(frozenset(instance.all_nodes))
_CreateDisks(self, instance, to_skip=to_skip)
# Change the instance lock. This is definitely safe while we hold the BGL.
# Otherwise the new lock would have to be added in acquired mode.
assert self.REQ_BGL
+ assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
self.glm.remove(locking.LEVEL_INSTANCE, old_name)
self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
new_file_storage_dir))
_StartInstanceDisks(self, inst, None)
+ # update info on disks
+ info = _GetInstanceInfoText(inst)
+ for (idx, disk) in enumerate(inst.disks):
+ for node in inst.all_nodes:
+ self.cfg.SetDiskID(disk, node)
+ result = self.rpc.call_blockdev_setinfo(node, disk, info)
+ if result.fail_msg:
+ self.LogWarning("Error setting info on node %s for disk %s: %s",
+ node, idx, result.fail_msg)
try:
result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
old_name, self.op.debug_level)
msg = ("Could not run OS rename script for instance %s on node %s"
" (but the instance has been renamed in Ganeti): %s" %
(inst.name, inst.primary_node, msg))
- self.proc.LogWarning(msg)
+ self.LogWarning(msg)
finally:
_ShutdownInstanceDisks(self, inst)
return self.iq.OldStyleQuery(self)
+def _ExpandNamesForMigration(lu):
+ """Expands names for use with L{TLMigrateInstance}.
+
+ @type lu: L{LogicalUnit}
+
+ """
+ if lu.op.target_node is not None:
+ lu.op.target_node = _ExpandNodeName(lu.cfg, lu.op.target_node)
+
+ lu.needed_locks[locking.LEVEL_NODE] = []
+ lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+ lu.needed_locks[locking.LEVEL_NODE_RES] = []
+ lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
+
+ # The node allocation lock is actually only needed for replicated instances
+ # (e.g. DRBD8) and if an iallocator is used.
+ lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []
+
+
+def _DeclareLocksForMigration(lu, level):
+ """Declares locks for L{TLMigrateInstance}.
+
+ @type lu: L{LogicalUnit}
+ @param level: Lock level
+
+ """
+ if level == locking.LEVEL_NODE_ALLOC:
+ assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
+
+ instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)
+
+ if instance.disk_template in constants.DTS_EXT_MIRROR:
+ if lu.op.target_node is None:
+ lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+ lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
+ else:
+ lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
+ lu.op.target_node]
+ del lu.recalculate_locks[locking.LEVEL_NODE]
+ else:
+ lu._LockInstancesNodes() # pylint: disable=W0212
+
+ elif level == locking.LEVEL_NODE:
+ # Node locks are declared together with the node allocation lock
+ assert lu.needed_locks[locking.LEVEL_NODE]
+
+ elif level == locking.LEVEL_NODE_RES:
+ # Copy node locks
+ lu.needed_locks[locking.LEVEL_NODE_RES] = \
+ _CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
+
+
class LUInstanceFailover(LogicalUnit):
"""Failover an instance.
def ExpandNames(self):
self._ExpandAndLockInstance()
+ _ExpandNamesForMigration(self)
- if self.op.target_node is not None:
- self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
-
- self.needed_locks[locking.LEVEL_NODE] = []
- self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
-
- self.needed_locks[locking.LEVEL_NODE_RES] = []
- self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
+ self._migrater = \
+ TLMigrateInstance(self, self.op.instance_name, False, True, False,
+ self.op.ignore_consistency, True,
+ self.op.shutdown_timeout, self.op.ignore_ipolicy)
- ignore_consistency = self.op.ignore_consistency
- shutdown_timeout = self.op.shutdown_timeout
- self._migrater = TLMigrateInstance(self, self.op.instance_name,
- cleanup=False,
- failover=True,
- ignore_consistency=ignore_consistency,
- shutdown_timeout=shutdown_timeout,
- ignore_ipolicy=self.op.ignore_ipolicy)
self.tasklets = [self._migrater]
def DeclareLocks(self, level):
- if level == locking.LEVEL_NODE:
- instance = self.context.cfg.GetInstanceInfo(self.op.instance_name)
- if instance.disk_template in constants.DTS_EXT_MIRROR:
- if self.op.target_node is None:
- self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
- else:
- self.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
- self.op.target_node]
- del self.recalculate_locks[locking.LEVEL_NODE]
- else:
- self._LockInstancesNodes()
- elif level == locking.LEVEL_NODE_RES:
- # Copy node locks
- self.needed_locks[locking.LEVEL_NODE_RES] = \
- _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
+ _DeclareLocksForMigration(self, level)
def BuildHooksEnv(self):
"""Build hooks env.
def ExpandNames(self):
self._ExpandAndLockInstance()
-
- if self.op.target_node is not None:
- self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
-
- self.needed_locks[locking.LEVEL_NODE] = []
- self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
-
- self.needed_locks[locking.LEVEL_NODE] = []
- self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+ _ExpandNamesForMigration(self)
self._migrater = \
- TLMigrateInstance(self, self.op.instance_name,
- cleanup=self.op.cleanup,
- failover=False,
- fallback=self.op.allow_failover,
- allow_runtime_changes=self.op.allow_runtime_changes,
- ignore_ipolicy=self.op.ignore_ipolicy)
+ TLMigrateInstance(self, self.op.instance_name, self.op.cleanup,
+ False, self.op.allow_failover, False,
+ self.op.allow_runtime_changes,
+ constants.DEFAULT_SHUTDOWN_TIMEOUT,
+ self.op.ignore_ipolicy)
+
self.tasklets = [self._migrater]
def DeclareLocks(self, level):
- if level == locking.LEVEL_NODE:
- instance = self.context.cfg.GetInstanceInfo(self.op.instance_name)
- if instance.disk_template in constants.DTS_EXT_MIRROR:
- if self.op.target_node is None:
- self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
- else:
- self.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
- self.op.target_node]
- del self.recalculate_locks[locking.LEVEL_NODE]
- else:
- self._LockInstancesNodes()
- elif level == locking.LEVEL_NODE_RES:
- # Copy node locks
- self.needed_locks[locking.LEVEL_NODE_RES] = \
- _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
+ _DeclareLocksForMigration(self, level)
def BuildHooksEnv(self):
"""Build hooks env.
_CheckNodeOnline(self, target_node)
_CheckNodeNotDrained(self, target_node)
_CheckNodeVmCapable(self, target_node)
- ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(),
- self.cfg.GetNodeGroup(node.group))
+ cluster = self.cfg.GetClusterInfo()
+ group_info = self.cfg.GetNodeGroup(node.group)
+ ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info)
_CheckTargetNodeIPolicy(self, ipolicy, instance, node,
ignore=self.op.ignore_ipolicy)
msg = result.fail_msg
if msg:
if self.op.ignore_consistency:
- self.proc.LogWarning("Could not shutdown instance %s on node %s."
- " Proceeding anyway. Please make sure node"
- " %s is down. Error details: %s",
- instance.name, source_node, source_node, msg)
+ self.LogWarning("Could not shutdown instance %s on node %s."
+ " Proceeding anyway. Please make sure node"
+ " %s is down. Error details: %s",
+ instance.name, source_node, source_node, msg)
else:
raise errors.OpExecError("Could not shutdown instance %s on"
" node %s: %s" %
target_node=self.op.target_node,
allow_runtime_changes=allow_runtime_changes,
ignore_ipolicy=self.op.ignore_ipolicy)]
- for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name)
- ]
+ for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name)]
# TODO: Run iallocator in this opcode and pass correct placement options to
# OpInstanceMigrate. Since other jobs can modify the cluster between
_MIGRATION_POLL_INTERVAL = 1 # seconds
_MIGRATION_FEEDBACK_INTERVAL = 10 # seconds
- def __init__(self, lu, instance_name, cleanup=False,
- failover=False, fallback=False,
- ignore_consistency=False,
- allow_runtime_changes=True,
- shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT,
- ignore_ipolicy=False):
+ def __init__(self, lu, instance_name, cleanup, failover, fallback,
+ ignore_consistency, allow_runtime_changes, shutdown_timeout,
+ ignore_ipolicy):
"""Initializes this class.
"""
errors.ECODE_STATE)
if instance.disk_template in constants.DTS_EXT_MIRROR:
+ assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
+
_CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
if self.lu.op.iallocator:
# Check that the target node is correct in terms of instance policy
nodeinfo = self.cfg.GetNodeInfo(self.target_node)
group_info = self.cfg.GetNodeGroup(nodeinfo.group)
- ipolicy = _CalculateGroupIPolicy(cluster, group_info)
+ ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
+ group_info)
_CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo,
ignore=self.ignore_ipolicy)
if self.target_node == instance.primary_node:
raise errors.OpPrereqError("Cannot migrate instance %s"
" to its primary (%s)" %
- (instance.name, instance.primary_node))
+ (instance.name, instance.primary_node),
+ errors.ECODE_STATE)
if len(self.lu.tasklets) == 1:
# It is safe to release locks only when we're the only tasklet
# in the LU
_ReleaseLocks(self.lu, locking.LEVEL_NODE,
keep=[instance.primary_node, self.target_node])
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
else:
+ assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
+
secondary_nodes = instance.secondary_nodes
if not secondary_nodes:
raise errors.ConfigurationError("No secondary node but using"
errors.ECODE_INVAL)
nodeinfo = self.cfg.GetNodeInfo(target_node)
group_info = self.cfg.GetNodeGroup(nodeinfo.group)
- ipolicy = _CalculateGroupIPolicy(cluster, group_info)
+ ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
+ group_info)
_CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo,
ignore=self.ignore_ipolicy)
"""Run the allocator based on input opcode.
"""
+ assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
+
# FIXME: add a self.ignore_ipolicy option
- ial = IAllocator(self.cfg, self.rpc,
- mode=constants.IALLOCATOR_MODE_RELOC,
- name=self.instance_name,
- relocate_from=[self.instance.primary_node],
- )
+ req = iallocator.IAReqRelocate(name=self.instance_name,
+ relocate_from=[self.instance.primary_node])
+ ial = iallocator.IAllocator(self.cfg, self.rpc, req)
ial.Run(self.lu.op.iallocator)
" iallocator '%s': %s" %
(self.lu.op.iallocator, ial.info),
errors.ECODE_NORES)
- if len(ial.result) != ial.required_nodes:
- raise errors.OpPrereqError("iallocator '%s' returned invalid number"
- " of nodes (%s), required %s" %
- (self.lu.op.iallocator, len(ial.result),
- ial.required_nodes), errors.ECODE_FAULT)
self.target_node = ial.result[0]
self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
- self.instance_name, self.lu.op.iallocator,
- utils.CommaJoin(ial.result))
+ self.instance_name, self.lu.op.iallocator,
+ utils.CommaJoin(ial.result))
def _WaitUntilSync(self):
"""Poll with custom rpc for disk sync.
# Don't raise an exception here, as we stil have to try to revert the
# disk status, even if this step failed.
- abort_result = self.rpc.call_instance_finalize_migration_src(source_node,
- instance, False, self.live)
+ abort_result = self.rpc.call_instance_finalize_migration_src(
+ source_node, instance, False, self.live)
abort_msg = abort_result.fail_msg
if abort_msg:
logging.error("Aborting migration failed on source node %s: %s",
self.feedback_fn("Migration failed, aborting")
self._AbortMigration()
self._RevertDiskStatus()
+ if not msg:
+ msg = "hypervisor returned failure"
raise errors.OpExecError("Could not migrate instance %s: %s" %
(instance.name, msg))
dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
logical_id=(vgnames[0], names[0]),
params={})
- dev_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
+ dev_meta = objects.Disk(dev_type=constants.LD_LV,
+ size=constants.DRBD_META_SIZE,
logical_id=(vgnames[1], names[1]),
params={})
drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
}
-def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node,
- secondary_nodes, disk_info, file_storage_dir, file_driver, base_index,
- feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
- _req_shr_file_storage=opcodes.RequireSharedFileStorage):
+def _GenerateDiskTemplate(
+ lu, template_name, instance_name, primary_node, secondary_nodes,
+ disk_info, file_storage_dir, file_driver, base_index,
+ feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
+ _req_shr_file_storage=opcodes.RequireSharedFileStorage):
"""Generate the entire disk layout for a given template type.
"""
for i in range(disk_count)])
if template_name == constants.DT_PLAIN:
+
def logical_id_fn(idx, _, disk):
vg = disk.get(constants.IDISK_VG, vgname)
return (vg, names[idx])
+
elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
logical_id_fn = \
lambda _, disk_index, disk: (file_driver,
return (total_size - written) * avg_time
-def _WipeDisks(lu, instance):
+def _WipeDisks(lu, instance, disks=None):
"""Wipes instance disks.
@type lu: L{LogicalUnit}
"""
node = instance.primary_node
- for device in instance.disks:
+ if disks is None:
+ disks = [(idx, disk, 0)
+ for (idx, disk) in enumerate(instance.disks)]
+
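+ # Each entry in 'disks' is a (disk index, disk object, start offset) tuple;
+ # the default built above wipes every instance disk from offset 0.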
+ for (_, device, _) in disks:
lu.cfg.SetDiskID(device, node)
- logging.info("Pause sync of instance %s disks", instance.name)
+ logging.info("Pausing synchronization of disks of instance '%s'",
+ instance.name)
result = lu.rpc.call_blockdev_pause_resume_sync(node,
- (instance.disks, instance),
+ (map(compat.snd, disks),
+ instance),
True)
- result.Raise("Failed RPC to node %s for pausing the disk syncing" % node)
+ result.Raise("Failed to pause disk synchronization on node '%s'" % node)
for idx, success in enumerate(result.payload):
if not success:
- logging.warn("pause-sync of instance %s for disks %d failed",
- instance.name, idx)
+ logging.warn("Pausing synchronization of disk %s of instance '%s'"
+ " failed", idx, instance.name)
try:
- for idx, device in enumerate(instance.disks):
+ for (idx, device, offset) in disks:
# The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
- # MAX_WIPE_CHUNK at max
- wipe_chunk_size = min(constants.MAX_WIPE_CHUNK, device.size / 100.0 *
- constants.MIN_WIPE_CHUNK_PERCENT)
- # we _must_ make this an int, otherwise rounding errors will
- # occur
- wipe_chunk_size = int(wipe_chunk_size)
-
- lu.LogInfo("* Wiping disk %d", idx)
- logging.info("Wiping disk %d for instance %s, node %s using"
- " chunk size %s", idx, instance.name, node, wipe_chunk_size)
+ # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
+ wipe_chunk_size = \
+ int(min(constants.MAX_WIPE_CHUNK,
+ device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
- offset = 0
size = device.size
last_output = 0
start_time = time.time()
+ if offset == 0:
+ info_text = ""
+ else:
+ info_text = (" (from %s to %s)" %
+ (utils.FormatUnit(offset, "h"),
+ utils.FormatUnit(size, "h")))
+
+ lu.LogInfo("* Wiping disk %s%s", idx, info_text)
+
+ logging.info("Wiping disk %d for instance %s on node %s using"
+ " chunk size %s", idx, instance.name, node, wipe_chunk_size)
+
while offset < size:
wipe_size = min(wipe_chunk_size, size - offset)
+
logging.debug("Wiping disk %d, offset %s, chunk %s",
idx, offset, wipe_size)
+
result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
wipe_size)
result.Raise("Could not wipe disk %d at offset %d for size %d" %
(idx, offset, wipe_size))
+
now = time.time()
offset += wipe_size
if now - last_output >= 60:
eta = _CalcEta(now - start_time, offset, size)
- lu.LogInfo(" - done: %.1f%% ETA: %s" %
- (offset / float(size) * 100, utils.FormatSeconds(eta)))
+ lu.LogInfo(" - done: %.1f%% ETA: %s",
+ offset / float(size) * 100, utils.FormatSeconds(eta))
last_output = now
finally:
- logging.info("Resume sync of instance %s disks", instance.name)
+ logging.info("Resuming synchronization of disks for instance '%s'",
+ instance.name)
result = lu.rpc.call_blockdev_pause_resume_sync(node,
- (instance.disks, instance),
+ (map(compat.snd, disks),
+ instance),
False)
if result.fail_msg:
- lu.LogWarning("RPC call to %s for resuming disk syncing failed,"
- " please have a look at the status and troubleshoot"
- " the issue: %s", node, result.fail_msg)
+ lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
+ node, result.fail_msg)
else:
for idx, success in enumerate(result.payload):
if not success:
- lu.LogWarning("Resume sync of disk %d failed, please have a"
- " look at the status and troubleshoot the issue", idx)
- logging.warn("resume-sync of instance %s for disks %d failed",
- instance.name, idx)
+ lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
+ " failed", idx, instance.name)
def _CreateDisks(lu, instance, to_skip=None, target_node=None):
constants.DT_DISKLESS: {},
constants.DT_PLAIN: _compute(disks, 0),
# 128 MB are added for drbd metadata for each disk
- constants.DT_DRBD8: _compute(disks, DRBD_META_SIZE),
+ constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
constants.DT_FILE: {},
constants.DT_SHARED_FILE: {},
}
return req_size_dict[disk_template]
-def _ComputeDiskSize(disk_template, disks):
- """Compute disk size requirements according to disk template
-
- """
- # Required free disk space as a function of disk and swap space
- req_size_dict = {
- constants.DT_DISKLESS: None,
- constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks),
- # 128 MB are added for drbd metadata for each disk
- constants.DT_DRBD8:
- sum(d[constants.IDISK_SIZE] + DRBD_META_SIZE for d in disks),
- constants.DT_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
- constants.DT_SHARED_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
- constants.DT_BLOCK: 0,
- constants.DT_RBD: sum(d[constants.IDISK_SIZE] for d in disks),
- }
-
- if disk_template not in req_size_dict:
- raise errors.ProgrammerError("Disk template '%s' size requirement"
- " is unknown" % disk_template)
-
- return req_size_dict[disk_template]
-
-
def _FilterVmNodes(lu, nodenames):
"""Filters out non-vm_capable nodes from a list.
osname, node)
-class LUInstanceCreate(LogicalUnit):
- """Create an instance.
+def _CreateInstanceAllocRequest(op, disks, nics, beparams, node_whitelist):
+ """Wrapper around IAReqInstanceAlloc.
- """
- HPATH = "instance-add"
- HTYPE = constants.HTYPE_INSTANCE
- REQ_BGL = False
+ @param op: The instance opcode
+ @param disks: The computed disks
+ @param nics: The computed nics
+  @param beparams: The fully filled beparams
+ @param node_whitelist: List of nodes which should appear as online to the
+ allocator (unless the node is already marked offline)
- def CheckArguments(self):
- """Check arguments.
+ @returns: A filled L{iallocator.IAReqInstanceAlloc}
- """
- # do not require name_check to ease forward/backward compatibility
- # for tools
- if self.op.no_install and self.op.start:
- self.LogInfo("No-installation mode selected, disabling startup")
- self.op.start = False
- # validate/normalize the instance name
- self.op.instance_name = \
- netutils.Hostname.GetNormalizedName(self.op.instance_name)
+ """
+ spindle_use = beparams[constants.BE_SPINDLE_USE]
+ return iallocator.IAReqInstanceAlloc(name=op.instance_name,
+ disk_template=op.disk_template,
+ tags=op.tags,
+ os=op.os_type,
+ vcpus=beparams[constants.BE_VCPUS],
+ memory=beparams[constants.BE_MAXMEM],
+ spindle_use=spindle_use,
+ disks=disks,
+ nics=[n.ToDict() for n in nics],
+ hypervisor=op.hypervisor,
+ node_whitelist=node_whitelist)
+
+
+def _ComputeNics(op, cluster, default_ip, cfg, ec_id):
+ """Computes the nics.
+
+ @param op: The instance opcode
+ @param cluster: Cluster configuration object
+ @param default_ip: The default ip to assign
+ @param cfg: An instance of the configuration object
+ @param ec_id: Execution context ID
+
+  @returns: The list of built NIC objects
- if self.op.ip_check and not self.op.name_check:
- # TODO: make the ip check more flexible and not depend on the name check
+ """
+ nics = []
+ for nic in op.nics:
+ nic_mode_req = nic.get(constants.INIC_MODE, None)
+ nic_mode = nic_mode_req
+ if nic_mode is None or nic_mode == constants.VALUE_AUTO:
+ nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
+
+ net = nic.get(constants.INIC_NETWORK, None)
+ link = nic.get(constants.NIC_LINK, None)
+ ip = nic.get(constants.INIC_IP, None)
+
+ if net is None or net.lower() == constants.VALUE_NONE:
+ net = None
+ else:
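+      # When a network is given, link and mode are inherited from the
+      # network's connection to the node group and must not be given here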
+ if nic_mode_req is not None or link is not None:
+ raise errors.OpPrereqError("If network is given, no mode or link"
+ " is allowed to be passed",
+ errors.ECODE_INVAL)
+
+ # ip validity checks
+ if ip is None or ip.lower() == constants.VALUE_NONE:
+ nic_ip = None
+ elif ip.lower() == constants.VALUE_AUTO:
+ if not op.name_check:
+ raise errors.OpPrereqError("IP address set to auto but name checks"
+ " have been skipped",
+ errors.ECODE_INVAL)
+ nic_ip = default_ip
+ else:
+ # We defer pool operations until later, so that the iallocator has
+      # filled in the instance's node(s)
+ if ip.lower() == constants.NIC_IP_POOL:
+ if net is None:
+ raise errors.OpPrereqError("if ip=pool, parameter network"
+ " must be passed too",
+ errors.ECODE_INVAL)
+
+ elif not netutils.IPAddress.IsValid(ip):
+ raise errors.OpPrereqError("Invalid IP address '%s'" % ip,
+ errors.ECODE_INVAL)
+
+ nic_ip = ip
+
+ # TODO: check the ip address for uniqueness
+ if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
+ raise errors.OpPrereqError("Routed nic mode requires an ip address",
+ errors.ECODE_INVAL)
+
+ # MAC address verification
+ mac = nic.get(constants.INIC_MAC, constants.VALUE_AUTO)
+ if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
+ mac = utils.NormalizeAndValidateMac(mac)
+
+ try:
+ # TODO: We need to factor this out
+ cfg.ReserveMAC(mac, ec_id)
+ except errors.ReservationError:
+ raise errors.OpPrereqError("MAC address %s already in use"
+ " in cluster" % mac,
+ errors.ECODE_NOTUNIQUE)
+
+ # Build nic parameters
+ nicparams = {}
+ if nic_mode_req:
+ nicparams[constants.NIC_MODE] = nic_mode
+ if link:
+ nicparams[constants.NIC_LINK] = link
+
+ check_params = cluster.SimpleFillNIC(nicparams)
+ objects.NIC.CheckParameterSyntax(check_params)
+ nics.append(objects.NIC(mac=mac, ip=nic_ip,
+ network=net, nicparams=nicparams))
+
+ return nics
+
+
+def _ComputeDisks(op, default_vg):
+ """Computes the instance disks.
+
+ @param op: The instance opcode
+ @param default_vg: The default_vg to assume
+
+  @return: The computed disks
+
+ """
+ disks = []
+ for disk in op.disks:
+ mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
+ if mode not in constants.DISK_ACCESS_SET:
+ raise errors.OpPrereqError("Invalid disk access mode '%s'" %
+ mode, errors.ECODE_INVAL)
+ size = disk.get(constants.IDISK_SIZE, None)
+ if size is None:
+ raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
+ try:
+ size = int(size)
+ except (TypeError, ValueError):
+ raise errors.OpPrereqError("Invalid disk size '%s'" % size,
+ errors.ECODE_INVAL)
+
+ data_vg = disk.get(constants.IDISK_VG, default_vg)
+ new_disk = {
+ constants.IDISK_SIZE: size,
+ constants.IDISK_MODE: mode,
+ constants.IDISK_VG: data_vg,
+ }
+ if constants.IDISK_METAVG in disk:
+ new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
+ if constants.IDISK_ADOPT in disk:
+ new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
+ disks.append(new_disk)
+
+ return disks
+
+
+def _ComputeFullBeParams(op, cluster):
+ """Computes the full beparams.
+
+ @param op: The instance opcode
+ @param cluster: The cluster config object
+
+ @return: The fully filled beparams
+
+ """
+ default_beparams = cluster.beparams[constants.PP_DEFAULT]
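+  # Replace "auto" values with the cluster-wide defaults before the dict is
+  # upgraded, type-checked and filled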
+ for param, value in op.beparams.iteritems():
+ if value == constants.VALUE_AUTO:
+ op.beparams[param] = default_beparams[param]
+ objects.UpgradeBeParams(op.beparams)
+ utils.ForceDictType(op.beparams, constants.BES_PARAMETER_TYPES)
+ return cluster.SimpleFillBE(op.beparams)
+
+
+class LUInstanceCreate(LogicalUnit):
+ """Create an instance.
+
+ """
+ HPATH = "instance-add"
+ HTYPE = constants.HTYPE_INSTANCE
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ """Check arguments.
+
+ """
+ # do not require name_check to ease forward/backward compatibility
+ # for tools
+ if self.op.no_install and self.op.start:
+ self.LogInfo("No-installation mode selected, disabling startup")
+ self.op.start = False
+ # validate/normalize the instance name
+ self.op.instance_name = \
+ netutils.Hostname.GetNormalizedName(self.op.instance_name)
+
+ if self.op.ip_check and not self.op.name_check:
+ # TODO: make the ip check more flexible and not depend on the name check
raise errors.OpPrereqError("Cannot do IP address check without a name"
" check", errors.ECODE_INVAL)
# specifying a group on instance creation and then selecting nodes from
# that group
self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
- self.needed_locks[locking.LEVEL_NODE_RES] = locking.ALL_SET
+ self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
+
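+      # With opportunistic locking only node locks that can be acquired
+      # without waiting are taken; the iallocator is later restricted to
+      # the nodes actually locked (see _RunAllocator)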
+ if self.op.opportunistic_locking:
+ self.opportunistic_locks[locking.LEVEL_NODE] = True
+ self.opportunistic_locks[locking.LEVEL_NODE_RES] = True
else:
self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode)
nodelist = [self.op.pnode]
self.op.snode = _ExpandNodeName(self.cfg, self.op.snode)
nodelist.append(self.op.snode)
self.needed_locks[locking.LEVEL_NODE] = nodelist
- # Lock resources of instance's primary and secondary nodes (copy to
- # prevent accidential modification)
- self.needed_locks[locking.LEVEL_NODE_RES] = list(nodelist)
# in case of import lock the source node too
if self.op.mode == constants.INSTANCE_IMPORT:
if src_node is None:
self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+ self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
self.op.src_node = None
if os.path.isabs(src_path):
raise errors.OpPrereqError("Importing an instance from a path"
self.needed_locks[locking.LEVEL_NODE].append(src_node)
if not os.path.isabs(src_path):
self.op.src_path = src_path = \
- utils.PathJoin(constants.EXPORT_DIR, src_path)
+ utils.PathJoin(pathutils.EXPORT_DIR, src_path)
+
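+    # Node resource locks must mirror the node locks; copy the list to
+    # prevent accidental modification of one through the other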
+ self.needed_locks[locking.LEVEL_NODE_RES] = \
+ _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
def _RunAllocator(self):
"""Run the allocator based on input opcode.
"""
- nics = [n.ToDict() for n in self.nics]
- ial = IAllocator(self.cfg, self.rpc,
- mode=constants.IALLOCATOR_MODE_ALLOC,
- name=self.op.instance_name,
- disk_template=self.op.disk_template,
- tags=self.op.tags,
- os=self.op.os_type,
- vcpus=self.be_full[constants.BE_VCPUS],
- memory=self.be_full[constants.BE_MAXMEM],
- spindle_use=self.be_full[constants.BE_SPINDLE_USE],
- disks=self.disks,
- nics=nics,
- hypervisor=self.op.hypervisor,
- )
+ if self.op.opportunistic_locking:
+ # Only consider nodes for which a lock is held
+ node_whitelist = self.owned_locks(locking.LEVEL_NODE)
+ else:
+ node_whitelist = None
+
+    # TODO: Export network to iallocator so that it chooses a pnode
+    # in a nodegroup that has the desired network connected to it
+ req = _CreateInstanceAllocRequest(self.op, self.disks,
+ self.nics, self.be_full,
+ node_whitelist)
+ ial = iallocator.IAllocator(self.cfg, self.rpc, req)
ial.Run(self.op.iallocator)
if not ial.success:
+      # With opportunistic locking the failure is reported as temporary so
+      # that the allocation can be retried
+ if self.op.opportunistic_locking:
+ ecode = errors.ECODE_TEMP_NORES
+ else:
+ ecode = errors.ECODE_NORES
+
raise errors.OpPrereqError("Can't compute nodes using"
" iallocator '%s': %s" %
(self.op.iallocator, ial.info),
- errors.ECODE_NORES)
- if len(ial.result) != ial.required_nodes:
- raise errors.OpPrereqError("iallocator '%s' returned invalid number"
- " of nodes (%s), required %s" %
- (self.op.iallocator, len(ial.result),
- ial.required_nodes), errors.ECODE_FAULT)
+ ecode)
+
self.op.pnode = ial.result[0]
self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
self.op.instance_name, self.op.iallocator,
utils.CommaJoin(ial.result))
- if ial.required_nodes == 2:
+
+ assert req.RequiredNodes() in (1, 2), "Wrong node count from iallocator"
+
+ if req.RequiredNodes() == 2:
self.op.snode = ial.result[1]
def BuildHooksEnv(self):
if src_path in exp_list[node].payload:
found = True
self.op.src_node = src_node = node
- self.op.src_path = src_path = utils.PathJoin(constants.EXPORT_DIR,
+ self.op.src_path = src_path = utils.PathJoin(pathutils.EXPORT_DIR,
src_path)
break
if not found:
if self.op.disk_template not in constants.DISK_TEMPLATES:
raise errors.OpPrereqError("Disk template specified in configuration"
" file is not one of the allowed values:"
- " %s" % " ".join(constants.DISK_TEMPLATES))
+ " %s" %
+ " ".join(constants.DISK_TEMPLATES),
+ errors.ECODE_INVAL)
else:
raise errors.OpPrereqError("No disk template specified and the export"
" is missing the disk_template information",
cfg_storagedir = get_fsd_fn()
if not cfg_storagedir:
- raise errors.OpPrereqError("Cluster file storage dir not defined")
+ raise errors.OpPrereqError("Cluster file storage dir not defined",
+ errors.ECODE_STATE)
joinargs.append(cfg_storagedir)
if self.op.file_storage_dir is not None:
enabled_hvs = cluster.enabled_hypervisors
if self.op.hypervisor not in enabled_hvs:
raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
- " cluster (%s)" % (self.op.hypervisor,
- ",".join(enabled_hvs)),
+ " cluster (%s)" %
+ (self.op.hypervisor, ",".join(enabled_hvs)),
errors.ECODE_STATE)
# Check tag validity
_CheckGlobalHvParams(self.op.hvparams)
# fill and remember the beparams dict
- default_beparams = cluster.beparams[constants.PP_DEFAULT]
- for param, value in self.op.beparams.iteritems():
- if value == constants.VALUE_AUTO:
- self.op.beparams[param] = default_beparams[param]
- objects.UpgradeBeParams(self.op.beparams)
- utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
- self.be_full = cluster.SimpleFillBE(self.op.beparams)
+ self.be_full = _ComputeFullBeParams(self.op, cluster)
# build os parameters
self.os_full = cluster.SimpleFillOS(self.op.os_type, self.op.osparams)
self._RevertToDefaults(cluster)
# NIC buildup
- self.nics = []
- for idx, nic in enumerate(self.op.nics):
- nic_mode_req = nic.get(constants.INIC_MODE, None)
- nic_mode = nic_mode_req
- if nic_mode is None or nic_mode == constants.VALUE_AUTO:
- nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
-
- # in routed mode, for the first nic, the default ip is 'auto'
- if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
- default_ip_mode = constants.VALUE_AUTO
- else:
- default_ip_mode = constants.VALUE_NONE
-
- # ip validity checks
- ip = nic.get(constants.INIC_IP, default_ip_mode)
- if ip is None or ip.lower() == constants.VALUE_NONE:
- nic_ip = None
- elif ip.lower() == constants.VALUE_AUTO:
- if not self.op.name_check:
- raise errors.OpPrereqError("IP address set to auto but name checks"
- " have been skipped",
- errors.ECODE_INVAL)
- nic_ip = self.hostname1.ip
- else:
- if not netutils.IPAddress.IsValid(ip):
- raise errors.OpPrereqError("Invalid IP address '%s'" % ip,
- errors.ECODE_INVAL)
- nic_ip = ip
-
- # TODO: check the ip address for uniqueness
- if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
- raise errors.OpPrereqError("Routed nic mode requires an ip address",
- errors.ECODE_INVAL)
-
- # MAC address verification
- mac = nic.get(constants.INIC_MAC, constants.VALUE_AUTO)
- if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
- mac = utils.NormalizeAndValidateMac(mac)
-
- try:
- self.cfg.ReserveMAC(mac, self.proc.GetECId())
- except errors.ReservationError:
- raise errors.OpPrereqError("MAC address %s already in use"
- " in cluster" % mac,
- errors.ECODE_NOTUNIQUE)
-
- # Build nic parameters
- link = nic.get(constants.INIC_LINK, None)
- if link == constants.VALUE_AUTO:
- link = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_LINK]
- nicparams = {}
- if nic_mode_req:
- nicparams[constants.NIC_MODE] = nic_mode
- if link:
- nicparams[constants.NIC_LINK] = link
-
- check_params = cluster.SimpleFillNIC(nicparams)
- objects.NIC.CheckParameterSyntax(check_params)
- self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
+ self.nics = _ComputeNics(self.op, cluster, self.hostname1.ip, self.cfg,
+ self.proc.GetECId())
# disk checks/pre-build
default_vg = self.cfg.GetVGName()
- self.disks = []
- for disk in self.op.disks:
- mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
- if mode not in constants.DISK_ACCESS_SET:
- raise errors.OpPrereqError("Invalid disk access mode '%s'" %
- mode, errors.ECODE_INVAL)
- size = disk.get(constants.IDISK_SIZE, None)
- if size is None:
- raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
- try:
- size = int(size)
- except (TypeError, ValueError):
- raise errors.OpPrereqError("Invalid disk size '%s'" % size,
- errors.ECODE_INVAL)
-
- data_vg = disk.get(constants.IDISK_VG, default_vg)
- new_disk = {
- constants.IDISK_SIZE: size,
- constants.IDISK_MODE: mode,
- constants.IDISK_VG: data_vg,
- }
- if constants.IDISK_METAVG in disk:
- new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
- if constants.IDISK_ADOPT in disk:
- new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
- self.disks.append(new_disk)
+ self.disks = _ComputeDisks(self.op, default_vg)
if self.op.mode == constants.INSTANCE_IMPORT:
disk_images = []
# creation job will fail.
for nic in self.nics:
if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
- nic.mac = self.cfg.GenerateMAC(self.proc.GetECId())
+ nic.mac = self.cfg.GenerateMAC(nic.network, self.proc.GetECId())
#### allocator run
self._RunAllocator()
# Release all unneeded node locks
- _ReleaseLocks(self, locking.LEVEL_NODE,
- keep=filter(None, [self.op.pnode, self.op.snode,
- self.op.src_node]))
- _ReleaseLocks(self, locking.LEVEL_NODE_RES,
- keep=filter(None, [self.op.pnode, self.op.snode,
- self.op.src_node]))
+ keep_locks = filter(None, [self.op.pnode, self.op.snode, self.op.src_node])
+ _ReleaseLocks(self, locking.LEVEL_NODE, keep=keep_locks)
+ _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=keep_locks)
+ _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
+
+ assert (self.owned_locks(locking.LEVEL_NODE) ==
+ self.owned_locks(locking.LEVEL_NODE_RES)), \
+ "Node locks differ from node resource locks"
#### node related checks
self.secondaries = []
+ # Fill in any IPs from IP pools. This must happen here, because we need to
+ # know the nic's primary node, as specified by the iallocator
+ for idx, nic in enumerate(self.nics):
+ net = nic.network
+ if net is not None:
+ netparams = self.cfg.GetGroupNetParams(net, self.pnode.name)
+ if netparams is None:
+ raise errors.OpPrereqError("No netparams found for network"
+                                     " %s. Probably not connected to"
+                                     " node %s's nodegroup" %
+ (net, self.pnode.name),
+ errors.ECODE_INVAL)
+ self.LogInfo("NIC/%d inherits netparams %s" %
+ (idx, netparams.values()))
+ nic.nicparams = dict(netparams)
+ if nic.ip is not None:
+ if nic.ip.lower() == constants.NIC_IP_POOL:
+ try:
+ nic.ip = self.cfg.GenerateIp(net, self.proc.GetECId())
+ except errors.ReservationError:
+ raise errors.OpPrereqError("Unable to get a free IP for NIC %d"
+ " from the address pool" % idx,
+ errors.ECODE_STATE)
+ self.LogInfo("Chose IP %s from network %s", nic.ip, net)
+ else:
+ try:
+ self.cfg.ReserveIp(net, nic.ip, self.proc.GetECId())
+ except errors.ReservationError:
+ raise errors.OpPrereqError("IP address %s already in use"
+ " or does not belong to network %s" %
+ (nic.ip, net),
+ errors.ECODE_NOTUNIQUE)
+ else:
+ # net is None, ip None or given
+ if self.op.conflicts_check:
+ _CheckForConflictingIp(self, nic.ip, self.pnode.name)
+
# mirror node verification
if self.op.disk_template in constants.DTS_INT_MIRROR:
if self.op.snode == pnode.name:
nodenames = [pnode.name] + self.secondaries
+ # Verify instance specs
+ spindle_use = self.be_full.get(constants.BE_SPINDLE_USE, None)
+ ispec = {
+ constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None),
+ constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None),
+ constants.ISPEC_DISK_COUNT: len(self.disks),
+ constants.ISPEC_DISK_SIZE: [disk["size"] for disk in self.disks],
+ constants.ISPEC_NIC_COUNT: len(self.nics),
+ constants.ISPEC_SPINDLE_USE: spindle_use,
+ }
+
+ group_info = self.cfg.GetNodeGroup(pnode.group)
+ ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info)
+ res = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec)
+ if not self.op.ignore_ipolicy and res:
+ msg = ("Instance allocation to group %s (%s) violates policy: %s" %
+ (pnode.group, group_info.name, utils.CommaJoin(res)))
+ raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
+
if not self.adopt_disks:
if self.op.disk_template == constants.DT_RBD:
# _CheckRADOSFreeSpace() is just a placeholder.
if baddisks:
raise errors.OpPrereqError("Device node(s) %s lie outside %s and"
" cannot be adopted" %
- (", ".join(baddisks),
+ (utils.CommaJoin(baddisks),
constants.ADOPTABLE_BLOCKDEV_ROOT),
errors.ECODE_INVAL)
}
group_info = self.cfg.GetNodeGroup(pnode.group)
- ipolicy = _CalculateGroupIPolicy(cluster, group_info)
+ ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info)
res = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec)
if not self.op.ignore_ipolicy and res:
raise errors.OpPrereqError(("Instance allocation to group %s violates"
assert not (self.owned_locks(locking.LEVEL_NODE_RES) -
self.owned_locks(locking.LEVEL_NODE)), \
"Node locks differ from node resource locks"
+ assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
ht_kind = self.op.hypervisor
if ht_kind in constants.HTS_REQ_PORT:
return list(iobj.all_nodes)
+class LUInstanceMultiAlloc(NoHooksLU):
+ """Allocates multiple instances at the same time.
+
+ """
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ """Check arguments.
+
+ """
+ nodes = []
+ for inst in self.op.instances:
+ if inst.iallocator is not None:
+ raise errors.OpPrereqError("iallocator are not allowed to be set on"
+ " instance objects", errors.ECODE_INVAL)
+ nodes.append(bool(inst.pnode))
+ if inst.disk_template in constants.DTS_INT_MIRROR:
+ nodes.append(bool(inst.snode))
+
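+    # Either all instances specify their nodes or none does; a mix is
+    # rejected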
+ has_nodes = compat.any(nodes)
+ if compat.all(nodes) ^ has_nodes:
+ raise errors.OpPrereqError("There are instance objects providing"
+ " pnode/snode while others do not",
+ errors.ECODE_INVAL)
+
+ if self.op.iallocator is None:
+ default_iallocator = self.cfg.GetDefaultIAllocator()
+ if default_iallocator and has_nodes:
+ self.op.iallocator = default_iallocator
+ else:
+ raise errors.OpPrereqError("No iallocator or nodes on the instances"
+ " given and no cluster-wide default"
+ " iallocator found; please specify either"
+ " an iallocator or nodes on the instances"
+ " or set a cluster-wide default iallocator",
+ errors.ECODE_INVAL)
+
+ dups = utils.FindDuplicates([op.instance_name for op in self.op.instances])
+ if dups:
+ raise errors.OpPrereqError("There are duplicate instance names: %s" %
+ utils.CommaJoin(dups), errors.ECODE_INVAL)
+
+ def ExpandNames(self):
+ """Calculate the locks.
+
+ """
+ self.share_locks = _ShareAll()
+ self.needed_locks = {
+ # iallocator will select nodes and even if no iallocator is used,
+ # collisions with LUInstanceCreate should be avoided
+ locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
+ }
+
+ if self.op.iallocator:
+ self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+ self.needed_locks[locking.LEVEL_NODE_RES] = locking.ALL_SET
+
+ if self.op.opportunistic_locking:
+ self.opportunistic_locks[locking.LEVEL_NODE] = True
+ self.opportunistic_locks[locking.LEVEL_NODE_RES] = True
+ else:
+ nodeslist = []
+ for inst in self.op.instances:
+ inst.pnode = _ExpandNodeName(self.cfg, inst.pnode)
+ nodeslist.append(inst.pnode)
+ if inst.snode is not None:
+ inst.snode = _ExpandNodeName(self.cfg, inst.snode)
+ nodeslist.append(inst.snode)
+
+ self.needed_locks[locking.LEVEL_NODE] = nodeslist
+ # Lock resources of instance's primary and secondary nodes (copy to
+      # prevent accidental modification)
+ self.needed_locks[locking.LEVEL_NODE_RES] = list(nodeslist)
+
+ def CheckPrereq(self):
+ """Check prerequisite.
+
+ """
+ cluster = self.cfg.GetClusterInfo()
+ default_vg = self.cfg.GetVGName()
+ ec_id = self.proc.GetECId()
+
+ if self.op.opportunistic_locking:
+ # Only consider nodes for which a lock is held
+ node_whitelist = self.owned_locks(locking.LEVEL_NODE)
+ else:
+ node_whitelist = None
+
+ insts = [_CreateInstanceAllocRequest(op, _ComputeDisks(op, default_vg),
+ _ComputeNics(op, cluster, None,
+ self.cfg, ec_id),
+ _ComputeFullBeParams(op, cluster),
+ node_whitelist)
+ for op in self.op.instances]
+
+ req = iallocator.IAReqMultiInstanceAlloc(instances=insts)
+ ial = iallocator.IAllocator(self.cfg, self.rpc, req)
+
+ ial.Run(self.op.iallocator)
+
+ if not ial.success:
+ raise errors.OpPrereqError("Can't compute nodes using"
+ " iallocator '%s': %s" %
+ (self.op.iallocator, ial.info),
+ errors.ECODE_NORES)
+
+ self.ia_result = ial.result
+
+ if self.op.dry_run:
+      self.dry_run_result = objects.FillDict(self._ConstructPartialResult(), {
+ constants.JOB_IDS_KEY: [],
+ })
+
+ def _ConstructPartialResult(self):
+ """Contructs the partial result.
+
+ """
+ (allocatable, failed) = self.ia_result
+ return {
+ opcodes.OpInstanceMultiAlloc.ALLOCATABLE_KEY:
+ map(compat.fst, allocatable),
+ opcodes.OpInstanceMultiAlloc.FAILED_KEY: failed,
+ }
+
+ def Exec(self, feedback_fn):
+ """Executes the opcode.
+
+ """
+ op2inst = dict((op.instance_name, op) for op in self.op.instances)
+ (allocatable, failed) = self.ia_result
+
+ jobs = []
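+    # Build one single-opcode job per allocatable instance, filling in the
+    # node(s) chosen by the iallocator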
+ for (name, nodes) in allocatable:
+ op = op2inst.pop(name)
+
+ if len(nodes) > 1:
+ (op.pnode, op.snode) = nodes
+ else:
+ (op.pnode,) = nodes
+
+ jobs.append([op])
+
+ missing = set(op2inst.keys()) - set(failed)
+ assert not missing, \
+ "Iallocator did return incomplete result: %s" % utils.CommaJoin(missing)
+
+ return ResultWithJobs(jobs, **self._ConstructPartialResult())
+
+
def _CheckRADOSFreeSpace():
"""Compute disk size requirements inside the RADOS cluster.
REQ_BGL = False
def CheckArguments(self):
- TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
- self.op.iallocator)
+ """Check arguments.
+
+ """
+ remote_node = self.op.remote_node
+ ialloc = self.op.iallocator
+ if self.op.mode == constants.REPLACE_DISK_CHG:
+ if remote_node is None and ialloc is None:
+ raise errors.OpPrereqError("When changing the secondary either an"
+ " iallocator script must be used or the"
+ " new node given", errors.ECODE_INVAL)
+ else:
+ _CheckIAllocatorOrNode(self, "iallocator", "remote_node")
+
+ elif remote_node is not None or ialloc is not None:
+ # Not replacing the secondary
+ raise errors.OpPrereqError("The iallocator and new node options can"
+ " only be used when changing the"
+ " secondary node", errors.ECODE_INVAL)
def ExpandNames(self):
self._ExpandAndLockInstance()
if self.op.iallocator is not None:
# iallocator will select a new node in the same group
self.needed_locks[locking.LEVEL_NODEGROUP] = []
+ self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
self.needed_locks[locking.LEVEL_NODE_RES] = []
self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
self.op.iallocator, self.op.remote_node,
- self.op.disks, False, self.op.early_release,
+ self.op.disks, self.op.early_release,
self.op.ignore_ipolicy)
self.tasklets = [self.replacer]
if self.op.iallocator is not None:
assert self.op.remote_node is None
assert not self.needed_locks[locking.LEVEL_NODE]
+ assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
# Lock member nodes of all locked groups
- self.needed_locks[locking.LEVEL_NODE] = [node_name
- for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
- for node_name in self.cfg.GetNodeGroup(group_uuid).members]
+ self.needed_locks[locking.LEVEL_NODE] = \
+ [node_name
+ for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
+ for node_name in self.cfg.GetNodeGroup(group_uuid).members]
else:
+ assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
+
self._LockInstancesNodes()
+
elif level == locking.LEVEL_NODE_RES:
# Reuse node locks
self.needed_locks[locking.LEVEL_NODE_RES] = \
"""
def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
- disks, delay_iallocator, early_release, ignore_ipolicy):
+ disks, early_release, ignore_ipolicy):
"""Initializes this class.
"""
self.iallocator_name = iallocator_name
self.remote_node = remote_node
self.disks = disks
- self.delay_iallocator = delay_iallocator
self.early_release = early_release
self.ignore_ipolicy = ignore_ipolicy
self.node_secondary_ip = None
@staticmethod
- def CheckArguments(mode, remote_node, iallocator):
- """Helper function for users of this class.
-
- """
- # check for valid parameter combination
- if mode == constants.REPLACE_DISK_CHG:
- if remote_node is None and iallocator is None:
- raise errors.OpPrereqError("When changing the secondary either an"
- " iallocator script must be used or the"
- " new node given", errors.ECODE_INVAL)
-
- if remote_node is not None and iallocator is not None:
- raise errors.OpPrereqError("Give either the iallocator or the new"
- " secondary, not both", errors.ECODE_INVAL)
-
- elif remote_node is not None or iallocator is not None:
- # Not replacing the secondary
- raise errors.OpPrereqError("The iallocator and new node options can"
- " only be used when changing the"
- " secondary node", errors.ECODE_INVAL)
-
- @staticmethod
def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
"""Compute a new secondary node using an IAllocator.
"""
- ial = IAllocator(lu.cfg, lu.rpc,
- mode=constants.IALLOCATOR_MODE_RELOC,
- name=instance_name,
- relocate_from=list(relocate_from))
+ req = iallocator.IAReqRelocate(name=instance_name,
+ relocate_from=list(relocate_from))
+ ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
ial.Run(iallocator_name)
" %s" % (iallocator_name, ial.info),
errors.ECODE_NORES)
- if len(ial.result) != ial.required_nodes:
- raise errors.OpPrereqError("iallocator '%s' returned invalid number"
- " of nodes (%s), required %s" %
- (iallocator_name,
- len(ial.result), ial.required_nodes),
- errors.ECODE_FAULT)
-
remote_node_name = ial.result[0]
lu.LogInfo("Selected new secondary for instance '%s': %s",
len(instance.secondary_nodes),
errors.ECODE_FAULT)
- if not self.delay_iallocator:
- self._CheckPrereq2()
-
- def _CheckPrereq2(self):
- """Check prerequisites, second part.
-
- This function should always be part of CheckPrereq. It was separated and is
- now called from Exec because during node evacuation iallocator was only
- called with an unmodified cluster model, not taking planned changes into
- account.
-
- """
instance = self.instance
secondary_node = instance.secondary_nodes[0]
if self.remote_node_info:
# We change the node, lets verify it still meets instance policy
new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
- ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(),
- new_group_info)
+ cluster = self.cfg.GetClusterInfo()
+ ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
+ new_group_info)
_CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
ignore=self.ignore_ipolicy)
# Release unneeded node and node resource locks
_ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
_ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
# Release any owned node group
- if self.lu.glm.is_owned(locking.LEVEL_NODEGROUP):
- _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
+ _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
# Check whether disks are valid
for disk_idx in self.disks:
This dispatches the disk replacement to the appropriate handler.
"""
- if self.delay_iallocator:
- self._CheckPrereq2()
-
if __debug__:
# Verify owned locks before starting operation
owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
(owned_nodes, self.node_secondary_ip.keys()))
assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
self.lu.owned_locks(locking.LEVEL_NODE_RES))
+ assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
assert list(owned_instances) == [self.instance_name], \
continue
for node in nodes:
- self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
+ self.lu.LogInfo("Checking disk/%d on %s", idx, node)
self.cfg.SetDiskID(dev, node)
result = _BlockdevFind(self, node, dev, self.instance)
if idx not in self.disks:
continue
- self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
+ self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)
self.cfg.SetDiskID(dev, node_name)
logical_id=(vg_data, names[0]),
params=data_disk.params)
vg_meta = meta_disk.logical_id[0]
- lv_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
+ lv_meta = objects.Disk(dev_type=constants.LD_LV,
+ size=constants.DRBD_META_SIZE,
logical_id=(vg_meta, names[1]),
params=meta_disk.params)
def _RemoveOldStorage(self, node_name, iv_names):
for name, (_, old_lvs, _) in iv_names.iteritems():
- self.lu.LogInfo("Remove logical volumes for %s" % name)
+ self.lu.LogInfo("Remove logical volumes for %s", name)
for lv in old_lvs:
self.cfg.SetDiskID(lv, node_name)
msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
if msg:
- self.lu.LogWarning("Can't remove old LV: %s" % msg,
+ self.lu.LogWarning("Can't remove old LV: %s", msg,
hint="remove unused LVs manually")
def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
# Step: for each lv, detach+rename*2+attach
self.lu.LogStep(4, steps_total, "Changing drbd configuration")
for dev, old_lvs, new_lvs in iv_names.itervalues():
- self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
+ self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
old_lvs)
self.cfg.SetDiskID(disk, self.target_node)
# Now that the new lvs have the old name, we can add them to the device
- self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
+ self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
result = self.rpc.call_blockdev_addchildren(self.target_node,
(dev, self.instance), new_lvs)
msg = result.fail_msg
# We have new devices, shutdown the drbd on the old secondary
for idx, dev in enumerate(self.instance.disks):
- self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
+ self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
self.cfg.SetDiskID(dev, self.target_node)
msg = self.rpc.call_blockdev_shutdown(self.target_node,
(dev, self.instance)).fail_msg
errors.ECODE_STATE)
except errors.OpPrereqError, err:
if self.op.ignore_consistency:
- self.proc.LogWarning(str(err.args[0]))
+ self.LogWarning(str(err.args[0]))
else:
raise
elif self.op.iallocator is not None:
# TODO: Implement relocation to other group
- ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_NODE_EVAC,
- evac_mode=self._MODE2IALLOCATOR[self.op.mode],
- instances=list(self.instance_names))
+ evac_mode = self._MODE2IALLOCATOR[self.op.mode]
+ req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
+ instances=list(self.instance_names))
+ ial = iallocator.IAllocator(self.cfg, self.rpc, req)
ial.Run(self.op.iallocator)
disks=[],
mode=constants.REPLACE_DISK_CHG,
early_release=self.op.early_release)]
- for instance_name in self.instance_names
- ]
+ for instance_name in self.instance_names]
else:
raise errors.ProgrammerError("No iallocator or remote node")
for ops in jobs]
+def _DiskSizeInBytesToMebibytes(lu, size):
+ """Converts a disk size in bytes to mebibytes.
+
+ Warns and rounds up if the size isn't an even multiple of 1 MiB.
+
+ """
+ (mib, remainder) = divmod(size, 1024 * 1024)
+
+ if remainder != 0:
+ lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
+ " to not overwrite existing data (%s bytes will not be"
+ " wiped)", (1024 * 1024) - remainder)
+ mib += 1
+
+ return mib
+
+
class LUInstanceGrowDisk(LogicalUnit):
"""Grow a disk of an instance.
assert (self.owned_locks(locking.LEVEL_NODE) ==
self.owned_locks(locking.LEVEL_NODE_RES))
+ wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
+
disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk])
if not disks_ok:
raise errors.OpExecError("Cannot activate block device to grow")
for node in instance.all_nodes:
self.cfg.SetDiskID(disk, node)
result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
- True)
- result.Raise("Grow request failed to node %s" % node)
+ True, True)
+ result.Raise("Dry-run grow request failed to node %s" % node)
+
+ if wipe_disks:
+ # Get disk size from primary node for wiping
+ result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
+ result.Raise("Failed to retrieve disk size from node '%s'" %
+ instance.primary_node)
+
+ (disk_size_in_bytes, ) = result.payload
+
+ if disk_size_in_bytes is None:
+ raise errors.OpExecError("Failed to retrieve disk size from primary"
+ " node '%s'" % instance.primary_node)
+
+ old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
+
+ assert old_disk_size >= disk.size, \
+ ("Retrieved disk size too small (got %s, should be at least %s)" %
+ (old_disk_size, disk.size))
+ else:
+ old_disk_size = None
# We know that (as far as we can test) operations across different
- # nodes will succeed, time to run it for real
+ # nodes will succeed, time to run it for real on the backing storage
for node in instance.all_nodes:
self.cfg.SetDiskID(disk, node)
result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
- False)
+ False, True)
result.Raise("Grow request failed to node %s" % node)
- # TODO: Rewrite code to work properly
- # DRBD goes into sync mode for a short amount of time after executing the
- # "resize" command. DRBD 8.x below version 8.0.13 contains a bug whereby
- # calling "resize" in sync mode fails. Sleeping for a short amount of
- # time is a work-around.
- time.sleep(5)
+ # And now execute it for logical storage, on the primary node
+ node = instance.primary_node
+ self.cfg.SetDiskID(disk, node)
+ result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
+ False, False)
+ result.Raise("Grow request failed to node %s" % node)
disk.RecordGrow(self.delta)
self.cfg.Update(instance, feedback_fn)
# Downgrade lock while waiting for sync
self.glm.downgrade(locking.LEVEL_INSTANCE)
+ assert wipe_disks ^ (old_disk_size is None)
+
+ if wipe_disks:
+ assert instance.disks[self.op.disk] == disk
+
+ # Wipe newly added disk space
+ _WipeDisks(self, instance,
+ disks=[(self.op.disk, disk, old_disk_size)])
+
if self.op.wait_for_sync:
disk_abort = not _WaitForSync(self, instance, disks=[disk])
if disk_abort:
- self.proc.LogWarning("Disk sync-ing has not returned a good"
- " status; please check the instance")
+ self.LogWarning("Disk syncing has not returned a good status; check"
+ " the instance")
if instance.admin_state != constants.ADMINST_UP:
_SafeShutdownInstanceDisks(self, instance, disks=[disk])
elif instance.admin_state != constants.ADMINST_UP:
- self.proc.LogWarning("Not shutting down the disk even if the instance is"
- " not supposed to be running because no wait for"
- " sync mode was requested")
+ self.LogWarning("Not shutting down the disk even if the instance is"
+ " not supposed to be running because no wait for"
+ " sync mode was requested")
assert self.owned_locks(locking.LEVEL_NODE_RES)
assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
"""
if op in (constants.DDM_ADD, constants.DDM_MODIFY):
ip = params.get(constants.INIC_IP, None)
- if ip is None:
- pass
- elif ip.lower() == constants.VALUE_NONE:
- params[constants.INIC_IP] = None
- elif not netutils.IPAddress.IsValid(ip):
- raise errors.OpPrereqError("Invalid IP address '%s'" % ip,
- errors.ECODE_INVAL)
-
- bridge = params.get("bridge", None)
- link = params.get(constants.INIC_LINK, None)
- if bridge and link:
- raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
- " at the same time", errors.ECODE_INVAL)
- elif bridge and bridge.lower() == constants.VALUE_NONE:
- params["bridge"] = None
- elif link and link.lower() == constants.VALUE_NONE:
- params[constants.INIC_LINK] = None
+ req_net = params.get(constants.INIC_NETWORK, None)
+ link = params.get(constants.NIC_LINK, None)
+ mode = params.get(constants.NIC_MODE, None)
+ if req_net is not None:
+ if req_net.lower() == constants.VALUE_NONE:
+ params[constants.INIC_NETWORK] = None
+ req_net = None
+ elif link is not None or mode is not None:
+ raise errors.OpPrereqError("If network is given"
+ " mode or link should not",
+ errors.ECODE_INVAL)
if op == constants.DDM_ADD:
macaddr = params.get(constants.INIC_MAC, None)
if macaddr is None:
params[constants.INIC_MAC] = constants.VALUE_AUTO
+ if ip is not None:
+ if ip.lower() == constants.VALUE_NONE:
+ params[constants.INIC_IP] = None
+ else:
+ if ip.lower() == constants.NIC_IP_POOL:
+ if op == constants.DDM_ADD and req_net is None:
+ raise errors.OpPrereqError("If ip=pool, parameter network"
+ " cannot be none",
+ errors.ECODE_INVAL)
+ else:
+ if not netutils.IPAddress.IsValid(ip):
+ raise errors.OpPrereqError("Invalid IP address '%s'" % ip,
+ errors.ECODE_INVAL)
+
if constants.INIC_MAC in params:
macaddr = params[constants.INIC_MAC]
if macaddr not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
if self.op.hvparams:
_CheckGlobalHvParams(self.op.hvparams)
- self.op.disks = \
- self._UpgradeDiskNicMods("disk", self.op.disks,
- opcodes.OpInstanceSetParams.TestDiskModifications)
- self.op.nics = \
- self._UpgradeDiskNicMods("NIC", self.op.nics,
- opcodes.OpInstanceSetParams.TestNicModifications)
+ self.op.disks = self._UpgradeDiskNicMods(
+ "disk", self.op.disks, opcodes.OpInstanceSetParams.TestDiskModifications)
+ self.op.nics = self._UpgradeDiskNicMods(
+ "NIC", self.op.nics, opcodes.OpInstanceSetParams.TestNicModifications)
# Check disk modifications
self._CheckMods("disk", self.op.disks, constants.IDISK_PARAMS_TYPES,
def ExpandNames(self):
self._ExpandAndLockInstance()
+ self.needed_locks[locking.LEVEL_NODEGROUP] = []
# Can't even acquire node locks in shared mode as upcoming changes in
# Ganeti 2.6 will start to modify the node object on disk conversion
self.needed_locks[locking.LEVEL_NODE] = []
self.needed_locks[locking.LEVEL_NODE_RES] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+    # Lock the node group in shared mode so the ipolicy can be looked up
+ self.share_locks[locking.LEVEL_NODEGROUP] = 1
def DeclareLocks(self, level):
- # TODO: Acquire group lock in shared mode (disk parameters)
- if level == locking.LEVEL_NODE:
+ if level == locking.LEVEL_NODEGROUP:
+ assert not self.needed_locks[locking.LEVEL_NODEGROUP]
+ # Acquire locks for the instance's nodegroups optimistically. Needs
+ # to be verified in CheckPrereq
+ self.needed_locks[locking.LEVEL_NODEGROUP] = \
+ self.cfg.GetInstanceNodeGroups(self.op.instance_name)
+ elif level == locking.LEVEL_NODE:
self._LockInstancesNodes()
if self.op.disk_template and self.op.remote_node:
self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
This runs on the master, primary and secondaries.
"""
- args = dict()
+ args = {}
if constants.BE_MINMEM in self.be_new:
args["minmem"] = self.be_new[constants.BE_MINMEM]
if constants.BE_MAXMEM in self.be_new:
nics = []
for nic in self._new_nics:
- nicparams = self.cluster.SimpleFillNIC(nic.nicparams)
- mode = nicparams[constants.NIC_MODE]
- link = nicparams[constants.NIC_LINK]
- nics.append((nic.ip, nic.mac, mode, link))
+ n = copy.deepcopy(nic)
+ nicparams = self.cluster.SimpleFillNIC(n.nicparams)
+ n.nicparams = nicparams
+ nics.append(_NICToTuple(self, n))
args["nics"] = nics
nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
return (nl, nl)
- def _PrepareNicModification(self, params, private, old_ip, old_params,
- cluster, pnode):
+ def _PrepareNicModification(self, params, private, old_ip, old_net,
+ old_params, cluster, pnode):
+
update_params_dict = dict([(key, params[key])
for key in constants.NICS_PARAMETERS
if key in params])
- if "bridge" in params:
- update_params_dict[constants.NIC_LINK] = params["bridge"]
+ req_link = update_params_dict.get(constants.NIC_LINK, None)
+ req_mode = update_params_dict.get(constants.NIC_MODE, None)
+
+ new_net = params.get(constants.INIC_NETWORK, old_net)
+ if new_net is not None:
+ netparams = self.cfg.GetGroupNetParams(new_net, pnode)
+ if netparams is None:
+ raise errors.OpPrereqError("No netparams found for the network"
+ " %s, probably not connected" % new_net,
+ errors.ECODE_INVAL)
+ new_params = dict(netparams)
+ else:
+ new_params = _GetUpdatedParams(old_params, update_params_dict)
- new_params = _GetUpdatedParams(old_params, update_params_dict)
utils.ForceDictType(new_params, constants.NICS_PARAMETER_TYPES)
new_filled_params = cluster.SimpleFillNIC(new_params)
elif mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
# otherwise generate the MAC address
params[constants.INIC_MAC] = \
- self.cfg.GenerateMAC(self.proc.GetECId())
+ self.cfg.GenerateMAC(new_net, self.proc.GetECId())
else:
# or validate/reserve the current one
try:
raise errors.OpPrereqError("MAC address '%s' already in use"
" in cluster" % mac,
errors.ECODE_NOTUNIQUE)
+ elif new_net != old_net:
+
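+      # The NIC is moving to a different network; if the MAC prefix of the
+      # new network differs from the old one, a new MAC must be generated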
+ def get_net_prefix(net):
+ if net:
+ uuid = self.cfg.LookupNetwork(net)
+ if uuid:
+ nobj = self.cfg.GetNetwork(uuid)
+ return nobj.mac_prefix
+ return None
+
+ new_prefix = get_net_prefix(new_net)
+ old_prefix = get_net_prefix(old_net)
+ if old_prefix != new_prefix:
+ params[constants.INIC_MAC] = \
+ self.cfg.GenerateMAC(new_net, self.proc.GetECId())
+
+    # Handle a change in the NIC's (IP, network) configuration
+ new_ip = params.get(constants.INIC_IP, old_ip)
+ if (new_ip, new_net) != (old_ip, old_net):
+ if new_ip:
+ if new_net:
+ if new_ip.lower() == constants.NIC_IP_POOL:
+ try:
+ new_ip = self.cfg.GenerateIp(new_net, self.proc.GetECId())
+ except errors.ReservationError:
+ raise errors.OpPrereqError("Unable to get a free IP"
+ " from the address pool",
+ errors.ECODE_STATE)
+ self.LogInfo("Chose IP %s from pool %s", new_ip, new_net)
+ params[constants.INIC_IP] = new_ip
+ elif new_ip != old_ip or new_net != old_net:
+ try:
+ self.LogInfo("Reserving IP %s in pool %s", new_ip, new_net)
+ self.cfg.ReserveIp(new_net, new_ip, self.proc.GetECId())
+ except errors.ReservationError:
+ raise errors.OpPrereqError("IP %s not available in network %s" %
+ (new_ip, new_net),
+ errors.ECODE_NOTUNIQUE)
+ elif new_ip.lower() == constants.NIC_IP_POOL:
+ raise errors.OpPrereqError("ip=pool, but no network found",
+ errors.ECODE_INVAL)
+ else:
+ # new net is None
+ if self.op.conflicts_check:
+ _CheckForConflictingIp(self, new_ip, pnode)
+
+ if old_ip:
+ if old_net:
+ try:
+ self.cfg.ReleaseIp(old_net, old_ip, self.proc.GetECId())
+ except errors.AddressPoolError:
+ logging.warning("Release IP %s not contained in network %s",
+ old_ip, old_net)
+
+ # there are no changes in (net, ip) tuple
+ elif (old_net is not None and
+ (req_link is not None or req_mode is not None)):
+ raise errors.OpPrereqError("Not allowed to change link or mode of"
+ " a NIC that is connected to a network",
+ errors.ECODE_INVAL)
private.params = new_params
private.filled = new_filled_params
This only checks the instance list against the existing names.
"""
- # checking the new params on the primary/secondary nodes
-
+ assert self.op.instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+
cluster = self.cluster = self.cfg.GetClusterInfo()
assert self.instance is not None, \
"Cannot retrieve locked instance %s" % self.op.instance_name
+
pnode = instance.primary_node
+ assert pnode in self.owned_locks(locking.LEVEL_NODE)
nodelist = list(instance.all_nodes)
pnode_info = self.cfg.GetNodeInfo(pnode)
self.diskparams = self.cfg.GetInstanceDiskParams(instance)
+ #_CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
+ assert pnode_info.group in self.owned_locks(locking.LEVEL_NODEGROUP)
+ group_info = self.cfg.GetNodeGroup(pnode_info.group)
+
+ # dictionary with instance information after the modification
+ ispec = {}
+
# Prepare disk/NIC modifications
self.diskmod = PrepareContainerMods(self.op.disks, None)
self.nicmod = PrepareContainerMods(self.op.nics, _InstNicModPrivate)
snode_info = self.cfg.GetNodeInfo(self.op.remote_node)
snode_group = self.cfg.GetNodeGroup(snode_info.group)
- ipolicy = _CalculateGroupIPolicy(cluster, snode_group)
+ ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
+ snode_group)
_CheckTargetNodeIPolicy(self, ipolicy, instance, snode_info,
ignore=self.op.ignore_ipolicy)
if pnode_info.group != snode_info.group:
" free memory information" % pnode)
elif instance_info.fail_msg:
self.warn.append("Can't get instance runtime information: %s" %
- instance_info.fail_msg)
+ instance_info.fail_msg)
else:
if instance_info.payload:
current_mem = int(instance_info.payload["memory"])
raise errors.OpPrereqError("This change will prevent the instance"
" from starting, due to %d MB of memory"
" missing on its primary node" %
- miss_mem,
- errors.ECODE_NORES)
+ miss_mem, errors.ECODE_NORES)
if be_new[constants.BE_AUTO_BALANCE]:
for node, nres in nodeinfo.items():
instance.hypervisor)
remote_info.Raise("Error checking node %s" % instance.primary_node)
if not remote_info.payload: # not running already
- raise errors.OpPrereqError("Instance %s is not running" % instance.name,
- errors.ECODE_STATE)
+ raise errors.OpPrereqError("Instance %s is not running" %
+ instance.name, errors.ECODE_STATE)
current_memory = remote_info.payload["memory"]
if (not self.op.force and
self.op.runtime_mem < self.be_proposed[constants.BE_MINMEM])):
raise errors.OpPrereqError("Instance %s must have memory between %d"
" and %d MB of memory unless --force is"
- " given" % (instance.name,
+ " given" %
+ (instance.name,
self.be_proposed[constants.BE_MINMEM],
self.be_proposed[constants.BE_MAXMEM]),
errors.ECODE_INVAL)
if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
raise errors.OpPrereqError("Disk operations not supported for"
- " diskless instances",
- errors.ECODE_INVAL)
+ " diskless instances", errors.ECODE_INVAL)
def _PrepareNicCreate(_, params, private):
- self._PrepareNicModification(params, private, None, {}, cluster, pnode)
+ self._PrepareNicModification(params, private, None, None,
+ {}, cluster, pnode)
return (None, None)
def _PrepareNicMod(_, nic, params, private):
- self._PrepareNicModification(params, private, nic.ip,
+ self._PrepareNicModification(params, private, nic.ip, nic.network,
nic.nicparams, cluster, pnode)
return None
+ def _PrepareNicRemove(_, params, __):
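+      # Release the IP of a removed NIC back to its network's address pool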
+ ip = params.ip
+ net = params.network
+ if net is not None and ip is not None:
+ self.cfg.ReleaseIp(net, ip, self.proc.GetECId())
+
# Verify NIC changes (operating on copy)
nics = instance.nics[:]
ApplyContainerMods("NIC", nics, None, self.nicmod,
- _PrepareNicCreate, _PrepareNicMod, None)
+ _PrepareNicCreate, _PrepareNicMod, _PrepareNicRemove)
if len(nics) > constants.MAX_NICS:
raise errors.OpPrereqError("Instance has too many network interfaces"
" (%d), cannot add more" % constants.MAX_NICS,
raise errors.OpPrereqError("Instance has too many disks (%d), cannot add"
" more" % constants.MAX_DISKS,
errors.ECODE_STATE)
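+    # Collect disk sizes (existing disks plus disks being added) for the
+    # instance policy check below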
+ disk_sizes = [disk.size for disk in instance.disks]
+ disk_sizes.extend(params["size"] for (op, idx, params, private) in
+ self.diskmod if op == constants.DDM_ADD)
+ ispec[constants.ISPEC_DISK_COUNT] = len(disk_sizes)
+ ispec[constants.ISPEC_DISK_SIZE] = disk_sizes
if self.op.offline is not None:
if self.op.offline:
ApplyContainerMods("NIC", nics, self._nic_chgdesc, self.nicmod,
self._CreateNewNic, self._ApplyNicMods, None)
self._new_nics = nics
+ ispec[constants.ISPEC_NIC_COUNT] = len(self._new_nics)
else:
self._new_nics = None
+ ispec[constants.ISPEC_NIC_COUNT] = len(instance.nics)
+
+ if not self.op.ignore_ipolicy:
+ ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
+ group_info)
+
+ # Fill ispec with backend parameters
+ ispec[constants.ISPEC_SPINDLE_USE] = \
+ self.be_new.get(constants.BE_SPINDLE_USE, None)
+ ispec[constants.ISPEC_CPU_COUNT] = self.be_new.get(constants.BE_VCPUS,
+ None)
+
+ # Copy ispec to verify parameters with min/max values separately
+ ispec_max = ispec.copy()
+ ispec_max[constants.ISPEC_MEM_SIZE] = \
+ self.be_new.get(constants.BE_MAXMEM, None)
+ res_max = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec_max)
+ ispec_min = ispec.copy()
+ ispec_min[constants.ISPEC_MEM_SIZE] = \
+ self.be_new.get(constants.BE_MINMEM, None)
+ res_min = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec_min)
+
+ if (res_max or res_min):
+ # FIXME: Improve error message by including information about whether
+ # the upper or lower limit of the parameter fails the ipolicy.
+ msg = ("Instance allocation to group %s (%s) violates policy: %s" %
+ (group_info, group_info.name,
+ utils.CommaJoin(set(res_max + res_min))))
+ raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
def _ConvertPlainToDrbd(self, feedback_fn):
"""Converts an instance from plain to drbd.
"""
mac = params[constants.INIC_MAC]
ip = params.get(constants.INIC_IP, None)
- nicparams = private.params
+ net = params.get(constants.INIC_NETWORK, None)
+ #TODO: not private.filled?? can a nic have no nicparams??
+ nicparams = private.filled
- return (objects.NIC(mac=mac, ip=ip, nicparams=nicparams), [
+ return (objects.NIC(mac=mac, ip=ip, network=net, nicparams=nicparams), [
("nic.%d" % idx,
- "add:mac=%s,ip=%s,mode=%s,link=%s" %
+ "add:mac=%s,ip=%s,mode=%s,link=%s,network=%s" %
(mac, ip, private.filled[constants.NIC_MODE],
- private.filled[constants.NIC_LINK])),
+ private.filled[constants.NIC_LINK],
+ net)),
])
@staticmethod
"""
changes = []
- for key in [constants.INIC_MAC, constants.INIC_IP]:
+ for key in [constants.INIC_MAC, constants.INIC_IP, constants.INIC_NETWORK]:
if key in params:
changes.append(("nic.%s/%d" % (key, idx), params[key]))
setattr(nic, key, params[key])
- if private.params:
- nic.nicparams = private.params
+ if private.filled:
+ nic.nicparams = private.filled
- for (key, val) in params.items():
+ for (key, val) in nic.nicparams.items():
changes.append(("nic.%s/%d" % (key, idx), val))
return changes
self.cfg.MarkInstanceDown(instance.name)
result.append(("admin_state", constants.ADMINST_DOWN))
- self.cfg.Update(instance, feedback_fn)
+ self.cfg.Update(instance, feedback_fn, self.proc.GetECId())
assert not (self.owned_locks(locking.LEVEL_NODE_RES) or
self.owned_locks(locking.LEVEL_NODE)), \
def ExpandNames(self):
self.share_locks = _ShareAll()
+
self.needed_locks = {
locking.LEVEL_NODEGROUP: [],
locking.LEVEL_NODE: [],
+ locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
}
self._ExpandAndLockInstance()
assert instances == [self.op.instance_name], "Instance not locked"
- ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_CHG_GROUP,
- instances=instances, target_groups=list(self.target_uuids))
+ req = iallocator.IAReqGroupChange(instances=instances,
+ target_groups=list(self.target_uuids))
+ ial = iallocator.IAllocator(self.cfg, self.rpc, req)
ial.Run(self.op.iallocator)
raise errors.OpPrereqError("Can't compute solution for changing group of"
" instance '%s' using iallocator '%s': %s" %
(self.op.instance_name, self.op.iallocator,
- ial.info),
- errors.ECODE_NORES)
+ ial.info), errors.ECODE_NORES)
jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, False)
locking.LEVEL_NODE: self.wanted,
}
+ if not self.names:
+ lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
+
def DeclareLocks(self, lu, level):
pass
# - removing the removal operation altogether
self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+ # Allocations should be stopped while this LU runs with node locks, but
+ # it doesn't have to be exclusive
+ self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
+ self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
+
def DeclareLocks(self, level):
"""Last minute lock declaration."""
# All nodes are locked anyway, so nothing to do here.
self.instance.admin_state == constants.ADMINST_UP and
not self.op.shutdown):
raise errors.OpPrereqError("Can not remove instance without shutting it"
- " down before")
+ " down before", errors.ECODE_STATE)
if self.op.mode == constants.EXPORT_MODE_LOCAL:
self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
try:
(key_name, hmac_digest, hmac_salt) = self.x509_key_name
except (TypeError, ValueError), err:
- raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err)
+ raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err,
+ errors.ECODE_INVAL)
if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
raise errors.OpPrereqError("HMAC for X509 key name is wrong",
REQ_BGL = False
def ExpandNames(self):
- self.needed_locks = {}
- # We need all nodes to be locked in order for RemoveExport to work, but we
- # don't need to lock the instance itself, as nothing will happen to it (and
- # we can remove exports also for a removed instance)
- self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+ self.needed_locks = {
+ # We need all nodes to be locked in order for RemoveExport to work, but
+ # we don't need to lock the instance itself, as nothing will happen to it
+ # (and we can remove exports also for a removed instance)
+ locking.LEVEL_NODE: locking.ALL_SET,
+
+ # Removing backups is quick, so blocking allocations is justified
+ locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
+ }
+
+ # Allocations should be stopped while this LU runs with node locks, but it
+ # doesn't have to be exclusive
+ self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
def Exec(self, feedback_fn):
"""Remove any export.
new_ipolicy = cluster.SimpleFillIPolicy(self.new_ipolicy)
inst_filter = lambda inst: inst.name in owned_instances
instances = self.cfg.GetInstancesInfoByFilter(inst_filter).values()
+ gmi = ganeti.masterd.instance
violations = \
- _ComputeNewInstanceViolations(_CalculateGroupIPolicy(cluster,
- self.group),
+ _ComputeNewInstanceViolations(gmi.CalculateGroupIPolicy(cluster,
+ self.group),
new_ipolicy, instances)
if violations:
# Verify the cluster would not be left group-less.
if len(self.cfg.GetNodeGroupList()) == 1:
- raise errors.OpPrereqError("Group '%s' is the only group,"
- " cannot be removed" %
- self.op.group_name,
+ raise errors.OpPrereqError("Group '%s' is the only group, cannot be"
+ " removed" % self.op.group_name,
errors.ECODE_STATE)
def BuildHooksEnv(self):
assert self.group_uuid not in self.target_uuids
- ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_CHG_GROUP,
- instances=instances, target_groups=self.target_uuids)
+ req = iallocator.IAReqGroupChange(instances=instances,
+ target_groups=self.target_uuids)
+ ial = iallocator.IAllocator(self.cfg, self.rpc, req)
ial.Run(self.op.iallocator)
self.group_uuid = self.cfg.LookupNodeGroup(self.op.name)
lock_level = locking.LEVEL_NODEGROUP
lock_name = self.group_uuid
+ elif self.op.kind == constants.TAG_NETWORK:
+ self.network_uuid = self.cfg.LookupNetwork(self.op.name)
+ lock_level = locking.LEVEL_NETWORK
+ lock_name = self.network_uuid
else:
lock_level = None
lock_name = None
self.target = self.cfg.GetInstanceInfo(self.op.name)
elif self.op.kind == constants.TAG_NODEGROUP:
self.target = self.cfg.GetNodeGroup(self.group_uuid)
+ elif self.op.kind == constants.TAG_NETWORK:
+ self.target = self.cfg.GetNetwork(self.network_uuid)
else:
raise errors.OpPrereqError("Wrong tag type requested (%s)" %
str(self.op.kind), errors.ECODE_INVAL)
else:
top_value = self.op.repeat - 1
for i in range(self.op.repeat):
- self.LogInfo("Test delay iteration %d/%d" % (i, top_value))
+ self.LogInfo("Test delay iteration %d/%d", i, top_value)
self._TestDelay()
+class LURestrictedCommand(NoHooksLU):
+ """Logical unit for executing restricted commands.
+
+ """
+ REQ_BGL = False
+
+ def ExpandNames(self):
+ if self.op.nodes:
+ self.op.nodes = _GetWantedNodes(self, self.op.nodes)
+
+ self.needed_locks = {
+ locking.LEVEL_NODE: self.op.nodes,
+ }
+ self.share_locks = {
+ locking.LEVEL_NODE: not self.op.use_locking,
+ }
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ """
+
+ def Exec(self, feedback_fn):
+ """Execute restricted command and return output.
+
+ """
+ owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
+
+ # Check if correct locks are held
+ assert set(self.op.nodes).issubset(owned_nodes)
+
+ rpcres = self.rpc.call_restricted_command(self.op.nodes, self.op.command)
+
+ result = []
+
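+    # One (success, payload-or-error-message) tuple is returned per node, in
+    # the same order the nodes were given in the opcode.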
+ for node_name in self.op.nodes:
+ nres = rpcres[node_name]
+ if nres.fail_msg:
+ msg = ("Command '%s' on node '%s' failed: %s" %
+ (self.op.command, node_name, nres.fail_msg))
+ result.append((False, msg))
+ else:
+ result.append((True, nres.payload))
+
+ return result
+
+
class LUTestJqueue(NoHooksLU):
"""Utility LU to test some aspects of the job queue.
return True
-class IAllocator(object):
- """IAllocator framework.
+class LUTestAllocator(NoHooksLU):
+ """Run allocator tests.
- An IAllocator instance has three sets of attributes:
- - cfg that is needed to query the cluster
- - input data (all members of the _KEYS class attribute are required)
- - four buffer attributes (in|out_data|text), that represent the
- input (to the external script) in text and data structure format,
- and the output from it, again in two formats
- - the result variables from the script (success, info, nodes) for
- easy usage
+ This LU runs the allocator tests
"""
- # pylint: disable=R0902
- # lots of instance attributes
+ def CheckPrereq(self):
+ """Check prerequisites.
- def __init__(self, cfg, rpc_runner, mode, **kwargs):
- self.cfg = cfg
- self.rpc = rpc_runner
- # init buffer variables
- self.in_text = self.out_text = self.in_data = self.out_data = None
- # init all input fields so that pylint is happy
- self.mode = mode
- self.memory = self.disks = self.disk_template = self.spindle_use = None
- self.os = self.tags = self.nics = self.vcpus = None
- self.hypervisor = None
- self.relocate_from = None
- self.name = None
- self.instances = None
- self.evac_mode = None
- self.target_groups = []
- # computed fields
- self.required_nodes = None
- # init result fields
- self.success = self.info = self.result = None
+    This checks the opcode parameters depending on the direction and mode
+    of the test.
- try:
- (fn, keydata, self._result_check) = self._MODE_DATA[self.mode]
- except KeyError:
- raise errors.ProgrammerError("Unknown mode '%s' passed to the"
- " IAllocator" % self.mode)
+ """
+ if self.op.mode in (constants.IALLOCATOR_MODE_ALLOC,
+ constants.IALLOCATOR_MODE_MULTI_ALLOC):
+ for attr in ["memory", "disks", "disk_template",
+ "os", "tags", "nics", "vcpus"]:
+ if not hasattr(self.op, attr):
+ raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
+ attr, errors.ECODE_INVAL)
+ iname = self.cfg.ExpandInstanceName(self.op.name)
+ if iname is not None:
+ raise errors.OpPrereqError("Instance '%s' already in the cluster" %
+ iname, errors.ECODE_EXISTS)
+ if not isinstance(self.op.nics, list):
+ raise errors.OpPrereqError("Invalid parameter 'nics'",
+ errors.ECODE_INVAL)
+ if not isinstance(self.op.disks, list):
+ raise errors.OpPrereqError("Invalid parameter 'disks'",
+ errors.ECODE_INVAL)
+ for row in self.op.disks:
+ if (not isinstance(row, dict) or
+ constants.IDISK_SIZE not in row or
+ not isinstance(row[constants.IDISK_SIZE], int) or
+ constants.IDISK_MODE not in row or
+ row[constants.IDISK_MODE] not in constants.DISK_ACCESS_SET):
+ raise errors.OpPrereqError("Invalid contents of the 'disks'"
+ " parameter", errors.ECODE_INVAL)
+ if self.op.hypervisor is None:
+ self.op.hypervisor = self.cfg.GetHypervisorType()
+ elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
+ fname = _ExpandInstanceName(self.cfg, self.op.name)
+ self.op.name = fname
+ self.relocate_from = \
+ list(self.cfg.GetInstanceInfo(fname).secondary_nodes)
+ elif self.op.mode in (constants.IALLOCATOR_MODE_CHG_GROUP,
+ constants.IALLOCATOR_MODE_NODE_EVAC):
+ if not self.op.instances:
+ raise errors.OpPrereqError("Missing instances", errors.ECODE_INVAL)
+ self.op.instances = _GetWantedInstances(self, self.op.instances)
+ else:
+ raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
+ self.op.mode, errors.ECODE_INVAL)
- keyset = [n for (n, _) in keydata]
+ if self.op.direction == constants.IALLOCATOR_DIR_OUT:
+ if self.op.iallocator is None:
+ raise errors.OpPrereqError("Missing allocator name",
+ errors.ECODE_INVAL)
+ elif self.op.direction != constants.IALLOCATOR_DIR_IN:
+ raise errors.OpPrereqError("Wrong allocator test '%s'" %
+ self.op.direction, errors.ECODE_INVAL)
- for key in kwargs:
- if key not in keyset:
- raise errors.ProgrammerError("Invalid input parameter '%s' to"
- " IAllocator" % key)
- setattr(self, key, kwargs[key])
+ def Exec(self, feedback_fn):
+ """Run the allocator test.
- for key in keyset:
- if key not in kwargs:
- raise errors.ProgrammerError("Missing input parameter '%s' to"
- " IAllocator" % key)
- self._BuildInputData(compat.partial(fn, self), keydata)
+ """
+ if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
+ req = iallocator.IAReqInstanceAlloc(name=self.op.name,
+ memory=self.op.memory,
+ disks=self.op.disks,
+ disk_template=self.op.disk_template,
+ os=self.op.os,
+ tags=self.op.tags,
+ nics=self.op.nics,
+ vcpus=self.op.vcpus,
+ spindle_use=self.op.spindle_use,
+ hypervisor=self.op.hypervisor)
+ elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
+ req = iallocator.IAReqRelocate(name=self.op.name,
+ relocate_from=list(self.relocate_from))
+ elif self.op.mode == constants.IALLOCATOR_MODE_CHG_GROUP:
+ req = iallocator.IAReqGroupChange(instances=self.op.instances,
+ target_groups=self.op.target_groups)
+ elif self.op.mode == constants.IALLOCATOR_MODE_NODE_EVAC:
+ req = iallocator.IAReqNodeEvac(instances=self.op.instances,
+ evac_mode=self.op.evac_mode)
+ elif self.op.mode == constants.IALLOCATOR_MODE_MULTI_ALLOC:
+ disk_template = self.op.disk_template
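+      # Build one allocation request per instance; names are derived by
+      # appending the index to the base name given in the opcode.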
+ insts = [iallocator.IAReqInstanceAlloc(name="%s%s" % (self.op.name, idx),
+ memory=self.op.memory,
+ disks=self.op.disks,
+ disk_template=disk_template,
+ os=self.op.os,
+ tags=self.op.tags,
+ nics=self.op.nics,
+ vcpus=self.op.vcpus,
+ spindle_use=self.op.spindle_use,
+ hypervisor=self.op.hypervisor)
+ for idx in range(self.op.count)]
+ req = iallocator.IAReqMultiInstanceAlloc(instances=insts)
+ else:
+ raise errors.ProgrammerError("Uncatched mode %s in"
+ " LUTestAllocator.Exec", self.op.mode)
+
+ ial = iallocator.IAllocator(self.cfg, self.rpc, req)
+ if self.op.direction == constants.IALLOCATOR_DIR_IN:
+ result = ial.in_text
+ else:
+ ial.Run(self.op.iallocator, validate=False)
+ result = ial.out_text
+ return result
- def _ComputeClusterData(self):
- """Compute the generic allocator input data.
- This is the data that is independent of the actual operation.
+class LUNetworkAdd(LogicalUnit):
+ """Logical unit for creating networks.
+
+ """
+ HPATH = "network-add"
+ HTYPE = constants.HTYPE_NETWORK
+ REQ_BGL = False
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
"""
- cfg = self.cfg
- cluster_info = cfg.GetClusterInfo()
- # cluster data
- data = {
- "version": constants.IALLOCATOR_VERSION,
- "cluster_name": cfg.GetClusterName(),
- "cluster_tags": list(cluster_info.GetTags()),
- "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
- "ipolicy": cluster_info.ipolicy,
- }
- ninfo = cfg.GetAllNodesInfo()
- iinfo = cfg.GetAllInstancesInfo().values()
- i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
+ mn = self.cfg.GetMasterNode()
+ return ([mn], [mn])
+
+ def CheckArguments(self):
+ if self.op.mac_prefix:
+ self.op.mac_prefix = \
+ utils.NormalizeAndValidateThreeOctetMacPrefix(self.op.mac_prefix)
- # node data
- node_list = [n.name for n in ninfo.values() if n.vm_capable]
+ def ExpandNames(self):
+ self.network_uuid = self.cfg.GenerateUniqueID(self.proc.GetECId())
- if self.mode == constants.IALLOCATOR_MODE_ALLOC:
- hypervisor_name = self.hypervisor
- elif self.mode == constants.IALLOCATOR_MODE_RELOC:
- hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
+ if self.op.conflicts_check:
+ self.share_locks[locking.LEVEL_NODE] = 1
+ self.needed_locks = {
+ locking.LEVEL_NODE: locking.ALL_SET,
+ }
else:
- hypervisor_name = cluster_info.primary_hypervisor
+ self.needed_locks = {}
- node_data = self.rpc.call_node_info(node_list, [cfg.GetVGName()],
- [hypervisor_name])
- node_iinfo = \
- self.rpc.call_all_instances_info(node_list,
- cluster_info.enabled_hypervisors)
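+    # The network object does not exist yet; register its freshly generated
+    # UUID via add_locks rather than needed_locks.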
+ self.add_locks[locking.LEVEL_NETWORK] = self.network_uuid
- data["nodegroups"] = self._ComputeNodeGroupData(cfg)
+ def CheckPrereq(self):
+ if self.op.network is None:
+ raise errors.OpPrereqError("Network must be given",
+ errors.ECODE_INVAL)
- config_ndata = self._ComputeBasicNodeData(cfg, ninfo)
- data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
- i_list, config_ndata)
- assert len(data["nodes"]) == len(ninfo), \
- "Incomplete node data computed"
+ uuid = self.cfg.LookupNetwork(self.op.network_name)
- data["instances"] = self._ComputeInstanceData(cluster_info, i_list)
+ if uuid:
+ raise errors.OpPrereqError("Network '%s' already defined" %
+ self.op.network, errors.ECODE_EXISTS)
- self.in_data = data
+ # Check tag validity
+ for tag in self.op.tags:
+ objects.TaggableObject.ValidateTag(tag)
- @staticmethod
- def _ComputeNodeGroupData(cfg):
- """Compute node groups data.
+ def BuildHooksEnv(self):
+ """Build hooks env.
"""
- cluster = cfg.GetClusterInfo()
- ng = dict((guuid, {
- "name": gdata.name,
- "alloc_policy": gdata.alloc_policy,
- "ipolicy": _CalculateGroupIPolicy(cluster, gdata),
- "tags": list(gdata.tags),
- })
- for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())
-
- return ng
+ args = {
+ "name": self.op.network_name,
+ "subnet": self.op.network,
+ "gateway": self.op.gateway,
+ "network6": self.op.network6,
+ "gateway6": self.op.gateway6,
+ "mac_prefix": self.op.mac_prefix,
+ "network_type": self.op.network_type,
+ "tags": self.op.tags,
+ }
+ return _BuildNetworkHookEnv(**args) # pylint: disable=W0142
- @staticmethod
- def _ComputeBasicNodeData(cfg, node_cfg):
- """Compute global node data.
+ def Exec(self, feedback_fn):
+ """Add the ip pool to the cluster.
+
+ """
+ nobj = objects.Network(name=self.op.network_name,
+ network=self.op.network,
+ gateway=self.op.gateway,
+ network6=self.op.network6,
+ gateway6=self.op.gateway6,
+ mac_prefix=self.op.mac_prefix,
+ network_type=self.op.network_type,
+ uuid=self.network_uuid,
+ family=constants.IP4_VERSION)
+ # Initialize the associated address pool
+ try:
+ pool = network.AddressPool.InitializeNetwork(nobj)
+ except errors.AddressPoolError, e:
+ raise errors.OpExecError("Cannot create IP pool for this network. %s" % e)
+
+ # Check if we need to reserve the nodes and the cluster master IP
+ # These may not be allocated to any instances in routed mode, as
+ # they wouldn't function anyway.
+ if self.op.conflicts_check:
+ for node in self.cfg.GetAllNodesInfo().values():
+ for ip in [node.primary_ip, node.secondary_ip]:
+ try:
+ if pool.Contains(ip):
+ pool.Reserve(ip)
+ self.LogInfo("Reserved IP address of node '%s' (%s)",
+ node.name, ip)
+ except errors.AddressPoolError:
+ self.LogWarning("Cannot reserve IP address of node '%s' (%s)",
+ node.name, ip)
+
+ master_ip = self.cfg.GetClusterInfo().master_ip
+ try:
+ if pool.Contains(master_ip):
+ pool.Reserve(master_ip)
+ self.LogInfo("Reserved cluster master IP address (%s)", master_ip)
+ except errors.AddressPoolError:
+ self.LogWarning("Cannot reserve cluster master IP address (%s)",
+ master_ip)
+
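+    # Addresses explicitly requested by the user are marked as external
+    # reservations, so the pool should never hand them out to instances.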
+ if self.op.add_reserved_ips:
+ for ip in self.op.add_reserved_ips:
+ try:
+ pool.Reserve(ip, external=True)
+ except errors.AddressPoolError, e:
+ raise errors.OpExecError("Cannot reserve IP %s. %s " % (ip, e))
- @rtype: dict
- @returns: a dict of name: (node dict, node config)
-
- """
- # fill in static (config-based) values
- node_results = dict((ninfo.name, {
- "tags": list(ninfo.GetTags()),
- "primary_ip": ninfo.primary_ip,
- "secondary_ip": ninfo.secondary_ip,
- "offline": ninfo.offline,
- "drained": ninfo.drained,
- "master_candidate": ninfo.master_candidate,
- "group": ninfo.group,
- "master_capable": ninfo.master_capable,
- "vm_capable": ninfo.vm_capable,
- "ndparams": cfg.GetNdParams(ninfo),
- })
- for ninfo in node_cfg.values())
+ if self.op.tags:
+ for tag in self.op.tags:
+ nobj.AddTag(tag)
- return node_results
+ self.cfg.AddNetwork(nobj, self.proc.GetECId(), check_uuid=False)
+ del self.remove_locks[locking.LEVEL_NETWORK]
- @staticmethod
- def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list,
- node_results):
- """Compute global node data.
-
- @param node_results: the basic node structures as filled from the config
-
- """
- #TODO(dynmem): compute the right data on MAX and MIN memory
- # make a copy of the current dict
- node_results = dict(node_results)
- for nname, nresult in node_data.items():
- assert nname in node_results, "Missing basic data for node %s" % nname
- ninfo = node_cfg[nname]
-
- if not (ninfo.offline or ninfo.drained):
- nresult.Raise("Can't get data for node %s" % nname)
- node_iinfo[nname].Raise("Can't get node instance info from node %s" %
- nname)
- remote_info = _MakeLegacyNodeInfo(nresult.payload)
-
- for attr in ["memory_total", "memory_free", "memory_dom0",
- "vg_size", "vg_free", "cpu_total"]:
- if attr not in remote_info:
- raise errors.OpExecError("Node '%s' didn't return attribute"
- " '%s'" % (nname, attr))
- if not isinstance(remote_info[attr], int):
- raise errors.OpExecError("Node '%s' returned invalid value"
- " for '%s': %s" %
- (nname, attr, remote_info[attr]))
- # compute memory used by primary instances
- i_p_mem = i_p_up_mem = 0
- for iinfo, beinfo in i_list:
- if iinfo.primary_node == nname:
- i_p_mem += beinfo[constants.BE_MAXMEM]
- if iinfo.name not in node_iinfo[nname].payload:
- i_used_mem = 0
- else:
- i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"])
- i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem
- remote_info["memory_free"] -= max(0, i_mem_diff)
-
- if iinfo.admin_state == constants.ADMINST_UP:
- i_p_up_mem += beinfo[constants.BE_MAXMEM]
-
- # compute memory used by instances
- pnr_dyn = {
- "total_memory": remote_info["memory_total"],
- "reserved_memory": remote_info["memory_dom0"],
- "free_memory": remote_info["memory_free"],
- "total_disk": remote_info["vg_size"],
- "free_disk": remote_info["vg_free"],
- "total_cpus": remote_info["cpu_total"],
- "i_pri_memory": i_p_mem,
- "i_pri_up_memory": i_p_up_mem,
- }
- pnr_dyn.update(node_results[nname])
- node_results[nname] = pnr_dyn
- return node_results
+class LUNetworkRemove(LogicalUnit):
+ HPATH = "network-remove"
+ HTYPE = constants.HTYPE_NETWORK
+ REQ_BGL = False
- @staticmethod
- def _ComputeInstanceData(cluster_info, i_list):
- """Compute global instance data.
-
- """
- instance_data = {}
- for iinfo, beinfo in i_list:
- nic_data = []
- for nic in iinfo.nics:
- filled_params = cluster_info.SimpleFillNIC(nic.nicparams)
- nic_dict = {
- "mac": nic.mac,
- "ip": nic.ip,
- "mode": filled_params[constants.NIC_MODE],
- "link": filled_params[constants.NIC_LINK],
- }
- if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
- nic_dict["bridge"] = filled_params[constants.NIC_LINK]
- nic_data.append(nic_dict)
- pir = {
- "tags": list(iinfo.GetTags()),
- "admin_state": iinfo.admin_state,
- "vcpus": beinfo[constants.BE_VCPUS],
- "memory": beinfo[constants.BE_MAXMEM],
- "spindle_use": beinfo[constants.BE_SPINDLE_USE],
- "os": iinfo.os,
- "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
- "nics": nic_data,
- "disks": [{constants.IDISK_SIZE: dsk.size,
- constants.IDISK_MODE: dsk.mode}
- for dsk in iinfo.disks],
- "disk_template": iinfo.disk_template,
- "hypervisor": iinfo.hypervisor,
- }
- pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
- pir["disks"])
- instance_data[iinfo.name] = pir
+ def ExpandNames(self):
+ self.network_uuid = self.cfg.LookupNetwork(self.op.network_name)
- return instance_data
+ if not self.network_uuid:
+ raise errors.OpPrereqError(("Network '%s' not found" %
+ self.op.network_name),
+ errors.ECODE_INVAL)
- def _AddNewInstance(self):
- """Add new instance data to allocator structure.
+ self.share_locks[locking.LEVEL_NODEGROUP] = 1
+ self.needed_locks = {
+ locking.LEVEL_NETWORK: [self.network_uuid],
+ locking.LEVEL_NODEGROUP: locking.ALL_SET,
+ }
- This in combination with _AllocatorGetClusterData will create the
- correct structure needed as input for the allocator.
+ def CheckPrereq(self):
+ """Check prerequisites.
- The checks for the completeness of the opcode must have already been
- done.
+    This checks that the network to be removed is not connected to any node
+    group.
"""
- disk_space = _ComputeDiskSize(self.disk_template, self.disks)
+    # Verify that the network is not connected to any node group.
+ node_groups = [group.name
+ for group in self.cfg.GetAllNodeGroupsInfo().values()
+ if self.network_uuid in group.networks]
- if self.disk_template in constants.DTS_INT_MIRROR:
- self.required_nodes = 2
- else:
- self.required_nodes = 1
+ if node_groups:
+ self.LogWarning("Network '%s' is connected to the following"
+ " node groups: %s" %
+ (self.op.network_name,
+ utils.CommaJoin(utils.NiceSort(node_groups))))
+ raise errors.OpPrereqError("Network still connected", errors.ECODE_STATE)
- request = {
- "name": self.name,
- "disk_template": self.disk_template,
- "tags": self.tags,
- "os": self.os,
- "vcpus": self.vcpus,
- "memory": self.memory,
- "spindle_use": self.spindle_use,
- "disks": self.disks,
- "disk_space_total": disk_space,
- "nics": self.nics,
- "required_nodes": self.required_nodes,
- "hypervisor": self.hypervisor,
- }
+ def BuildHooksEnv(self):
+ """Build hooks env.
- return request
+ """
+ return {
+ "NETWORK_NAME": self.op.network_name,
+ }
- def _AddRelocateInstance(self):
- """Add relocate instance data to allocator structure.
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
- This in combination with _IAllocatorGetClusterData will create the
- correct structure needed as input for the allocator.
+ """
+ mn = self.cfg.GetMasterNode()
+ return ([mn], [mn])
- The checks for the completeness of the opcode must have already been
- done.
+ def Exec(self, feedback_fn):
+ """Remove the network.
"""
- instance = self.cfg.GetInstanceInfo(self.name)
- if instance is None:
- raise errors.ProgrammerError("Unknown instance '%s' passed to"
- " IAllocator" % self.name)
+ try:
+ self.cfg.RemoveNetwork(self.network_uuid)
+ except errors.ConfigurationError:
+ raise errors.OpExecError("Network '%s' with UUID %s disappeared" %
+ (self.op.network_name, self.network_uuid))
- if instance.disk_template not in constants.DTS_MIRRORED:
- raise errors.OpPrereqError("Can't relocate non-mirrored instances",
- errors.ECODE_INVAL)
- if instance.disk_template in constants.DTS_INT_MIRROR and \
- len(instance.secondary_nodes) != 1:
- raise errors.OpPrereqError("Instance has not exactly one secondary node",
- errors.ECODE_STATE)
+class LUNetworkSetParams(LogicalUnit):
+ """Modifies the parameters of a network.
+
+ """
+ HPATH = "network-modify"
+ HTYPE = constants.HTYPE_NETWORK
+ REQ_BGL = False
- self.required_nodes = 1
- disk_sizes = [{constants.IDISK_SIZE: disk.size} for disk in instance.disks]
- disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
+ def CheckArguments(self):
+ if (self.op.gateway and
+ (self.op.add_reserved_ips or self.op.remove_reserved_ips)):
+ raise errors.OpPrereqError("Cannot modify gateway and reserved ips"
+ " at once", errors.ECODE_INVAL)
- request = {
- "name": self.name,
- "disk_space_total": disk_space,
- "required_nodes": self.required_nodes,
- "relocate_from": self.relocate_from,
+ def ExpandNames(self):
+ self.network_uuid = self.cfg.LookupNetwork(self.op.network_name)
+ if self.network_uuid is None:
+ raise errors.OpPrereqError(("Network '%s' not found" %
+ self.op.network_name),
+ errors.ECODE_INVAL)
+
+ self.needed_locks = {
+ locking.LEVEL_NETWORK: [self.network_uuid],
}
- return request
- def _AddNodeEvacuate(self):
- """Get data for node-evacuate requests.
+ def CheckPrereq(self):
+ """Check prerequisites.
"""
- return {
- "instances": self.instances,
- "evac_mode": self.evac_mode,
- }
+ self.network = self.cfg.GetNetwork(self.network_uuid)
+ self.gateway = self.network.gateway
+ self.network_type = self.network.network_type
+ self.mac_prefix = self.network.mac_prefix
+ self.network6 = self.network.network6
+ self.gateway6 = self.network.gateway6
+ self.tags = self.network.tags
+
+ self.pool = network.AddressPool(self.network)
- def _AddChangeGroup(self):
- """Get data for node-evacuate requests.
+ if self.op.gateway:
+ if self.op.gateway == constants.VALUE_NONE:
+ self.gateway = None
+ else:
+ self.gateway = self.op.gateway
+ if self.pool.IsReserved(self.gateway):
+ raise errors.OpPrereqError("%s is already reserved" %
+ self.gateway, errors.ECODE_INVAL)
+
+ if self.op.network_type:
+ if self.op.network_type == constants.VALUE_NONE:
+ self.network_type = None
+ else:
+ self.network_type = self.op.network_type
+
+ if self.op.mac_prefix:
+ if self.op.mac_prefix == constants.VALUE_NONE:
+ self.mac_prefix = None
+ else:
+ self.mac_prefix = \
+ utils.NormalizeAndValidateThreeOctetMacPrefix(self.op.mac_prefix)
+
+ if self.op.gateway6:
+ if self.op.gateway6 == constants.VALUE_NONE:
+ self.gateway6 = None
+ else:
+ self.gateway6 = self.op.gateway6
+
+ if self.op.network6:
+ if self.op.network6 == constants.VALUE_NONE:
+ self.network6 = None
+ else:
+ self.network6 = self.op.network6
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
"""
- return {
- "instances": self.instances,
- "target_groups": self.target_groups,
+ args = {
+ "name": self.op.network_name,
+ "subnet": self.network.network,
+ "gateway": self.gateway,
+ "network6": self.network6,
+ "gateway6": self.gateway6,
+ "mac_prefix": self.mac_prefix,
+ "network_type": self.network_type,
+ "tags": self.tags,
}
+ return _BuildNetworkHookEnv(**args) # pylint: disable=W0142
- def _BuildInputData(self, fn, keydata):
- """Build input data structures.
-
- """
- self._ComputeClusterData()
-
- request = fn()
- request["type"] = self.mode
- for keyname, keytype in keydata:
- if keyname not in request:
- raise errors.ProgrammerError("Request parameter %s is missing" %
- keyname)
- val = request[keyname]
- if not keytype(val):
- raise errors.ProgrammerError("Request parameter %s doesn't pass"
- " validation, value %s, expected"
- " type %s" % (keyname, val, keytype))
- self.in_data["request"] = request
-
- self.in_text = serializer.Dump(self.in_data)
-
- _STRING_LIST = ht.TListOf(ht.TString)
- _JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
- # pylint: disable=E1101
- # Class '...' has no 'OP_ID' member
- "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID,
- opcodes.OpInstanceMigrate.OP_ID,
- opcodes.OpInstanceReplaceDisks.OP_ID])
- })))
-
- _NEVAC_MOVED = \
- ht.TListOf(ht.TAnd(ht.TIsLength(3),
- ht.TItems([ht.TNonEmptyString,
- ht.TNonEmptyString,
- ht.TListOf(ht.TNonEmptyString),
- ])))
- _NEVAC_FAILED = \
- ht.TListOf(ht.TAnd(ht.TIsLength(2),
- ht.TItems([ht.TNonEmptyString,
- ht.TMaybeString,
- ])))
- _NEVAC_RESULT = ht.TAnd(ht.TIsLength(3),
- ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST]))
-
- _MODE_DATA = {
- constants.IALLOCATOR_MODE_ALLOC:
- (_AddNewInstance,
- [
- ("name", ht.TString),
- ("memory", ht.TInt),
- ("spindle_use", ht.TInt),
- ("disks", ht.TListOf(ht.TDict)),
- ("disk_template", ht.TString),
- ("os", ht.TString),
- ("tags", _STRING_LIST),
- ("nics", ht.TListOf(ht.TDict)),
- ("vcpus", ht.TInt),
- ("hypervisor", ht.TString),
- ], ht.TList),
- constants.IALLOCATOR_MODE_RELOC:
- (_AddRelocateInstance,
- [("name", ht.TString), ("relocate_from", _STRING_LIST)],
- ht.TList),
- constants.IALLOCATOR_MODE_NODE_EVAC:
- (_AddNodeEvacuate, [
- ("instances", _STRING_LIST),
- ("evac_mode", ht.TElemOf(constants.IALLOCATOR_NEVAC_MODES)),
- ], _NEVAC_RESULT),
- constants.IALLOCATOR_MODE_CHG_GROUP:
- (_AddChangeGroup, [
- ("instances", _STRING_LIST),
- ("target_groups", _STRING_LIST),
- ], _NEVAC_RESULT),
- }
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ mn = self.cfg.GetMasterNode()
+ return ([mn], [mn])
- def Run(self, name, validate=True, call_fn=None):
- """Run an instance allocator and return the results.
+ def Exec(self, feedback_fn):
+ """Modifies the network.
"""
- if call_fn is None:
- call_fn = self.rpc.call_iallocator_runner
+ #TODO: reserve/release via temporary reservation manager
+ # extend cfg.ReserveIp/ReleaseIp with the external flag
+ if self.op.gateway:
+ if self.gateway == self.network.gateway:
+ self.LogWarning("Gateway is already %s", self.gateway)
+ else:
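+        # Swap the external reservation: reserve the new gateway address and
+        # release the old one before changing the network object.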
+ if self.gateway:
+ self.pool.Reserve(self.gateway, external=True)
+ if self.network.gateway:
+ self.pool.Release(self.network.gateway, external=True)
+ self.network.gateway = self.gateway
+
+ if self.op.add_reserved_ips:
+ for ip in self.op.add_reserved_ips:
+ try:
+ if self.pool.IsReserved(ip):
+ self.LogWarning("IP address %s is already reserved", ip)
+ else:
+ self.pool.Reserve(ip, external=True)
+ except errors.AddressPoolError, err:
+ self.LogWarning("Cannot reserve IP address %s: %s", ip, err)
+
+ if self.op.remove_reserved_ips:
+ for ip in self.op.remove_reserved_ips:
+ if ip == self.network.gateway:
+ self.LogWarning("Cannot unreserve Gateway's IP")
+ continue
+ try:
+ if not self.pool.IsReserved(ip):
+ self.LogWarning("IP address %s is already unreserved", ip)
+ else:
+ self.pool.Release(ip, external=True)
+ except errors.AddressPoolError, err:
+ self.LogWarning("Cannot release IP address %s: %s", ip, err)
- result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
- result.Raise("Failure while running the iallocator script")
+ if self.op.mac_prefix:
+ self.network.mac_prefix = self.mac_prefix
- self.out_text = result.payload
- if validate:
- self._ValidateResult()
+ if self.op.network6:
+ self.network.network6 = self.network6
- def _ValidateResult(self):
- """Process the allocator results.
+ if self.op.gateway6:
+ self.network.gateway6 = self.gateway6
- This will process and if successful save the result in
- self.out_data and the other parameters.
+ if self.op.network_type:
+ self.network.network_type = self.network_type
- """
- try:
- rdict = serializer.Load(self.out_text)
- except Exception, err:
- raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
-
- if not isinstance(rdict, dict):
- raise errors.OpExecError("Can't parse iallocator results: not a dict")
-
- # TODO: remove backwards compatiblity in later versions
- if "nodes" in rdict and "result" not in rdict:
- rdict["result"] = rdict["nodes"]
- del rdict["nodes"]
-
- for key in "success", "info", "result":
- if key not in rdict:
- raise errors.OpExecError("Can't parse iallocator results:"
- " missing key '%s'" % key)
- setattr(self, key, rdict[key])
-
- if not self._result_check(self.result):
- raise errors.OpExecError("Iallocator returned invalid result,"
- " expected %s, got %s" %
- (self._result_check, self.result),
- errors.ECODE_INVAL)
+ self.pool.Validate()
+
+ self.cfg.Update(self.network, feedback_fn)
- if self.mode == constants.IALLOCATOR_MODE_RELOC:
- assert self.relocate_from is not None
- assert self.required_nodes == 1
- node2group = dict((name, ndata["group"])
- for (name, ndata) in self.in_data["nodes"].items())
+class _NetworkQuery(_QueryBase):
+ FIELDS = query.NETWORK_FIELDS
- fn = compat.partial(self._NodesToGroups, node2group,
- self.in_data["nodegroups"])
+ def ExpandNames(self, lu):
+ lu.needed_locks = {}
- instance = self.cfg.GetInstanceInfo(self.name)
- request_groups = fn(self.relocate_from + [instance.primary_node])
- result_groups = fn(rdict["result"] + [instance.primary_node])
+ self._all_networks = lu.cfg.GetAllNetworksInfo()
+ name_to_uuid = dict((n.name, n.uuid) for n in self._all_networks.values())
- if self.success and not set(result_groups).issubset(request_groups):
- raise errors.OpExecError("Groups of nodes returned by iallocator (%s)"
- " differ from original groups (%s)" %
- (utils.CommaJoin(result_groups),
- utils.CommaJoin(request_groups)))
+ if not self.names:
+ self.wanted = [name_to_uuid[name]
+ for name in utils.NiceSort(name_to_uuid.keys())]
+ else:
+ # Accept names to be either names or UUIDs.
+ missing = []
+ self.wanted = []
+ all_uuid = frozenset(self._all_networks.keys())
- elif self.mode == constants.IALLOCATOR_MODE_NODE_EVAC:
- assert self.evac_mode in constants.IALLOCATOR_NEVAC_MODES
+ for name in self.names:
+ if name in all_uuid:
+ self.wanted.append(name)
+ elif name in name_to_uuid:
+ self.wanted.append(name_to_uuid[name])
+ else:
+ missing.append(name)
- self.out_data = rdict
+ if missing:
+ raise errors.OpPrereqError("Some networks do not exist: %s" % missing,
+ errors.ECODE_NOENT)
- @staticmethod
- def _NodesToGroups(node2group, groups, nodes):
- """Returns a list of unique group names for a list of nodes.
+ def DeclareLocks(self, lu, level):
+ pass
- @type node2group: dict
- @param node2group: Map from node name to group UUID
- @type groups: dict
- @param groups: Group information
- @type nodes: list
- @param nodes: Node names
+ def _GetQueryData(self, lu):
+ """Computes the list of networks and their attributes.
"""
- result = set()
+ do_instances = query.NETQ_INST in self.requested_data
+ do_groups = do_instances or (query.NETQ_GROUP in self.requested_data)
+ do_stats = query.NETQ_STATS in self.requested_data
- for node in nodes:
- try:
- group_uuid = node2group[node]
- except KeyError:
- # Ignore unknown node
- pass
- else:
- try:
- group = groups[group_uuid]
- except KeyError:
- # Can't find group, let's use UUID
- group_name = group_uuid
- else:
- group_name = group["name"]
+ network_to_groups = None
+ network_to_instances = None
+ stats = None
- result.add(group_name)
+ # For NETQ_GROUP, we need to map network->[groups]
+ if do_groups:
+ all_groups = lu.cfg.GetAllNodeGroupsInfo()
+ network_to_groups = dict((uuid, []) for uuid in self.wanted)
- return sorted(result)
+ if do_instances:
+ all_instances = lu.cfg.GetAllInstancesInfo()
+ all_nodes = lu.cfg.GetAllNodesInfo()
+ network_to_instances = dict((uuid, []) for uuid in self.wanted)
+
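+    # Walk all node groups once, recording for each requested network the
+    # groups it is connected to and, if requested, the instances whose NICs
+    # use it.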
+ for group in all_groups.values():
+ if do_instances:
+ group_nodes = [node.name for node in all_nodes.values() if
+ node.group == group.uuid]
+ group_instances = [instance for instance in all_instances.values()
+ if instance.primary_node in group_nodes]
+
+ for net_uuid in group.networks.keys():
+ if net_uuid in network_to_groups:
+ netparams = group.networks[net_uuid]
+ mode = netparams[constants.NIC_MODE]
+ link = netparams[constants.NIC_LINK]
+ info = group.name + "(" + mode + ", " + link + ")"
+ network_to_groups[net_uuid].append(info)
+
+ if do_instances:
+ for instance in group_instances:
+ for nic in instance.nics:
+ if nic.network == self._all_networks[net_uuid].name:
+ network_to_instances[net_uuid].append(instance.name)
+ break
+
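+    # Address pool statistics (free/reserved counts, pool map, external
+    # reservations) are only computed when explicitly requested.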
+ if do_stats:
+ stats = {}
+ for uuid, net in self._all_networks.items():
+ if uuid in self.wanted:
+ pool = network.AddressPool(net)
+ stats[uuid] = {
+ "free_count": pool.GetFreeCount(),
+ "reserved_count": pool.GetReservedCount(),
+ "map": pool.GetMap(),
+ "external_reservations":
+ utils.CommaJoin(pool.GetExternalReservations()),
+ }
+
+ return query.NetworkQueryData([self._all_networks[uuid]
+ for uuid in self.wanted],
+ network_to_groups,
+ network_to_instances,
+ stats)
+
+
+class LUNetworkQuery(NoHooksLU):
+ """Logical unit for querying networks.
+ """
+ REQ_BGL = False
-class LUTestAllocator(NoHooksLU):
- """Run allocator tests.
+ def CheckArguments(self):
+ self.nq = _NetworkQuery(qlang.MakeSimpleFilter("name", self.op.names),
+ self.op.output_fields, False)
- This LU runs the allocator tests
+ def ExpandNames(self):
+ self.nq.ExpandNames(self)
+
+ def Exec(self, feedback_fn):
+ return self.nq.OldStyleQuery(self)
+
+
+class LUNetworkConnect(LogicalUnit):
+ """Connect a network to a nodegroup
"""
+ HPATH = "network-connect"
+ HTYPE = constants.HTYPE_NETWORK
+ REQ_BGL = False
+
+ def ExpandNames(self):
+ self.network_name = self.op.network_name
+ self.group_name = self.op.group_name
+ self.network_mode = self.op.network_mode
+ self.network_link = self.op.network_link
+
+ self.network_uuid = self.cfg.LookupNetwork(self.network_name)
+ if self.network_uuid is None:
+ raise errors.OpPrereqError("Network %s does not exist" %
+ self.network_name, errors.ECODE_INVAL)
+
+ self.group_uuid = self.cfg.LookupNodeGroup(self.group_name)
+ if self.group_uuid is None:
+ raise errors.OpPrereqError("Group %s does not exist" %
+ self.group_name, errors.ECODE_INVAL)
+
+ self.needed_locks = {
+ locking.LEVEL_INSTANCE: [],
+ locking.LEVEL_NODEGROUP: [self.group_uuid],
+ }
+ self.share_locks[locking.LEVEL_INSTANCE] = 1
+
+ if self.op.conflicts_check:
+ self.needed_locks[locking.LEVEL_NETWORK] = [self.network_uuid]
+ self.share_locks[locking.LEVEL_NETWORK] = 1
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_INSTANCE:
+ assert not self.needed_locks[locking.LEVEL_INSTANCE]
+
+ # Lock instances optimistically, needs verification once group lock has
+ # been acquired
+ if self.op.conflicts_check:
+ self.needed_locks[locking.LEVEL_INSTANCE] = \
+ self.cfg.GetNodeGroupInstances(self.group_uuid)
+
+ def BuildHooksEnv(self):
+ ret = {
+ "GROUP_NAME": self.group_name,
+ "GROUP_NETWORK_MODE": self.network_mode,
+ "GROUP_NETWORK_LINK": self.network_link,
+ }
+ return ret
+
+ def BuildHooksNodes(self):
+ nodes = self.cfg.GetNodeGroup(self.group_uuid).members
+ return (nodes, nodes)
+
def CheckPrereq(self):
- """Check prerequisites.
+ owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
- This checks the opcode parameters depending on the director and mode test.
+ assert self.group_uuid in owned_groups
- """
- if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
- for attr in ["memory", "disks", "disk_template",
- "os", "tags", "nics", "vcpus"]:
- if not hasattr(self.op, attr):
- raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
- attr, errors.ECODE_INVAL)
- iname = self.cfg.ExpandInstanceName(self.op.name)
- if iname is not None:
- raise errors.OpPrereqError("Instance '%s' already in the cluster" %
- iname, errors.ECODE_EXISTS)
- if not isinstance(self.op.nics, list):
- raise errors.OpPrereqError("Invalid parameter 'nics'",
- errors.ECODE_INVAL)
- if not isinstance(self.op.disks, list):
- raise errors.OpPrereqError("Invalid parameter 'disks'",
- errors.ECODE_INVAL)
- for row in self.op.disks:
- if (not isinstance(row, dict) or
- constants.IDISK_SIZE not in row or
- not isinstance(row[constants.IDISK_SIZE], int) or
- constants.IDISK_MODE not in row or
- row[constants.IDISK_MODE] not in constants.DISK_ACCESS_SET):
- raise errors.OpPrereqError("Invalid contents of the 'disks'"
- " parameter", errors.ECODE_INVAL)
- if self.op.hypervisor is None:
- self.op.hypervisor = self.cfg.GetHypervisorType()
- elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
- fname = _ExpandInstanceName(self.cfg, self.op.name)
- self.op.name = fname
- self.relocate_from = \
- list(self.cfg.GetInstanceInfo(fname).secondary_nodes)
- elif self.op.mode in (constants.IALLOCATOR_MODE_CHG_GROUP,
- constants.IALLOCATOR_MODE_NODE_EVAC):
- if not self.op.instances:
- raise errors.OpPrereqError("Missing instances", errors.ECODE_INVAL)
- self.op.instances = _GetWantedInstances(self, self.op.instances)
- else:
- raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
- self.op.mode, errors.ECODE_INVAL)
+ l = lambda value: utils.CommaJoin("%s: %s/%s" % (i[0], i[1], i[2])
+ for i in value)
- if self.op.direction == constants.IALLOCATOR_DIR_OUT:
- if self.op.allocator is None:
- raise errors.OpPrereqError("Missing allocator name",
+ self.netparams = {
+ constants.NIC_MODE: self.network_mode,
+ constants.NIC_LINK: self.network_link,
+ }
+ objects.NIC.CheckParameterSyntax(self.netparams)
+
+ self.group = self.cfg.GetNodeGroup(self.group_uuid)
+ #if self.network_mode == constants.NIC_MODE_BRIDGED:
+ # _CheckNodeGroupBridgesExist(self, self.network_link, self.group_uuid)
+ self.connected = False
+ if self.network_uuid in self.group.networks:
+ self.LogWarning("Network '%s' is already mapped to group '%s'" %
+ (self.network_name, self.group.name))
+ self.connected = True
+ return
+
+ if self.op.conflicts_check:
+ # Check if locked instances are still correct
+ owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
+ _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)
+
+ nobj = self.cfg.GetNetwork(self.network_uuid)
+ pool = network.AddressPool(nobj)
+ conflicting_instances = []
+
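+      # Collect every (instance name, NIC index, IP) whose address lies
+      # inside the network about to be connected to this group.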
+ for (_, instance) in self.cfg.GetMultiInstanceInfo(owned_instances):
+ for idx, nic in enumerate(instance.nics):
+ if pool.Contains(nic.ip):
+ conflicting_instances.append((instance.name, idx, nic.ip))
+
+ if conflicting_instances:
+ self.LogWarning("Following occurences use IPs from network %s"
+ " that is about to connect to nodegroup %s: %s" %
+ (self.network_name, self.group.name,
+ l(conflicting_instances)))
+ raise errors.OpPrereqError("Conflicting IPs found."
+ " Please remove/modify"
+ " corresponding NICs",
errors.ECODE_INVAL)
- elif self.op.direction != constants.IALLOCATOR_DIR_IN:
- raise errors.OpPrereqError("Wrong allocator test '%s'" %
- self.op.direction, errors.ECODE_INVAL)
def Exec(self, feedback_fn):
- """Run the allocator test.
+ if self.connected:
+ return
- """
- if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
- ial = IAllocator(self.cfg, self.rpc,
- mode=self.op.mode,
- name=self.op.name,
- memory=self.op.memory,
- disks=self.op.disks,
- disk_template=self.op.disk_template,
- os=self.op.os,
- tags=self.op.tags,
- nics=self.op.nics,
- vcpus=self.op.vcpus,
- hypervisor=self.op.hypervisor,
- spindle_use=self.op.spindle_use,
- )
- elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
- ial = IAllocator(self.cfg, self.rpc,
- mode=self.op.mode,
- name=self.op.name,
- relocate_from=list(self.relocate_from),
- )
- elif self.op.mode == constants.IALLOCATOR_MODE_CHG_GROUP:
- ial = IAllocator(self.cfg, self.rpc,
- mode=self.op.mode,
- instances=self.op.instances,
- target_groups=self.op.target_groups)
- elif self.op.mode == constants.IALLOCATOR_MODE_NODE_EVAC:
- ial = IAllocator(self.cfg, self.rpc,
- mode=self.op.mode,
- instances=self.op.instances,
- evac_mode=self.op.evac_mode)
- else:
- raise errors.ProgrammerError("Uncatched mode %s in"
- " LUTestAllocator.Exec", self.op.mode)
+ self.group.networks[self.network_uuid] = self.netparams
+ self.cfg.Update(self.group, feedback_fn)
- if self.op.direction == constants.IALLOCATOR_DIR_IN:
- result = ial.in_text
- else:
- ial.Run(self.op.allocator, validate=False)
- result = ial.out_text
- return result
+
+class LUNetworkDisconnect(LogicalUnit):
+ """Disconnect a network to a nodegroup
+
+ """
+ HPATH = "network-disconnect"
+ HTYPE = constants.HTYPE_NETWORK
+ REQ_BGL = False
+
+ def ExpandNames(self):
+ self.network_name = self.op.network_name
+ self.group_name = self.op.group_name
+
+ self.network_uuid = self.cfg.LookupNetwork(self.network_name)
+ if self.network_uuid is None:
+ raise errors.OpPrereqError("Network %s does not exist" %
+ self.network_name, errors.ECODE_INVAL)
+
+ self.group_uuid = self.cfg.LookupNodeGroup(self.group_name)
+ if self.group_uuid is None:
+ raise errors.OpPrereqError("Group %s does not exist" %
+ self.group_name, errors.ECODE_INVAL)
+
+ self.needed_locks = {
+ locking.LEVEL_INSTANCE: [],
+ locking.LEVEL_NODEGROUP: [self.group_uuid],
+ }
+ self.share_locks[locking.LEVEL_INSTANCE] = 1
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_INSTANCE:
+ assert not self.needed_locks[locking.LEVEL_INSTANCE]
+
+ # Lock instances optimistically, needs verification once group lock has
+ # been acquired
+ if self.op.conflicts_check:
+ self.needed_locks[locking.LEVEL_INSTANCE] = \
+ self.cfg.GetNodeGroupInstances(self.group_uuid)
+
+ def BuildHooksEnv(self):
+ ret = {
+ "GROUP_NAME": self.group_name,
+ }
+ return ret
+
+ def BuildHooksNodes(self):
+ nodes = self.cfg.GetNodeGroup(self.group_uuid).members
+ return (nodes, nodes)
+
+ def CheckPrereq(self):
+ owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
+
+ assert self.group_uuid in owned_groups
+
+ l = lambda value: utils.CommaJoin("%s: %s/%s" % (i[0], i[1], i[2])
+ for i in value)
+
+ self.group = self.cfg.GetNodeGroup(self.group_uuid)
+ self.connected = True
+ if self.network_uuid not in self.group.networks:
+ self.LogWarning("Network '%s' is not mapped to group '%s'",
+ self.network_name, self.group.name)
+ self.connected = False
+ return
+
+ if self.op.conflicts_check:
+ # Check if locked instances are still correct
+ owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
+ _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)
+
+ conflicting_instances = []
+
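+      # Collect every (instance name, NIC index, IP) still attached to the
+      # network about to be disconnected from this group.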
+ for (_, instance) in self.cfg.GetMultiInstanceInfo(owned_instances):
+ for idx, nic in enumerate(instance.nics):
+ if nic.network == self.network_name:
+ conflicting_instances.append((instance.name, idx, nic.ip))
+
+ if conflicting_instances:
+ self.LogWarning("Following occurences use IPs from network %s"
+ " that is about to disconnected from the nodegroup"
+ " %s: %s" %
+ (self.network_name, self.group.name,
+ l(conflicting_instances)))
+ raise errors.OpPrereqError("Conflicting IPs."
+ " Please remove/modify"
+ " corresponding NICS",
+ errors.ECODE_INVAL)
+
+ def Exec(self, feedback_fn):
+ if not self.connected:
+ return
+
+ del self.group.networks[self.network_uuid]
+ self.cfg.Update(self.group, feedback_fn)
#: Query type implementations
constants.QR_INSTANCE: _InstanceQuery,
constants.QR_NODE: _NodeQuery,
constants.QR_GROUP: _GroupQuery,
+ constants.QR_NETWORK: _NetworkQuery,
constants.QR_OS: _OsQuery,
constants.QR_EXPORT: _ExportQuery,
}
except KeyError:
raise errors.OpPrereqError("Unknown query resource '%s'" % name,
errors.ECODE_INVAL)
+
+
+def _CheckForConflictingIp(lu, ip, node):
+ """In case of conflicting ip raise error.
+
+ @type ip: string
+ @param ip: ip address
+ @type node: string
+ @param node: node name
+
+ """
+ (conf_net, _) = lu.cfg.CheckIPInNodeGroup(ip, node)
+ if conf_net is not None:
+ raise errors.OpPrereqError("Conflicting IP found:"
+ " %s <> %s." % (ip, conf_net),
+ errors.ECODE_INVAL)
+
+ return (None, None)