AnnotateDiskParams, GetUpdatedParams, ExpandInstanceName, \
ComputeIPolicySpecViolation, CheckInstanceState, ExpandNodeName
from ganeti.cmdlib.instance_storage import CreateDisks, \
- CheckNodesFreeDiskPerVG, WipeDisks, WaitForSync, \
+ CheckNodesFreeDiskPerVG, WipeDisks, WipeOrCleanupDisks, WaitForSync, \
IsExclusiveStorageEnabledNodeName, CreateSingleBlockDev, ComputeDisks, \
CheckRADOSFreeSpace, ComputeDiskSizePerVG, GenerateDiskTemplate, \
- CreateBlockDev, StartInstanceDisks, ShutdownInstanceDisks, \
- AssembleInstanceDisks
+ StartInstanceDisks, ShutdownInstanceDisks, AssembleInstanceDisks
from ganeti.cmdlib.instance_utils import BuildInstanceHookEnvByObject, \
GetClusterDomainSecret, BuildInstanceHookEnv, NICListToTuple, \
NICToTuple, CheckNodeNotDrained, RemoveInstance, CopyLockList, \
raise errors.OpPrereqError("Invalid file driver name '%s'" %
self.op.file_driver, errors.ECODE_INVAL)
+ # set default file_driver if unset and required
+ if (not self.op.file_driver and
+ self.op.disk_template in [constants.DT_FILE,
+ constants.DT_SHARED_FILE]):
+ self.op.file_driver = constants.FD_DEFAULT
+
if self.op.disk_template == constants.DT_FILE:
opcodes.RequireFileStorage()
elif self.op.disk_template == constants.DT_SHARED_FILE:
if self.op.opportunistic_locking:
self.opportunistic_locks[locking.LEVEL_NODE] = True
- self.opportunistic_locks[locking.LEVEL_NODE_RES] = True
else:
self.op.pnode = ExpandNodeName(self.cfg, self.op.pnode)
nodelist = [self.op.pnode]
self.needed_locks[locking.LEVEL_NODE_RES] = \
CopyLockList(self.needed_locks[locking.LEVEL_NODE])
+ # Optimistically acquire shared group locks (we're reading the
+ # configuration). We can't just call GetInstanceNodeGroups, because the
+ # instance doesn't exist yet. Therefore we lock all node groups of all
+ # nodes we have.
+ if self.needed_locks[locking.LEVEL_NODE] == locking.ALL_SET:
+ # If we lock all nodes for opportunistic allocation, we have no choice
+ # but to lock all groups, because they're allocated before nodes.
+ # This is sad, but true. At least we release all those we don't need in
+ # CheckPrereq later.
+ self.needed_locks[locking.LEVEL_NODEGROUP] = locking.ALL_SET
+ else:
+ self.needed_locks[locking.LEVEL_NODEGROUP] = \
+ list(self.cfg.GetNodeGroupsFromNodes(
+ self.needed_locks[locking.LEVEL_NODE]))
+ self.share_locks[locking.LEVEL_NODEGROUP] = 1
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_NODE_RES and \
+ self.opportunistic_locks[locking.LEVEL_NODE]:
+ # Even when using opportunistic locking, we require the same set of
+ # NODE_RES locks as the NODE locks we acquired
+ self.needed_locks[locking.LEVEL_NODE_RES] = \
+ self.owned_locks(locking.LEVEL_NODE)
+
def _RunAllocator(self):
"""Run the allocator based on input opcode.
vcpus=self.be_full[constants.BE_VCPUS],
nics=NICListToTuple(self, self.nics),
disk_template=self.op.disk_template,
- disks=[(d[constants.IDISK_NAME], d[constants.IDISK_SIZE],
- d[constants.IDISK_MODE]) for d in self.disks],
+ disks=[(d[constants.IDISK_NAME], d.get("uuid", ""),
+ d[constants.IDISK_SIZE], d[constants.IDISK_MODE])
+ for d in self.disks],
bep=self.be_full,
hvp=self.hv_full,
hypervisor_name=self.op.hypervisor,
for idx in range(constants.MAX_DISKS):
if einfo.has_option(constants.INISECT_INS, "disk%d_size" % idx):
disk_sz = einfo.getint(constants.INISECT_INS, "disk%d_size" % idx)
- disks.append({constants.IDISK_SIZE: disk_sz})
+ disk_name = einfo.get(constants.INISECT_INS, "disk%d_name" % idx)
+ disk = {
+ constants.IDISK_SIZE: disk_sz,
+ constants.IDISK_NAME: disk_name
+ }
+ disks.append(disk)
self.op.disks = disks
if not disks and self.op.disk_template != constants.DT_DISKLESS:
raise errors.OpPrereqError("No disk info specified and the export"
for idx in range(constants.MAX_NICS):
if einfo.has_option(constants.INISECT_INS, "nic%d_mac" % idx):
ndict = {}
- for name in list(constants.NICS_PARAMETERS) + ["ip", "mac"]:
+ for name in [constants.INIC_IP,
+ constants.INIC_MAC, constants.INIC_NAME]:
v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name))
ndict[name] = v
+ network = einfo.get(constants.INISECT_INS,
+ "nic%d_%s" % (idx, constants.INIC_NETWORK))
+ # in case a network is given, link and mode are inherited
+ # from nodegroup's netparams and thus should not be passed here
+ if network:
+ ndict[constants.INIC_NETWORK] = network
+ else:
+ for name in list(constants.NICS_PARAMETERS):
+ v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name))
+ ndict[name] = v
nics.append(ndict)
else:
break
"""Check prerequisites.
"""
+ # Check that the optimistically acquired groups are correct wrt the
+ # acquired nodes
+ owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
+ owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
+ cur_groups = list(self.cfg.GetNodeGroupsFromNodes(owned_nodes))
+ if not owned_groups.issuperset(cur_groups):
+ raise errors.OpPrereqError("New instance %s's node groups changed since"
+ " locks were acquired, current groups are"
+ " are '%s', owning groups '%s'; retry the"
+ " operation" %
+ (self.op.instance_name,
+ utils.CommaJoin(cur_groups),
+ utils.CommaJoin(owned_groups)),
+ errors.ECODE_STATE)
+
self._CalculateFileStorageDir()
if self.op.mode == constants.INSTANCE_IMPORT:
ReleaseLocks(self, locking.LEVEL_NODE, keep=keep_locks)
ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=keep_locks)
ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
+ # Release all unneeded group locks
+ ReleaseLocks(self, locking.LEVEL_NODEGROUP,
+ keep=self.cfg.GetNodeGroupsFromNodes(keep_locks))
assert (self.owned_locks(locking.LEVEL_NODE) ==
self.owned_locks(locking.LEVEL_NODE_RES)), \
primary_node=pnode_name,
nics=self.nics, disks=disks,
disk_template=self.op.disk_template,
+ disks_active=False,
admin_state=constants.ADMINST_DOWN,
network_port=network_port,
beparams=self.op.beparams,
raise errors.OpExecError("There are some degraded disks for"
" this instance")
+ # instance disks are now active
+ iobj.disks_active = True
+
# Release all node resource locks
ReleaseLocks(self, locking.LEVEL_NODE_RES)
idx, result.fail_msg)
errs.append(result.fail_msg)
break
- dev_path = result.payload
+ dev_path, _ = result.payload
result = self.rpc.call_blockdev_export(source_node, (disk, instance),
target_node, dev_path,
cluster_name)
" pnode/snode while others do not",
errors.ECODE_INVAL)
- if self.op.iallocator is None:
+ if not has_nodes and self.op.iallocator is None:
default_iallocator = self.cfg.GetDefaultIAllocator()
- if default_iallocator and has_nodes:
+ if default_iallocator:
self.op.iallocator = default_iallocator
else:
raise errors.OpPrereqError("No iallocator or nodes on the instances"
if self.op.opportunistic_locking:
self.opportunistic_locks[locking.LEVEL_NODE] = True
- self.opportunistic_locks[locking.LEVEL_NODE_RES] = True
else:
nodeslist = []
for inst in self.op.instances:
# prevent accidental modification)
self.needed_locks[locking.LEVEL_NODE_RES] = list(nodeslist)
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_NODE_RES and \
+ self.opportunistic_locks[locking.LEVEL_NODE]:
+ # Even when using opportunistic locking, we require the same set of
+ # NODE_RES locks as the NODE locks we acquired
+ self.needed_locks[locking.LEVEL_NODE_RES] = \
+ self.owned_locks(locking.LEVEL_NODE)
+
def CheckPrereq(self):
"""Check prerequisite.
"""
- cluster = self.cfg.GetClusterInfo()
- default_vg = self.cfg.GetVGName()
- ec_id = self.proc.GetECId()
+ if self.op.iallocator:
+ cluster = self.cfg.GetClusterInfo()
+ default_vg = self.cfg.GetVGName()
+ ec_id = self.proc.GetECId()
- if self.op.opportunistic_locking:
- # Only consider nodes for which a lock is held
- node_whitelist = list(self.owned_locks(locking.LEVEL_NODE))
- else:
- node_whitelist = None
+ if self.op.opportunistic_locking:
+ # Only consider nodes for which a lock is held
+ node_whitelist = list(self.owned_locks(locking.LEVEL_NODE))
+ else:
+ node_whitelist = None
- insts = [_CreateInstanceAllocRequest(op, ComputeDisks(op, default_vg),
- _ComputeNics(op, cluster, None,
- self.cfg, ec_id),
- _ComputeFullBeParams(op, cluster),
- node_whitelist)
- for op in self.op.instances]
+ insts = [_CreateInstanceAllocRequest(op, ComputeDisks(op, default_vg),
+ _ComputeNics(op, cluster, None,
+ self.cfg, ec_id),
+ _ComputeFullBeParams(op, cluster),
+ node_whitelist)
+ for op in self.op.instances]
- req = iallocator.IAReqMultiInstanceAlloc(instances=insts)
- ial = iallocator.IAllocator(self.cfg, self.rpc, req)
+ req = iallocator.IAReqMultiInstanceAlloc(instances=insts)
+ ial = iallocator.IAllocator(self.cfg, self.rpc, req)
- ial.Run(self.op.iallocator)
+ ial.Run(self.op.iallocator)
- if not ial.success:
- raise errors.OpPrereqError("Can't compute nodes using"
- " iallocator '%s': %s" %
- (self.op.iallocator, ial.info),
- errors.ECODE_NORES)
+ if not ial.success:
+ raise errors.OpPrereqError("Can't compute nodes using"
+ " iallocator '%s': %s" %
+ (self.op.iallocator, ial.info),
+ errors.ECODE_NORES)
- self.ia_result = ial.result
+ self.ia_result = ial.result
if self.op.dry_run:
self.dry_run_result = objects.FillDict(self._ConstructPartialResult(), {
"""Contructs the partial result.
"""
- (allocatable, failed) = self.ia_result
+ if self.op.iallocator:
+ (allocatable, failed_insts) = self.ia_result
+ allocatable_insts = map(compat.fst, allocatable)
+ else:
+ allocatable_insts = [op.instance_name for op in self.op.instances]
+ failed_insts = []
+
return {
- opcodes.OpInstanceMultiAlloc.ALLOCATABLE_KEY:
- map(compat.fst, allocatable),
- opcodes.OpInstanceMultiAlloc.FAILED_KEY: failed,
+ opcodes.OpInstanceMultiAlloc.ALLOCATABLE_KEY: allocatable_insts,
+ opcodes.OpInstanceMultiAlloc.FAILED_KEY: failed_insts,
}
def Exec(self, feedback_fn):
"""Executes the opcode.
"""
- op2inst = dict((op.instance_name, op) for op in self.op.instances)
- (allocatable, failed) = self.ia_result
-
jobs = []
- for (name, nodes) in allocatable:
- op = op2inst.pop(name)
+ if self.op.iallocator:
+ op2inst = dict((op.instance_name, op) for op in self.op.instances)
+ (allocatable, failed) = self.ia_result
- if len(nodes) > 1:
- (op.pnode, op.snode) = nodes
- else:
- (op.pnode,) = nodes
+ for (name, nodes) in allocatable:
+ op = op2inst.pop(name)
+
+ if len(nodes) > 1:
+ (op.pnode, op.snode) = nodes
+ else:
+ (op.pnode,) = nodes
- jobs.append([op])
+ jobs.append([op])
- missing = set(op2inst.keys()) - set(failed)
- assert not missing, \
- "Iallocator did return incomplete result: %s" % utils.CommaJoin(missing)
+ missing = set(op2inst.keys()) - set(failed)
+ assert not missing, \
+ "Iallocator did return incomplete result: %s" % \
+ utils.CommaJoin(missing)
+ else:
+ jobs.extend([op] for op in self.op.instances)
return ResultWithJobs(jobs, **self._ConstructPartialResult())
if op == constants.DDM_REMOVE:
assert not params
- if remove_fn is not None:
- remove_fn(absidx, item, private)
-
changes = [("%s/%s" % (kind, absidx), "remove")]
+ if remove_fn is not None:
+ msg = remove_fn(absidx, item, private)
+ if msg:
+ changes.append(("%s/%s" % (kind, absidx), msg))
+
assert container[absidx] == item
del container[absidx]
elif op == constants.DDM_MODIFY:
else:
raise errors.ProgrammerError("Unhandled operation '%s'" % op)
- @staticmethod
- def _VerifyDiskModification(op, params):
+ def _VerifyDiskModification(self, op, params):
"""Verifies a disk modification.
"""
if constants.IDISK_SIZE in params:
raise errors.OpPrereqError("Disk size change not possible, use"
" grow-disk", errors.ECODE_INVAL)
- if len(params) > 2:
- raise errors.OpPrereqError("Disk modification doesn't support"
- " additional arbitrary parameters",
- errors.ECODE_INVAL)
+
+ # Disk modification supports changing only the disk name and mode.
+ # Changing arbitrary parameters is allowed only for the ext disk template.
+ if self.instance.disk_template != constants.DT_EXT:
+ utils.ForceDictType(params, constants.MODIFIABLE_IDISK_PARAMS_TYPES)
+
name = params.get(constants.IDISK_NAME, None)
if name is not None and name.lower() == constants.VALUE_NONE:
params[constants.IDISK_NAME] = None
def CheckArguments(self):
if not (self.op.nics or self.op.disks or self.op.disk_template or
self.op.hvparams or self.op.beparams or self.op.os_name or
- self.op.offline is not None or self.op.runtime_mem or
- self.op.pnode):
+ self.op.osparams or self.op.offline is not None or
+ self.op.runtime_mem or self.op.pnode):
raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)
if self.op.hvparams:
# Operate on copies as this is still in prereq
nics = [nic.Copy() for nic in instance.nics]
_ApplyContainerMods("NIC", nics, self._nic_chgdesc, self.nicmod,
- self._CreateNewNic, self._ApplyNicMods, None)
+ self._CreateNewNic, self._ApplyNicMods,
+ self._RemoveNic)
# Verify that NIC names are unique and valid
utils.ValidateDeviceNames("NIC", nics)
self._new_nics = nics
self.LogWarning("Could not remove metadata for disk %d on node %s,"
" continuing anyway: %s", idx, pnode, msg)
+ def _HotplugDevice(self, action, dev_type, device, extra, seq):
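+ """Hotplug a device to or from the running instance via RPC.
+
+ Failures are logged as warnings and execution continues; the returned
+ string ("hotplug:done" or "hotplug:failed") is recorded by the callers
+ in the change list.
+
+ """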
+ self.LogInfo("Trying to hotplug device...")
+ msg = "hotplug:"
+ result = self.rpc.call_hotplug_device(self.instance.primary_node,
+ self.instance, action, dev_type,
+ (device, self.instance),
+ extra, seq)
+ if result.fail_msg:
+ self.LogWarning("Could not hotplug device: %s" % result.fail_msg)
+ self.LogInfo("Continuing execution..")
+ msg += "failed"
+ else:
+ self.LogInfo("Hotplug done.")
+ msg += "done"
+ return msg
+
def _CreateNewDisk(self, idx, params, _):
"""Creates a new disk.
[params], file_path, file_driver, idx,
self.Log, self.diskparams)[0]
- info = GetInstanceInfoText(instance)
-
- logging.info("Creating volume %s for instance %s",
- disk.iv_name, instance.name)
- # Note: this needs to be kept in sync with _CreateDisks
- #HARDCODE
- for node in instance.all_nodes:
- f_create = (node == instance.primary_node)
- try:
- CreateBlockDev(self, node, instance, disk, f_create, info, f_create)
- except errors.OpExecError, err:
- self.LogWarning("Failed to create volume %s (%s) on node '%s': %s",
- disk.iv_name, disk, node, err)
+ new_disks = CreateDisks(self, instance, disks=[disk])
if self.cluster.prealloc_wipe_disks:
# Wipe new disk
- WipeDisks(self, instance,
- disks=[(idx, disk, 0)])
+ WipeOrCleanupDisks(self, instance,
+ disks=[(idx, disk, 0)],
+ cleanup=new_disks)
- return (disk, [
- ("disk/%d" % idx, "add:size=%s,mode=%s" % (disk.size, disk.mode)),
- ])
+ changes = [
+ ("disk/%d" % idx,
+ "add:size=%s,mode=%s" % (disk.size, disk.mode)),
+ ]
+ if self.op.hotplug:
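+ # assemble the new disk on the primary node so it can be hotplugged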
+ self.cfg.SetDiskID(disk, self.instance.primary_node)
+ result = self.rpc.call_blockdev_assemble(self.instance.primary_node,
+ (disk, self.instance),
+ self.instance.name, True, idx)
+ if result.fail_msg:
+ changes.append(("disk/%d" % idx, "assemble:failed"))
+ self.LogWarning("Can't assemble newly created disk %d: %s",
+ idx, result.fail_msg)
+ else:
+ _, link_name = result.payload
+ msg = self._HotplugDevice(constants.HOTPLUG_ACTION_ADD,
+ constants.HOTPLUG_TARGET_DISK,
+ disk, link_name, idx)
+ changes.append(("disk/%d" % idx, msg))
- @staticmethod
- def _ModifyDisk(idx, disk, params, _):
+ return (disk, changes)
+
+ def _ModifyDisk(self, idx, disk, params, _):
"""Modifies a disk.
"""
changes = []
- mode = params.get(constants.IDISK_MODE, None)
- if mode:
- disk.mode = mode
+ if constants.IDISK_MODE in params:
+ disk.mode = params.get(constants.IDISK_MODE)
changes.append(("disk.mode/%d" % idx, disk.mode))
- name = params.get(constants.IDISK_NAME, None)
- disk.name = name
- changes.append(("disk.name/%d" % idx, disk.name))
+ if constants.IDISK_NAME in params:
+ disk.name = params.get(constants.IDISK_NAME)
+ changes.append(("disk.name/%d" % idx, disk.name))
+
+ # Modify arbitrary parameters when the instance's disk template is ext
+ for key, value in params.iteritems():
+ if (key not in constants.MODIFIABLE_IDISK_PARAMS and
+ self.instance.disk_template == constants.DT_EXT):
+ # stolen from GetUpdatedParams: default means reset/delete
+ if value.lower() == constants.VALUE_DEFAULT:
+ try:
+ del disk.params[key]
+ except KeyError:
+ pass
+ else:
+ disk.params[key] = value
+ changes.append(("disk.params:%s/%d" % (key, idx), value))
return changes
"""Removes a disk.
"""
+ hotmsg = ""
+ if self.op.hotplug:
+ hotmsg = self._HotplugDevice(constants.HOTPLUG_ACTION_REMOVE,
+ constants.HOTPLUG_TARGET_DISK,
+ root, None, idx)
+ ShutdownInstanceDisks(self, self.instance, [root])
+
(anno_disk,) = AnnotateDiskParams(self.instance, [root], self.cfg)
for node, disk in anno_disk.ComputeNodeTree(self.instance.primary_node):
self.cfg.SetDiskID(disk, node)
if root.dev_type in constants.LDS_DRBD:
self.cfg.AddTcpUdpPort(root.logical_id[2])
+ return hotmsg
+
def _CreateNewNic(self, idx, params, private):
"""Creates data structure for a new network interface.
nicparams=nicparams)
nobj.uuid = self.cfg.GenerateUniqueID(self.proc.GetECId())
- return (nobj, [
+ changes = [
("nic.%d" % idx,
"add:mac=%s,ip=%s,mode=%s,link=%s,network=%s" %
(mac, ip, private.filled[constants.NIC_MODE],
- private.filled[constants.NIC_LINK],
- net)),
- ])
+ private.filled[constants.NIC_LINK], net)),
+ ]
+
+ if self.op.hotplug:
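+ # hotplug the newly created NIC into the running instance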
+ msg = self._HotplugDevice(constants.HOTPLUG_ACTION_ADD,
+ constants.HOTPLUG_TARGET_NIC,
+ nobj, None, idx)
+ changes.append(("nic.%d" % idx, msg))
+
+ return (nobj, changes)
def _ApplyNicMods(self, idx, nic, params, private):
"""Modifies a network interface.
for (key, val) in nic.nicparams.items():
changes.append(("nic.%s/%d" % (key, idx), val))
+ if self.op.hotplug:
+ msg = self._HotplugDevice(constants.HOTPLUG_ACTION_MODIFY,
+ constants.HOTPLUG_TARGET_NIC,
+ nic, None, idx)
+ changes.append(("nic/%d" % idx, msg))
+
return changes
+ def _RemoveNic(self, idx, nic, _):
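+ """Hot-unplug a NIC from the running instance, if requested.
+
+ """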
+ if self.op.hotplug:
+ return self._HotplugDevice(constants.HOTPLUG_ACTION_REMOVE,
+ constants.HOTPLUG_TARGET_NIC,
+ nic, None, idx)
+
def Exec(self, feedback_fn):
"""Modifies an instance.