#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with instances."""
import OpenSSL
import logging
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import hypervisor
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import qlang
from ganeti import rpc
from ganeti import utils
from ganeti import query

from ganeti.cmdlib.base import NoHooksLU, LogicalUnit, _QueryBase, \
  ResultWithJobs, Tasklet

from ganeti.cmdlib.common import INSTANCE_ONLINE, INSTANCE_DOWN, \
  INSTANCE_NOT_RUNNING, CAN_CHANGE_INSTANCE_OFFLINE, _CheckNodeOnline, \
  _ShareAll, _GetDefaultIAllocator, _CheckInstanceNodeGroups, \
  _LoadNodeEvacResult, _CheckIAllocatorOrNode, _CheckParamsNotGlobal, \
  _IsExclusiveStorageEnabledNode, _CheckHVParams, _CheckOSParams, \
  _GetWantedInstances, _CheckInstancesNodeGroups, _AnnotateDiskParams, \
  _GetUpdatedParams, _ExpandInstanceName, _FindFaultyInstanceDisks, \
  _ComputeIPolicySpecViolation, _ComputeIPolicyInstanceViolation, \
  _CheckInstanceState, _ExpandNodeName
from ganeti.cmdlib.instance_utils import _AssembleInstanceDisks, \
  _BuildInstanceHookEnvByObject, _GetClusterDomainSecret, \
  _BuildInstanceHookEnv, _NICListToTuple, _NICToTuple, _CheckNodeNotDrained, \
  _RemoveDisks, _StartInstanceDisks, _ShutdownInstanceDisks, \
  _RemoveInstance, _ExpandCheckDisks

import ganeti.masterd.instance
_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  }
_DISK_TEMPLATE_DEVICE_TYPE = {
  constants.DT_PLAIN: constants.LD_LV,
  constants.DT_FILE: constants.LD_FILE,
  constants.DT_SHARED_FILE: constants.LD_FILE,
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
  constants.DT_RBD: constants.LD_RBD,
  constants.DT_EXT: constants.LD_EXT,
  }
#: Type description for changes as returned by L{ApplyContainerMods}'s
#: callbacks
_TApplyContModsCbChanges = \
  ht.TMaybeListOf(ht.TAnd(ht.TIsLength(2), ht.TItems([
    ht.TNonEmptyString,
    ht.TAny,
    ])))
def _CopyLockList(names):
  """Makes a copy of a list of lock names.

  Handles L{locking.ALL_SET} correctly.

  """
  if names == locking.ALL_SET:
    return locking.ALL_SET
  else:
    return names[:]
def _ReleaseLocks(lu, level, names=None, keep=None):
  """Releases locks owned by an LU.

  @type lu: L{LogicalUnit}
  @param level: Lock level
  @type names: list or None
  @param names: Names of locks to release
  @type keep: list or None
  @param keep: Names of locks to retain

  """
  assert not (keep is not None and names is not None), \
    "Only one of the 'names' and the 'keep' parameters can be given"

  if names is not None:
    should_release = names.__contains__
  elif keep:
    should_release = lambda name: name not in keep
  else:
    should_release = None

  owned = lu.owned_locks(level)
  if not owned:
    # Not owning any lock at this level, do nothing
    pass

  elif should_release:
    retain = []
    release = []

    # Determine which locks to release
    for name in owned:
      if should_release(name):
        release.append(name)
      else:
        retain.append(name)

    assert len(lu.owned_locks(level)) == (len(retain) + len(release))

    # Release just some locks
    lu.glm.release(level, names=release)

    assert frozenset(lu.owned_locks(level)) == frozenset(retain)
  else:
    # Release everything
    lu.glm.release(level)

    assert not lu.glm.is_owned(level), "No locks should be owned"
def _CheckHostnameSane(lu, name):
  """Ensures that a given hostname resolves to a 'sane' name.

  The given name is required to be a prefix of the resolved hostname,
  to prevent accidental mismatches.

  @param lu: the logical unit on behalf of which we're checking
  @param name: the name we should resolve and check
  @return: the resolved hostname object

  """
169 hostname = netutils.GetHostname(name=name)
170 if hostname.name != name:
171 lu.LogInfo("Resolved given name '%s' to '%s'", name, hostname.name)
  if not utils.MatchNameComponent(name, [hostname.name]):
    raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
                                " same as given hostname '%s'") %
                               (hostname.name, name), errors.ECODE_INVAL)
  return hostname
def _CheckOpportunisticLocking(op):
  """Generate error if opportunistic locking is not possible.

  """
  if op.opportunistic_locking and not op.iallocator:
    raise errors.OpPrereqError("Opportunistic locking is only available in"
                               " combination with an instance allocator",
                               errors.ECODE_INVAL)
def _CreateInstanceAllocRequest(op, disks, nics, beparams, node_whitelist):
  """Wrapper around IAReqInstanceAlloc.

  @param op: The instance opcode
  @param disks: The computed disks
  @param nics: The computed nics
  @param beparams: The full filled beparams
  @param node_whitelist: List of nodes which should appear as online to the
    allocator (unless the node is already marked offline)

  @returns: A filled L{iallocator.IAReqInstanceAlloc}

  """
  spindle_use = beparams[constants.BE_SPINDLE_USE]
  return iallocator.IAReqInstanceAlloc(name=op.instance_name,
                                       disk_template=op.disk_template,
                                       tags=op.tags,
                                       os=op.os_type,
                                       vcpus=beparams[constants.BE_VCPUS],
                                       memory=beparams[constants.BE_MAXMEM],
                                       spindle_use=spindle_use,
                                       disks=disks,
                                       nics=[n.ToDict() for n in nics],
                                       hypervisor=op.hypervisor,
                                       node_whitelist=node_whitelist)
def _ComputeFullBeParams(op, cluster):
  """Computes the full beparams.

  @param op: The instance opcode
  @param cluster: The cluster config object

  @return: The fully filled beparams

  """
  default_beparams = cluster.beparams[constants.PP_DEFAULT]
  for param, value in op.beparams.iteritems():
    if value == constants.VALUE_AUTO:
      op.beparams[param] = default_beparams[param]
  objects.UpgradeBeParams(op.beparams)
  utils.ForceDictType(op.beparams, constants.BES_PARAMETER_TYPES)
  return cluster.SimpleFillBE(op.beparams)
234 def _ComputeNics(op, cluster, default_ip, cfg, ec_id):
235 """Computes the nics.
237 @param op: The instance opcode
238 @param cluster: Cluster configuration object
239 @param default_ip: The default ip to assign
240 @param cfg: An instance of the configuration object
241 @param ec_id: Execution context ID
  @returns: The built up nics

  """
  nics = []
  for nic in op.nics:
248 nic_mode_req = nic.get(constants.INIC_MODE, None)
249 nic_mode = nic_mode_req
250 if nic_mode is None or nic_mode == constants.VALUE_AUTO:
251 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
253 net = nic.get(constants.INIC_NETWORK, None)
254 link = nic.get(constants.NIC_LINK, None)
255 ip = nic.get(constants.INIC_IP, None)
257 if net is None or net.lower() == constants.VALUE_NONE:
260 if nic_mode_req is not None or link is not None:
261 raise errors.OpPrereqError("If network is given, no mode or link"
262 " is allowed to be passed",
266 if ip is None or ip.lower() == constants.VALUE_NONE:
268 elif ip.lower() == constants.VALUE_AUTO:
269 if not op.name_check:
270 raise errors.OpPrereqError("IP address set to auto but name checks"
271 " have been skipped",
275 # We defer pool operations until later, so that the iallocator has
      # filled in the instance's node(s)
      if ip.lower() == constants.NIC_IP_POOL:
        if net is None:
          raise errors.OpPrereqError("if ip=pool, parameter network"
                                     " must be passed too",
                                     errors.ECODE_INVAL)

      elif not netutils.IPAddress.IsValid(ip):
        raise errors.OpPrereqError("Invalid IP address '%s'" % ip,
                                   errors.ECODE_INVAL)

      nic_ip = ip
289 # TODO: check the ip address for uniqueness
290 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
291 raise errors.OpPrereqError("Routed nic mode requires an ip address",
294 # MAC address verification
295 mac = nic.get(constants.INIC_MAC, constants.VALUE_AUTO)
296 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
297 mac = utils.NormalizeAndValidateMac(mac)
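      # Reserving the MAC ties it to this job's execution context ID, so a
      # concurrent instance creation cannot claim the same address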
      try:
        # TODO: We need to factor this out
        cfg.ReserveMAC(mac, ec_id)
      except errors.ReservationError:
        raise errors.OpPrereqError("MAC address %s already in use"
                                   " in cluster" % mac,
                                   errors.ECODE_NOTUNIQUE)
307 # Build nic parameters
310 nicparams[constants.NIC_MODE] = nic_mode
312 nicparams[constants.NIC_LINK] = link
314 check_params = cluster.SimpleFillNIC(nicparams)
315 objects.NIC.CheckParameterSyntax(check_params)
316 net_uuid = cfg.LookupNetwork(net)
317 name = nic.get(constants.INIC_NAME, None)
318 if name is not None and name.lower() == constants.VALUE_NONE:
    nic_obj = objects.NIC(mac=mac, ip=nic_ip, name=name,
                          network=net_uuid, nicparams=nicparams)
    nic_obj.uuid = cfg.GenerateUniqueID(ec_id)
    nics.append(nic_obj)

  return nics
def _CheckForConflictingIp(lu, ip, node):
  """In case of conflicting IP address raise error.

  @type ip: string
  @param ip: IP address
  @type node: string
  @param node: node name

  """
  (conf_net, _) = lu.cfg.CheckIPInNodeGroup(ip, node)
  if conf_net is not None:
    raise errors.OpPrereqError(("The requested IP address (%s) belongs to"
                                " network %s, but the target NIC does not." %
                                (ip, conf_net)),
                               errors.ECODE_STATE)
def _CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  """
  # For the RADOS cluster we assume there is always enough space.
  pass
355 def _WaitForSync(lu, instance, disks=None, oneshot=False):
356 """Sleep and poll for an instance's disk to sync.
359 if not instance.disks or disks is not None and not disks:
362 disks = _ExpandCheckDisks(instance, disks)
365 lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
367 node = instance.primary_node
370 lu.cfg.SetDiskID(dev, node)
372 # TODO: Convert to utils.Retry
375 degr_retries = 10 # in seconds, as we sleep 1 second each time
379 cumul_degraded = False
380 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
381 msg = rstats.fail_msg
383 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
386 raise errors.RemoteError("Can't contact node %s for mirror data,"
390 rstats = rstats.payload
392 for i, mstat in enumerate(rstats):
394 lu.LogWarning("Can't compute data for node %s/%s",
395 node, disks[i].iv_name)
398 cumul_degraded = (cumul_degraded or
399 (mstat.is_degraded and mstat.sync_percent is None))
400 if mstat.sync_percent is not None:
402 if mstat.estimated_time is not None:
403 rem_time = ("%s remaining (estimated)" %
404 utils.FormatSeconds(mstat.estimated_time))
405 max_time = mstat.estimated_time
407 rem_time = "no time estimate"
408 lu.LogInfo("- device %s: %5.2f%% done, %s",
409 disks[i].iv_name, mstat.sync_percent, rem_time)
411 # if we're done but degraded, let's do a few small retries, to
412 # make sure we see a stable and not transient situation; therefore
413 # we force restart of the loop
414 if (done or oneshot) and cumul_degraded and degr_retries > 0:
415 logging.info("Degraded disks found, %d retries left", degr_retries)
423 time.sleep(min(60, max_time))
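    # the sleep above is capped at one minute (or the estimated remaining
    # time, if shorter), so sync status is re-checked at least that often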
426 lu.LogInfo("Instance %s's disks are in sync", instance.name)
428 return not cumul_degraded
431 def _ComputeDisks(op, default_vg):
432 """Computes the instance disks.
434 @param op: The instance opcode
435 @param default_vg: The default_vg to assume
  @return: The computed disks

  """
  disks = []
441 for disk in op.disks:
442 mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
443 if mode not in constants.DISK_ACCESS_SET:
444 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
445 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)
455 ext_provider = disk.get(constants.IDISK_PROVIDER, None)
456 if ext_provider and op.disk_template != constants.DT_EXT:
457 raise errors.OpPrereqError("The '%s' option is only valid for the %s"
458 " disk template, not %s" %
459 (constants.IDISK_PROVIDER, constants.DT_EXT,
460 op.disk_template), errors.ECODE_INVAL)
462 data_vg = disk.get(constants.IDISK_VG, default_vg)
463 name = disk.get(constants.IDISK_NAME, None)
464 if name is not None and name.lower() == constants.VALUE_NONE:
467 constants.IDISK_SIZE: size,
468 constants.IDISK_MODE: mode,
469 constants.IDISK_VG: data_vg,
470 constants.IDISK_NAME: name,
473 if constants.IDISK_METAVG in disk:
474 new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
475 if constants.IDISK_ADOPT in disk:
476 new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
478 # For extstorage, demand the `provider' option and add any
479 # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)

    disks.append(new_disk)

  return disks
def _ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      vgs[disk[constants.IDISK_VG]] = \
        vgs.get(disk[constants.IDISK_VG], 0) + \
        disk[constants.IDISK_SIZE] + payload

    return vgs
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }
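  # As an illustration (numbers assumed, not from the original code): two
  # plain LVM disks of 10240 MiB in volume group "xenvg" yield
  # {"xenvg": 20480}, while DRBD8 adds DRBD_META_SIZE MiB per disk on top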
520 if disk_template not in req_size_dict:
521 raise errors.ProgrammerError("Disk template '%s' size requirement"
522 " is unknown" % disk_template)
524 return req_size_dict[disk_template]
527 def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
528 """Checks if nodes have enough free disk space in the specified VG.
530 This function checks if all given nodes have the needed amount of
531 free disk. In case any node has less disk or we cannot get the
532 information from the node, this function raises an OpPrereqError
535 @type lu: C{LogicalUnit}
536 @param lu: a logical unit from which we get configuration data
537 @type nodenames: C{list}
538 @param nodenames: the list of node names to check
540 @param vg: the volume group to check
541 @type requested: C{int}
542 @param requested: the amount of disk in MiB to check for
543 @raise errors.OpPrereqError: if the node doesn't have enough disk,
544 or we cannot check the node
547 es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
548 nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags)
549 for node in nodenames:
550 info = nodeinfo[node]
551 info.Raise("Cannot get current information from node %s" % node,
552 prereq=True, ecode=errors.ECODE_ENVIRON)
553 (_, (vg_info, ), _) = info.payload
554 vg_free = vg_info.get("vg_free", None)
555 if not isinstance(vg_free, int):
556 raise errors.OpPrereqError("Can't compute free disk space on node"
557 " %s for vg %s, result was '%s'" %
558 (node, vg, vg_free), errors.ECODE_ENVIRON)
    if requested > vg_free:
      raise errors.OpPrereqError("Not enough disk space on target node %s"
                                 " vg %s: required %d MiB, available %d MiB" %
                                 (node, vg, requested, vg_free),
                                 errors.ECODE_NORES)
566 def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
567 """Checks if nodes have enough free disk space in all the VGs.
569 This function checks if all given nodes have the needed amount of
570 free disk. In case any node has less disk or we cannot get the
571 information from the node, this function raises an OpPrereqError
574 @type lu: C{LogicalUnit}
575 @param lu: a logical unit from which we get configuration data
576 @type nodenames: C{list}
577 @param nodenames: the list of node names to check
578 @type req_sizes: C{dict}
579 @param req_sizes: the hash of vg and corresponding amount of disk in
581 @raise errors.OpPrereqError: if the node doesn't have enough disk,
582 or we cannot check the node
585 for vg, req_size in req_sizes.items():
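    # req_sizes typically comes from _ComputeDiskSizePerVG, e.g. a mapping
    # such as {"xenvg": 20480} (the VG name here is only an example)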
586 _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
589 def _CheckNodeVmCapable(lu, node):
590 """Ensure that a given node is vm capable.
592 @param lu: the LU on behalf of which we make the check
593 @param node: the node to check
594 @raise errors.OpPrereqError: if the node is not vm capable
  if not lu.cfg.GetNodeInfo(node).vm_capable:
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
                               errors.ECODE_STATE)
602 def _ComputeIPolicyInstanceSpecViolation(
603 ipolicy, instance_spec, disk_template,
604 _compute_fn=_ComputeIPolicySpecViolation):
605 """Compute if instance specs meets the specs of ipolicy.
608 @param ipolicy: The ipolicy to verify against
  @type instance_spec: dict
610 @param instance_spec: The instance spec to verify
611 @type disk_template: string
612 @param disk_template: the disk template of the instance
613 @param _compute_fn: The function to verify ipolicy (unittest only)
614 @see: L{_ComputeIPolicySpecViolation}
617 mem_size = instance_spec.get(constants.ISPEC_MEM_SIZE, None)
618 cpu_count = instance_spec.get(constants.ISPEC_CPU_COUNT, None)
619 disk_count = instance_spec.get(constants.ISPEC_DISK_COUNT, 0)
620 disk_sizes = instance_spec.get(constants.ISPEC_DISK_SIZE, [])
621 nic_count = instance_spec.get(constants.ISPEC_NIC_COUNT, 0)
622 spindle_use = instance_spec.get(constants.ISPEC_SPINDLE_USE, None)
624 return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
625 disk_sizes, spindle_use, disk_template)
628 def _CheckOSVariant(os_obj, name):
629 """Check whether an OS name conforms to the os variants specification.
631 @type os_obj: L{objects.OS}
632 @param os_obj: OS object to check
  @param name: OS name passed by the user, to check for validity

  """
  variant = objects.OS.GetVariant(name)
  if not os_obj.supported_variants:
    if variant:
      raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
                                 " passed)" % (os_obj.name, variant),
                                 errors.ECODE_INVAL)
    return
  if not variant:
    raise errors.OpPrereqError("OS name must include a variant",
                               errors.ECODE_INVAL)

  if variant not in os_obj.supported_variants:
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
652 def _CheckNodeHasOS(lu, node, os_name, force_variant):
653 """Ensure that a node supports a given OS.
655 @param lu: the LU on behalf of which we make the check
656 @param node: the node to check
657 @param os_name: the OS to query about
658 @param force_variant: whether to ignore variant errors
659 @raise errors.OpPrereqError: if the node is not supporting the OS
662 result = lu.rpc.call_os_get(node, os_name)
663 result.Raise("OS '%s' not in supported OS list for node %s" %
665 prereq=True, ecode=errors.ECODE_INVAL)
666 if not force_variant:
667 _CheckOSVariant(result.payload, os_name)
670 def _CheckNicsBridgesExist(lu, target_nics, target_node):
671 """Check that the brigdes needed by a list of nics exist.
674 cluster = lu.cfg.GetClusterInfo()
675 paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
676 brlist = [params[constants.NIC_LINK] for params in paramslist
677 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
679 result = lu.rpc.call_bridges_exist(target_node, brlist)
680 result.Raise("Error checking bridges on destination node '%s'" %
681 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
684 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
685 """Checks if a node has enough free memory.
687 This function checks if a given node has the needed amount of free
688 memory. In case the node has less memory or we cannot get the
689 information from the node, this function raises an OpPrereqError
692 @type lu: C{LogicalUnit}
693 @param lu: a logical unit from which we get configuration data
695 @param node: the node to check
697 @param reason: string to use in the error message
698 @type requested: C{int}
699 @param requested: the amount of memory in MiB to check for
700 @type hypervisor_name: C{str}
701 @param hypervisor_name: the hypervisor to ask for memory stats
703 @return: node current free memory
704 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
705 we cannot check the node
708 nodeinfo = lu.rpc.call_node_info([node], None, [hypervisor_name], False)
709 nodeinfo[node].Raise("Can't get data from node %s" % node,
710 prereq=True, ecode=errors.ECODE_ENVIRON)
711 (_, _, (hv_info, )) = nodeinfo[node].payload
713 free_mem = hv_info.get("memory_free", None)
714 if not isinstance(free_mem, int):
715 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
716 " was '%s'" % (node, free_mem),
717 errors.ECODE_ENVIRON)
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem),
                               errors.ECODE_NORES)
  return free_mem
def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    results.append("%s%s" % (new_id, val))

  return results
def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgnames[0], names[0]),
                          params={})
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  dev_meta = objects.Disk(dev_type=constants.LD_LV,
                          size=constants.DRBD_META_SIZE,
                          logical_id=(vgnames[1], names[1]),
                          params={})
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev
767 def _GenerateDiskTemplate(
768 lu, template_name, instance_name, primary_node, secondary_nodes,
769 disk_info, file_storage_dir, file_driver, base_index,
770 feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
771 _req_shr_file_storage=opcodes.RequireSharedFileStorage):
772 """Generate the entire disk layout for a given template type.
775 vgname = lu.cfg.GetVGName()
776 disk_count = len(disk_info)
779 if template_name == constants.DT_DISKLESS:
781 elif template_name == constants.DT_DRBD8:
782 if len(secondary_nodes) != 1:
783 raise errors.ProgrammerError("Wrong template configuration")
784 remote_node = secondary_nodes[0]
785 minors = lu.cfg.AllocateDRBDMinor(
786 [primary_node, remote_node] * len(disk_info), instance_name)
788 (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
790 drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
793 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
794 for i in range(disk_count)]):
795 names.append(lv_prefix + "_data")
796 names.append(lv_prefix + "_meta")
797 for idx, disk in enumerate(disk_info):
798 disk_index = idx + base_index
799 data_vg = disk.get(constants.IDISK_VG, vgname)
800 meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
801 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
802 disk[constants.IDISK_SIZE],
804 names[idx * 2:idx * 2 + 2],
805 "disk/%d" % disk_index,
806 minors[idx * 2], minors[idx * 2 + 1])
807 disk_dev.mode = disk[constants.IDISK_MODE]
808 disk_dev.name = disk.get(constants.IDISK_NAME, None)
809 disks.append(disk_dev)
812 raise errors.ProgrammerError("Wrong template configuration")
    if template_name == constants.DT_FILE:
      _req_file_storage()
    elif template_name == constants.DT_SHARED_FILE:
      _req_shr_file_storage()
    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = []
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])
827 if template_name == constants.DT_PLAIN:
829 def logical_id_fn(idx, _, disk):
830 vg = disk.get(constants.IDISK_VG, vgname)
831 return (vg, names[idx])
    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
      logical_id_fn = \
        lambda _, disk_index, disk: (file_driver,
                                     "%s/disk%d" % (file_storage_dir,
                                                    disk_index))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
842 elif template_name == constants.DT_RBD:
843 logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
844 elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found", constants.DT_EXT,
                                       constants.IDISK_PROVIDER)
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" %
                                   template_name)
855 dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
865 disk_index = idx + base_index
866 size = disk[constants.IDISK_SIZE]
867 feedback_fn("* disk %s, size %s" %
868 (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params)
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks
881 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
883 """Create a single block device on a given node.
885 This will not recurse over children of the device, so they must be
888 @param lu: the lu on whose behalf we execute
889 @param node: the node on which to create the device
890 @type instance: L{objects.Instance}
891 @param instance: the instance which owns the device
892 @type device: L{objects.Disk}
893 @param device: the device to create
894 @param info: the extra 'metadata' we should attach to the device
895 (this will be represented as a LVM tag)
896 @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
901 @type excl_stor: boolean
902 @param excl_stor: Whether exclusive_storage is active for the node
905 lu.cfg.SetDiskID(device, node)
906 result = lu.rpc.call_blockdev_create(node, device, device.size,
907 instance.name, force_open, info,
909 result.Raise("Can't create block device %s on"
910 " node %s for instance %s" % (device, node, instance.name))
911 if device.physical_id is None:
912 device.physical_id = result.payload
915 def _CreateBlockDevInner(lu, node, instance, device, force_create,
916 info, force_open, excl_stor):
917 """Create a tree of block devices on a given node.
919 If this device type has to be created on secondaries, create it and
922 If not, just recurse to children keeping the same 'force' value.
924 @attention: The device has to be annotated already.
926 @param lu: the lu on whose behalf we execute
927 @param node: the node on which to create the device
928 @type instance: L{objects.Instance}
929 @param instance: the instance which owns the device
930 @type device: L{objects.Disk}
931 @param device: the device to create
932 @type force_create: boolean
933 @param force_create: whether to force creation of this device; this
934 will be change to True whenever we find a device which has
935 CreateOnSecondary() attribute
936 @param info: the extra 'metadata' we should attach to the device
937 (this will be represented as a LVM tag)
938 @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
943 @type excl_stor: boolean
944 @param excl_stor: Whether exclusive_storage is active for the node
946 @return: list of created devices
950 if device.CreateOnSecondary():
954 for child in device.children:
955 devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
956 info, force_open, excl_stor)
957 created_devices.extend(devs)
960 return created_devices
962 _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
964 # The device has been completely created, so there is no point in keeping
965 # its subdevices in the list. We just add the device itself instead.
966 created_devices = [(node, device)]
967 return created_devices
  except errors.DeviceCreationError, e:
    e.created_devices.extend(created_devices)
    raise e
  except errors.OpExecError, e:
    raise errors.DeviceCreationError(str(e), created_devices)
976 def _IsExclusiveStorageEnabledNodeName(cfg, nodename):
977 """Whether exclusive_storage is in effect for the given node.
979 @type cfg: L{config.ConfigWriter}
980 @param cfg: The cluster configuration
981 @type nodename: string
982 @param nodename: The node
984 @return: The effective value of exclusive_storage
985 @raise errors.OpPrereqError: if no node exists with the given name
  ni = cfg.GetNodeInfo(nodename)
  if ni is None:
    raise errors.OpPrereqError("Invalid node name %s" % nodename,
                               errors.ECODE_NOENT)
  return _IsExclusiveStorageEnabledNode(cfg, ni)
995 def _CreateBlockDev(lu, node, instance, device, force_create, info,
997 """Wrapper around L{_CreateBlockDevInner}.
999 This method annotates the root device first.
1002 (disk,) = _AnnotateDiskParams(instance, [device], lu.cfg)
1003 excl_stor = _IsExclusiveStorageEnabledNodeName(lu.cfg, node)
1004 return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
1005 force_open, excl_stor)
1008 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
1009 """Create all disks for an instance.
1011 This abstracts away some work from AddInstance.
1013 @type lu: L{LogicalUnit}
1014 @param lu: the logical unit on whose behalf we execute
1015 @type instance: L{objects.Instance}
1016 @param instance: the instance whose disks we should create
1018 @param to_skip: list of indices to skip
1019 @type target_node: string
1020 @param target_node: if passed, overrides the target node for creation
1022 @return: the success of the creation
1025 info = _GetInstanceInfoText(instance)
  if target_node is None:
    pnode = instance.primary_node
    all_nodes = instance.all_nodes
  else:
    pnode = target_node
    all_nodes = [target_node]
1033 if instance.disk_template in constants.DTS_FILEBASED:
1034 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
1035 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
1037 result.Raise("Failed to create directory '%s' on"
1038 " node %s" % (file_storage_dir, pnode))
1041 # Note: this needs to be kept in sync with adding of disks in
1042 # LUInstanceSetParams
1043 for idx, device in enumerate(instance.disks):
1044 if to_skip and idx in to_skip:
1046 logging.info("Creating disk %s for instance '%s'", idx, instance.name)
1048 for node in all_nodes:
1049 f_create = node == pnode
1051 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
1052 disks_created.append((node, device))
    except errors.OpExecError:
      logging.warning("Creating disk %s for instance '%s' failed",
                      idx, instance.name)
    except errors.DeviceCreationError, e:
      logging.warning("Creating disk %s for instance '%s' failed",
                      idx, instance.name)
      disks_created.extend(e.created_devices)
      for (node, disk) in disks_created:
        lu.cfg.SetDiskID(disk, node)
        result = lu.rpc.call_blockdev_remove(node, disk)
        if result.fail_msg:
          logging.warning("Failed to remove newly-created disk %s on node %s:"
                          " %s", device, node, result.fail_msg)
      raise errors.OpExecError(e.message)
def _CalcEta(time_taken, written, total_size):
  """Calculates the ETA based on size written and total size.

  @param time_taken: The time taken so far
  @param written: amount written so far
  @param total_size: The total size of data to be written
  @return: The remaining time in seconds

  """
  avg_time = time_taken / float(written)
  return (total_size - written) * avg_time
1082 def _WipeDisks(lu, instance, disks=None):
1083 """Wipes instance disks.
1085 @type lu: L{LogicalUnit}
1086 @param lu: the logical unit on whose behalf we execute
1087 @type instance: L{objects.Instance}
1088 @param instance: the instance whose disks we should create
1089 @type disks: None or list of tuple of (number, L{objects.Disk}, number)
1090 @param disks: Disk details; tuple contains disk index, disk object and the
1094 node = instance.primary_node
1097 disks = [(idx, disk, 0)
1098 for (idx, disk) in enumerate(instance.disks)]
1100 for (_, device, _) in disks:
1101 lu.cfg.SetDiskID(device, node)
1103 logging.info("Pausing synchronization of disks of instance '%s'",
1105 result = lu.rpc.call_blockdev_pause_resume_sync(node,
1106 (map(compat.snd, disks),
1109 result.Raise("Failed to pause disk synchronization on node '%s'" % node)
  for idx, success in enumerate(result.payload):
    if not success:
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
                   " failed", idx, instance.name)
1117 for (idx, device, offset) in disks:
1118 # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
1119 # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
1121 int(min(constants.MAX_WIPE_CHUNK,
1122 device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
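    # e.g. assuming MIN_WIPE_CHUNK_PERCENT is 10, a 100 GiB disk is wiped in
    # 10 GiB chunks unless that exceeds MAX_WIPE_CHUNK (figures illustrative)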
1126 start_time = time.time()
1131 info_text = (" (from %s to %s)" %
1132 (utils.FormatUnit(offset, "h"),
1133 utils.FormatUnit(size, "h")))
1135 lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1137 logging.info("Wiping disk %d for instance %s on node %s using"
1138 " chunk size %s", idx, instance.name, node, wipe_chunk_size)
1140 while offset < size:
1141 wipe_size = min(wipe_chunk_size, size - offset)
1143 logging.debug("Wiping disk %d, offset %s, chunk %s",
1144 idx, offset, wipe_size)
1146 result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
1148 result.Raise("Could not wipe disk %d at offset %d for size %d" %
1149 (idx, offset, wipe_size))
1153 if now - last_output >= 60:
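        # progress is reported at most once a minute; e.g. 4096 MiB done out
        # of 10240 MiB after 60 s yields an ETA of roughly 90 s (sample
        # numbers only)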
1154 eta = _CalcEta(now - start_time, offset, size)
1155 lu.LogInfo(" - done: %.1f%% ETA: %s",
1156 offset / float(size) * 100, utils.FormatSeconds(eta))
1159 logging.info("Resuming synchronization of disks for instance '%s'",
1162 result = lu.rpc.call_blockdev_pause_resume_sync(node,
1163 (map(compat.snd, disks),
1168 lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1169 node, result.fail_msg)
    for idx, success in enumerate(result.payload):
      if not success:
        lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                      " failed", idx, instance.name)
1177 class LUInstanceCreate(LogicalUnit):
1178 """Create an instance.
1181 HPATH = "instance-add"
1182 HTYPE = constants.HTYPE_INSTANCE
1185 def CheckArguments(self):
1189 # do not require name_check to ease forward/backward compatibility
1191 if self.op.no_install and self.op.start:
1192 self.LogInfo("No-installation mode selected, disabling startup")
1193 self.op.start = False
1194 # validate/normalize the instance name
1195 self.op.instance_name = \
1196 netutils.Hostname.GetNormalizedName(self.op.instance_name)
1198 if self.op.ip_check and not self.op.name_check:
1199 # TODO: make the ip check more flexible and not depend on the name check
1200 raise errors.OpPrereqError("Cannot do IP address check without a name"
1201 " check", errors.ECODE_INVAL)
1203 # check nics' parameter names
1204 for nic in self.op.nics:
1205 utils.ForceDictType(nic, constants.INIC_PARAMS_TYPES)
1206 # check that NIC's parameters names are unique and valid
1207 utils.ValidateDeviceNames("NIC", self.op.nics)
1209 # check that disk's names are unique and valid
1210 utils.ValidateDeviceNames("disk", self.op.disks)
1212 cluster = self.cfg.GetClusterInfo()
1213 if not self.op.disk_template in cluster.enabled_disk_templates:
1214 raise errors.OpPrereqError("Cannot create an instance with disk template"
1215 " '%s', because it is not enabled in the"
1216 " cluster. Enabled disk templates are: %s." %
1217 (self.op.disk_template,
1218 ",".join(cluster.enabled_disk_templates)))
1220 # check disks. parameter names and consistent adopt/no-adopt strategy
    has_adopt = has_no_adopt = False
    for disk in self.op.disks:
      if self.op.disk_template != constants.DT_EXT:
        utils.ForceDictType(disk, constants.IDISK_PARAMS_TYPES)
      if constants.IDISK_ADOPT in disk:
        has_adopt = True
      else:
        has_no_adopt = True
    if has_adopt and has_no_adopt:
      raise errors.OpPrereqError("Either all disks are adopted or none is",
                                 errors.ECODE_INVAL)

    if has_adopt:
      if self.op.disk_template not in constants.DTS_MAY_ADOPT:
        raise errors.OpPrereqError("Disk adoption is not supported for the"
                                   " '%s' disk template" %
                                   self.op.disk_template,
                                   errors.ECODE_INVAL)
      if self.op.iallocator is not None:
        raise errors.OpPrereqError("Disk adoption not allowed with an"
                                   " iallocator script", errors.ECODE_INVAL)
      if self.op.mode == constants.INSTANCE_IMPORT:
        raise errors.OpPrereqError("Disk adoption not allowed for"
                                   " instance import", errors.ECODE_INVAL)
    else:
      if self.op.disk_template in constants.DTS_MUST_ADOPT:
        raise errors.OpPrereqError("Disk template %s requires disk adoption,"
                                   " but no 'adopt' parameter given" %
                                   self.op.disk_template,
                                   errors.ECODE_INVAL)

    self.adopt_disks = has_adopt
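    # when self.adopt_disks is set, the later steps reuse the existing
    # volumes named by the disks' 'adopt' parameter instead of creating new
    # ones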
1253 # instance name verification
1254 if self.op.name_check:
1255 self.hostname1 = _CheckHostnameSane(self, self.op.instance_name)
1256 self.op.instance_name = self.hostname1.name
1257 # used in CheckPrereq for ip ping check
1258 self.check_ip = self.hostname1.ip
1260 self.check_ip = None
1262 # file storage checks
1263 if (self.op.file_driver and
1264 not self.op.file_driver in constants.FILE_DRIVER):
1265 raise errors.OpPrereqError("Invalid file driver name '%s'" %
1266 self.op.file_driver, errors.ECODE_INVAL)
1268 if self.op.disk_template == constants.DT_FILE:
1269 opcodes.RequireFileStorage()
1270 elif self.op.disk_template == constants.DT_SHARED_FILE:
1271 opcodes.RequireSharedFileStorage()
1273 ### Node/iallocator related checks
1274 _CheckIAllocatorOrNode(self, "iallocator", "pnode")
1276 if self.op.pnode is not None:
1277 if self.op.disk_template in constants.DTS_INT_MIRROR:
1278 if self.op.snode is None:
1279 raise errors.OpPrereqError("The networked disk templates need"
1280 " a mirror node", errors.ECODE_INVAL)
1282 self.LogWarning("Secondary node will be ignored on non-mirrored disk"
1284 self.op.snode = None
1286 _CheckOpportunisticLocking(self.op)
1288 self._cds = _GetClusterDomainSecret()
1290 if self.op.mode == constants.INSTANCE_IMPORT:
1291 # On import force_variant must be True, because if we forced it at
1292 # initial install, our only chance when importing it back is that it
1294 self.op.force_variant = True
1296 if self.op.no_install:
1297 self.LogInfo("No-installation mode has no effect during import")
1299 elif self.op.mode == constants.INSTANCE_CREATE:
1300 if self.op.os_type is None:
1301 raise errors.OpPrereqError("No guest OS specified",
1303 if self.op.os_type in self.cfg.GetClusterInfo().blacklisted_os:
1304 raise errors.OpPrereqError("Guest OS '%s' is not allowed for"
1305 " installation" % self.op.os_type,
1307 if self.op.disk_template is None:
1308 raise errors.OpPrereqError("No disk template specified",
1311 elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
1312 # Check handshake to ensure both clusters have the same domain secret
1313 src_handshake = self.op.source_handshake
1314 if not src_handshake:
1315 raise errors.OpPrereqError("Missing source handshake",
1318 errmsg = masterd.instance.CheckRemoteExportHandshake(self._cds,
1321 raise errors.OpPrereqError("Invalid handshake: %s" % errmsg,
1324 # Load and check source CA
1325 self.source_x509_ca_pem = self.op.source_x509_ca
1326 if not self.source_x509_ca_pem:
1327 raise errors.OpPrereqError("Missing source X509 CA",
1331 (cert, _) = utils.LoadSignedX509Certificate(self.source_x509_ca_pem,
1333 except OpenSSL.crypto.Error, err:
1334 raise errors.OpPrereqError("Unable to load source X509 CA (%s)" %
1335 (err, ), errors.ECODE_INVAL)
1337 (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
1338 if errcode is not None:
1339 raise errors.OpPrereqError("Invalid source X509 CA (%s)" % (msg, ),
1342 self.source_x509_ca = cert
1344 src_instance_name = self.op.source_instance_name
1345 if not src_instance_name:
1346 raise errors.OpPrereqError("Missing source instance name",
1349 self.source_instance_name = \
1350 netutils.GetHostname(name=src_instance_name).name
1353 raise errors.OpPrereqError("Invalid instance creation mode %r" %
1354 self.op.mode, errors.ECODE_INVAL)
1356 def ExpandNames(self):
1357 """ExpandNames for CreateInstance.
1359 Figure out the right locks for instance creation.
1362 self.needed_locks = {}
1364 instance_name = self.op.instance_name
1365 # this is just a preventive check, but someone might still add this
1366 # instance in the meantime, and creation will fail at lock-add time
1367 if instance_name in self.cfg.GetInstanceList():
1368 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
1369 instance_name, errors.ECODE_EXISTS)
1371 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
1373 if self.op.iallocator:
1374 # TODO: Find a solution to not lock all nodes in the cluster, e.g. by
1375 # specifying a group on instance creation and then selecting nodes from
1377 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1378 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1380 if self.op.opportunistic_locking:
1381 self.opportunistic_locks[locking.LEVEL_NODE] = True
1382 self.opportunistic_locks[locking.LEVEL_NODE_RES] = True
1384 self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode)
1385 nodelist = [self.op.pnode]
1386 if self.op.snode is not None:
1387 self.op.snode = _ExpandNodeName(self.cfg, self.op.snode)
1388 nodelist.append(self.op.snode)
1389 self.needed_locks[locking.LEVEL_NODE] = nodelist
1391 # in case of import lock the source node too
1392 if self.op.mode == constants.INSTANCE_IMPORT:
1393 src_node = self.op.src_node
1394 src_path = self.op.src_path
1396 if src_path is None:
1397 self.op.src_path = src_path = self.op.instance_name
1399 if src_node is None:
1400 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1401 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1402 self.op.src_node = None
1403 if os.path.isabs(src_path):
1404 raise errors.OpPrereqError("Importing an instance from a path"
1405 " requires a source node option",
1408 self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node)
1409 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
1410 self.needed_locks[locking.LEVEL_NODE].append(src_node)
1411 if not os.path.isabs(src_path):
1412 self.op.src_path = src_path = \
1413 utils.PathJoin(pathutils.EXPORT_DIR, src_path)
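    # node resource locks mirror the node locks; _CopyLockList preserves the
    # locking.ALL_SET sentinel when the whole node list is locked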
1415 self.needed_locks[locking.LEVEL_NODE_RES] = \
1416 _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1418 def _RunAllocator(self):
1419 """Run the allocator based on input opcode.
1422 if self.op.opportunistic_locking:
1423 # Only consider nodes for which a lock is held
1424 node_whitelist = list(self.owned_locks(locking.LEVEL_NODE))
1426 node_whitelist = None
1428 #TODO Export network to iallocator so that it chooses a pnode
1429 # in a nodegroup that has the desired network connected to
1430 req = _CreateInstanceAllocRequest(self.op, self.disks,
1431 self.nics, self.be_full,
1433 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
1435 ial.Run(self.op.iallocator)
1438 # When opportunistic locks are used only a temporary failure is generated
1439 if self.op.opportunistic_locking:
1440 ecode = errors.ECODE_TEMP_NORES
1442 ecode = errors.ECODE_NORES
1444 raise errors.OpPrereqError("Can't compute nodes using"
1445 " iallocator '%s': %s" %
1446 (self.op.iallocator, ial.info),
1449 self.op.pnode = ial.result[0]
1450 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
1451 self.op.instance_name, self.op.iallocator,
1452 utils.CommaJoin(ial.result))
1454 assert req.RequiredNodes() in (1, 2), "Wrong node count from iallocator"
1456 if req.RequiredNodes() == 2:
1457 self.op.snode = ial.result[1]
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "ADD_MODE": self.op.mode,
      }
1468 if self.op.mode == constants.INSTANCE_IMPORT:
1469 env["SRC_NODE"] = self.op.src_node
1470 env["SRC_PATH"] = self.op.src_path
1471 env["SRC_IMAGES"] = self.src_images
1473 env.update(_BuildInstanceHookEnv(
1474 name=self.op.instance_name,
1475 primary_node=self.op.pnode,
1476 secondary_nodes=self.secondaries,
1477 status=self.op.start,
1478 os_type=self.op.os_type,
1479 minmem=self.be_full[constants.BE_MINMEM],
1480 maxmem=self.be_full[constants.BE_MAXMEM],
1481 vcpus=self.be_full[constants.BE_VCPUS],
1482 nics=_NICListToTuple(self, self.nics),
1483 disk_template=self.op.disk_template,
1484 disks=[(d[constants.IDISK_NAME], d[constants.IDISK_SIZE],
1485 d[constants.IDISK_MODE]) for d in self.disks],
1488 hypervisor_name=self.op.hypervisor,
  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.op.pnode] + self.secondaries
    return nl, nl
1501 def _ReadExportInfo(self):
1502 """Reads the export information from disk.
1504 It will override the opcode source node and path with the actual
1505 information, if these two were not specified before.
1507 @return: the export information
1510 assert self.op.mode == constants.INSTANCE_IMPORT
1512 src_node = self.op.src_node
1513 src_path = self.op.src_path
1515 if src_node is None:
1516 locked_nodes = self.owned_locks(locking.LEVEL_NODE)
1517 exp_list = self.rpc.call_export_list(locked_nodes)
1519 for node in exp_list:
1520 if exp_list[node].fail_msg:
1522 if src_path in exp_list[node].payload:
1524 self.op.src_node = src_node = node
1525 self.op.src_path = src_path = utils.PathJoin(pathutils.EXPORT_DIR,
1529 raise errors.OpPrereqError("No export found for relative path %s" %
1530 src_path, errors.ECODE_INVAL)
1532 _CheckNodeOnline(self, src_node)
1533 result = self.rpc.call_export_info(src_node, src_path)
1534 result.Raise("No export or invalid export found in dir %s" % src_path)
1536 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
1537 if not export_info.has_section(constants.INISECT_EXP):
1538 raise errors.ProgrammerError("Corrupted export config",
1539 errors.ECODE_ENVIRON)
1541 ei_version = export_info.get(constants.INISECT_EXP, "version")
1542 if (int(ei_version) != constants.EXPORT_VERSION):
1543 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
1544 (ei_version, constants.EXPORT_VERSION),
1545 errors.ECODE_ENVIRON)
1548 def _ReadExportParams(self, einfo):
1549 """Use export parameters as defaults.
1551 In case the opcode doesn't specify (as in override) some instance
    parameters, then try to use them from the export information, if
    they declare them.

    """
1556 self.op.os_type = einfo.get(constants.INISECT_EXP, "os")
1558 if self.op.disk_template is None:
1559 if einfo.has_option(constants.INISECT_INS, "disk_template"):
1560 self.op.disk_template = einfo.get(constants.INISECT_INS,
1562 if self.op.disk_template not in constants.DISK_TEMPLATES:
1563 raise errors.OpPrereqError("Disk template specified in configuration"
1564 " file is not one of the allowed values:"
1566 " ".join(constants.DISK_TEMPLATES),
1569 raise errors.OpPrereqError("No disk template specified and the export"
1570 " is missing the disk_template information",
1573 if not self.op.disks:
1575 # TODO: import the disk iv_name too
1576 for idx in range(constants.MAX_DISKS):
1577 if einfo.has_option(constants.INISECT_INS, "disk%d_size" % idx):
1578 disk_sz = einfo.getint(constants.INISECT_INS, "disk%d_size" % idx)
1579 disks.append({constants.IDISK_SIZE: disk_sz})
1580 self.op.disks = disks
1581 if not disks and self.op.disk_template != constants.DT_DISKLESS:
1582 raise errors.OpPrereqError("No disk info specified and the export"
1583 " is missing the disk information",
1586 if not self.op.nics:
1588 for idx in range(constants.MAX_NICS):
1589 if einfo.has_option(constants.INISECT_INS, "nic%d_mac" % idx):
1591 for name in list(constants.NICS_PARAMETERS) + ["ip", "mac"]:
1592 v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name))
1599 if not self.op.tags and einfo.has_option(constants.INISECT_INS, "tags"):
1600 self.op.tags = einfo.get(constants.INISECT_INS, "tags").split()
1602 if (self.op.hypervisor is None and
1603 einfo.has_option(constants.INISECT_INS, "hypervisor")):
1604 self.op.hypervisor = einfo.get(constants.INISECT_INS, "hypervisor")
1606 if einfo.has_section(constants.INISECT_HYP):
1607 # use the export parameters but do not override the ones
1608 # specified by the user
1609 for name, value in einfo.items(constants.INISECT_HYP):
1610 if name not in self.op.hvparams:
1611 self.op.hvparams[name] = value
1613 if einfo.has_section(constants.INISECT_BEP):
1614 # use the parameters, without overriding
1615 for name, value in einfo.items(constants.INISECT_BEP):
1616 if name not in self.op.beparams:
1617 self.op.beparams[name] = value
1618 # Compatibility for the old "memory" be param
1619 if name == constants.BE_MEMORY:
1620 if constants.BE_MAXMEM not in self.op.beparams:
1621 self.op.beparams[constants.BE_MAXMEM] = value
1622 if constants.BE_MINMEM not in self.op.beparams:
1623 self.op.beparams[constants.BE_MINMEM] = value
1625 # try to read the parameters old style, from the main section
1626 for name in constants.BES_PARAMETERS:
1627 if (name not in self.op.beparams and
1628 einfo.has_option(constants.INISECT_INS, name)):
1629 self.op.beparams[name] = einfo.get(constants.INISECT_INS, name)
1631 if einfo.has_section(constants.INISECT_OSP):
1632 # use the parameters, without overriding
1633 for name, value in einfo.items(constants.INISECT_OSP):
1634 if name not in self.op.osparams:
1635 self.op.osparams[name] = value
1637 def _RevertToDefaults(self, cluster):
1638 """Revert the instance parameters to the default values.
1642 hv_defs = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type, {})
1643 for name in self.op.hvparams.keys():
1644 if name in hv_defs and hv_defs[name] == self.op.hvparams[name]:
1645 del self.op.hvparams[name]
1647 be_defs = cluster.SimpleFillBE({})
1648 for name in self.op.beparams.keys():
1649 if name in be_defs and be_defs[name] == self.op.beparams[name]:
1650 del self.op.beparams[name]
1652 nic_defs = cluster.SimpleFillNIC({})
1653 for nic in self.op.nics:
1654 for name in constants.NICS_PARAMETERS:
1655 if name in nic and name in nic_defs and nic[name] == nic_defs[name]:
1658 os_defs = cluster.SimpleFillOS(self.op.os_type, {})
1659 for name in self.op.osparams.keys():
1660 if name in os_defs and os_defs[name] == self.op.osparams[name]:
1661 del self.op.osparams[name]
1663 def _CalculateFileStorageDir(self):
1664 """Calculate final instance file storage dir.
1667 # file storage dir calculation/check
1668 self.instance_file_storage_dir = None
1669 if self.op.disk_template in constants.DTS_FILEBASED:
1670 # build the full file storage dir path
      joinargs = []

      if self.op.disk_template == constants.DT_SHARED_FILE:
        get_fsd_fn = self.cfg.GetSharedFileStorageDir
      else:
        get_fsd_fn = self.cfg.GetFileStorageDir
1678 cfg_storagedir = get_fsd_fn()
1679 if not cfg_storagedir:
1680 raise errors.OpPrereqError("Cluster file storage dir not defined",
1682 joinargs.append(cfg_storagedir)
1684 if self.op.file_storage_dir is not None:
1685 joinargs.append(self.op.file_storage_dir)
1687 joinargs.append(self.op.instance_name)
1689 # pylint: disable=W0142
1690 self.instance_file_storage_dir = utils.PathJoin(*joinargs)
1692 def CheckPrereq(self): # pylint: disable=R0914
1693 """Check prerequisites.
1696 self._CalculateFileStorageDir()
1698 if self.op.mode == constants.INSTANCE_IMPORT:
1699 export_info = self._ReadExportInfo()
1700 self._ReadExportParams(export_info)
1701 self._old_instance_name = export_info.get(constants.INISECT_INS, "name")
1703 self._old_instance_name = None
1705 if (not self.cfg.GetVGName() and
1706 self.op.disk_template not in constants.DTS_NOT_LVM):
1707 raise errors.OpPrereqError("Cluster does not support lvm-based"
1708 " instances", errors.ECODE_STATE)
1710 if (self.op.hypervisor is None or
1711 self.op.hypervisor == constants.VALUE_AUTO):
1712 self.op.hypervisor = self.cfg.GetHypervisorType()
1714 cluster = self.cfg.GetClusterInfo()
1715 enabled_hvs = cluster.enabled_hypervisors
1716 if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" %
                                 (self.op.hypervisor, ",".join(enabled_hvs)),
                                 errors.ECODE_STATE)
1722 # Check tag validity
1723 for tag in self.op.tags:
1724 objects.TaggableObject.ValidateTag(tag)
1726 # check hypervisor parameter syntax (locally)
1727 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
1728 filled_hvp = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type,
1730 hv_type = hypervisor.GetHypervisorClass(self.op.hypervisor)
1731 hv_type.CheckParameterSyntax(filled_hvp)
1732 self.hv_full = filled_hvp
1733 # check that we don't specify global parameters on an instance
1734 _CheckParamsNotGlobal(self.op.hvparams, constants.HVC_GLOBALS, "hypervisor",
1735 "instance", "cluster")
1737 # fill and remember the beparams dict
1738 self.be_full = _ComputeFullBeParams(self.op, cluster)
1740 # build os parameters
1741 self.os_full = cluster.SimpleFillOS(self.op.os_type, self.op.osparams)
1743 # now that hvp/bep are in final format, let's reset to defaults,
1745 if self.op.identify_defaults:
1746 self._RevertToDefaults(cluster)
1749 self.nics = _ComputeNics(self.op, cluster, self.check_ip, self.cfg,
1750 self.proc.GetECId())
1752 # disk checks/pre-build
1753 default_vg = self.cfg.GetVGName()
1754 self.disks = _ComputeDisks(self.op, default_vg)
    if self.op.mode == constants.INSTANCE_IMPORT:
      disk_images = []
      for idx in range(len(self.disks)):
        option = "disk%d_dump" % idx
        if export_info.has_option(constants.INISECT_INS, option):
          # FIXME: are the old os-es, disk sizes, etc. useful?
          export_name = export_info.get(constants.INISECT_INS, option)
          image = utils.PathJoin(self.op.src_path, export_name)
          disk_images.append(image)
        else:
          disk_images.append(False)

      self.src_images = disk_images
1770 if self.op.instance_name == self._old_instance_name:
1771 for idx, nic in enumerate(self.nics):
1772 if nic.mac == constants.VALUE_AUTO:
1773 nic_mac_ini = "nic%d_mac" % idx
1774 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
1776 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
1778 # ip ping checks (we use the same ip that was resolved in ExpandNames)
1779 if self.op.ip_check:
1780 if netutils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
1781 raise errors.OpPrereqError("IP %s of instance %s already in use" %
1782 (self.check_ip, self.op.instance_name),
1783 errors.ECODE_NOTUNIQUE)
1785 #### mac address generation
1786 # By generating here the mac address both the allocator and the hooks get
1787 # the real final mac address rather than the 'auto' or 'generate' value.
1788 # There is a race condition between the generation and the instance object
1789 # creation, which means that we know the mac is valid now, but we're not
1790 # sure it will be when we actually add the instance. If things go bad
1791 # adding the instance will abort because of a duplicate mac, and the
1792 # creation job will fail.
1793 for nic in self.nics:
1794 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
1795 nic.mac = self.cfg.GenerateMAC(nic.network, self.proc.GetECId())
1799 if self.op.iallocator is not None:
1800 self._RunAllocator()
1802 # Release all unneeded node locks
1803 keep_locks = filter(None, [self.op.pnode, self.op.snode, self.op.src_node])
1804 _ReleaseLocks(self, locking.LEVEL_NODE, keep=keep_locks)
1805 _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=keep_locks)
1806 _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
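    # at this point the NODE and NODE_RES lock sets must be identical, since
    # both were reduced with the same keep list; the assert below verifies it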
1808 assert (self.owned_locks(locking.LEVEL_NODE) ==
1809 self.owned_locks(locking.LEVEL_NODE_RES)), \
1810 "Node locks differ from node resource locks"
1812 #### node related checks
1814 # check primary node
1815 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
1816 assert self.pnode is not None, \
1817 "Cannot retrieve locked node %s" % self.op.pnode
1819 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
1820 pnode.name, errors.ECODE_STATE)
1822 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
1823 pnode.name, errors.ECODE_STATE)
1824 if not pnode.vm_capable:
1825 raise errors.OpPrereqError("Cannot use non-vm_capable primary node"
1826 " '%s'" % pnode.name, errors.ECODE_STATE)
1828 self.secondaries = []
1830 # Fill in any IPs from IP pools. This must happen here, because we need to
1831 # know the nic's primary node, as specified by the iallocator
1832 for idx, nic in enumerate(self.nics):
1833 net_uuid = nic.network
1834 if net_uuid is not None:
1835 nobj = self.cfg.GetNetwork(net_uuid)
1836 netparams = self.cfg.GetGroupNetParams(net_uuid, self.pnode.name)
1837 if netparams is None:
1838 raise errors.OpPrereqError("No netparams found for network"
1839 " %s. Probably not connected to"
1840 " node's %s nodegroup" %
1841 (nobj.name, self.pnode.name),
1843 self.LogInfo("NIC/%d inherits netparams %s" %
1844 (idx, netparams.values()))
1845 nic.nicparams = dict(netparams)
1846 if nic.ip is not None:
1847 if nic.ip.lower() == constants.NIC_IP_POOL:
1849 nic.ip = self.cfg.GenerateIp(net_uuid, self.proc.GetECId())
1850 except errors.ReservationError:
1851 raise errors.OpPrereqError("Unable to get a free IP for NIC %d"
1852 " from the address pool" % idx,
1854 self.LogInfo("Chose IP %s from network %s", nic.ip, nobj.name)
1857 self.cfg.ReserveIp(net_uuid, nic.ip, self.proc.GetECId())
1858 except errors.ReservationError:
1859 raise errors.OpPrereqError("IP address %s already in use"
1860 " or does not belong to network %s" %
1861 (nic.ip, nobj.name),
1862 errors.ECODE_NOTUNIQUE)
1864 # no network given (net is None); the IP is either None or set explicitly
1865 elif self.op.conflicts_check:
1866 _CheckForConflictingIp(self, nic.ip, self.pnode.name)
1868 # mirror node verification
1869 if self.op.disk_template in constants.DTS_INT_MIRROR:
1870 if self.op.snode == pnode.name:
1871 raise errors.OpPrereqError("The secondary node cannot be the"
1872 " primary node", errors.ECODE_INVAL)
1873 _CheckNodeOnline(self, self.op.snode)
1874 _CheckNodeNotDrained(self, self.op.snode)
1875 _CheckNodeVmCapable(self, self.op.snode)
1876 self.secondaries.append(self.op.snode)
1878 snode = self.cfg.GetNodeInfo(self.op.snode)
1879 if pnode.group != snode.group:
1880 self.LogWarning("The primary and secondary nodes are in two"
1881 " different node groups; the disk parameters"
1882 " from the first disk's node group will be"
1885 if self.op.disk_template not in constants.DTS_EXCL_STORAGE:
1887 if self.op.disk_template in constants.DTS_INT_MIRROR:
1889 has_es = lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n)
1890 if compat.any(map(has_es, nodes)):
1891 raise errors.OpPrereqError("Disk template %s not supported with"
1892 " exclusive storage" % self.op.disk_template,
1895 nodenames = [pnode.name] + self.secondaries
1897 if not self.adopt_disks:
1898 if self.op.disk_template == constants.DT_RBD:
1899 # _CheckRADOSFreeSpace() is just a placeholder.
1900 # Any function that checks prerequisites can be placed here.
1901 # Check if there is enough space on the RADOS cluster.
1902 _CheckRADOSFreeSpace()
1903 elif self.op.disk_template == constants.DT_EXT:
1904 # FIXME: Function that checks prereqs if needed
1907 # Check lv size requirements, if not adopting
1908 req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks)
1909 _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes)
1911 elif self.op.disk_template == constants.DT_PLAIN: # Check the adoption data
1912 all_lvs = set(["%s/%s" % (disk[constants.IDISK_VG],
1913 disk[constants.IDISK_ADOPT])
1914 for disk in self.disks])
1915 if len(all_lvs) != len(self.disks):
1916 raise errors.OpPrereqError("Duplicate volume names given for adoption",
1918 for lv_name in all_lvs:
1920 # FIXME: lv_name here is "vg/lv"; need to ensure that other calls
1921 # to ReserveLV use the same syntax
1922 self.cfg.ReserveLV(lv_name, self.proc.GetECId())
1923 except errors.ReservationError:
1924 raise errors.OpPrereqError("LV named %s used by another instance" %
1925 lv_name, errors.ECODE_NOTUNIQUE)
1927 vg_names = self.rpc.call_vg_list([pnode.name])[pnode.name]
1928 vg_names.Raise("Cannot get VG information from node %s" % pnode.name)
1930 node_lvs = self.rpc.call_lv_list([pnode.name],
1931 vg_names.payload.keys())[pnode.name]
1932 node_lvs.Raise("Cannot get LV information from node %s" % pnode.name)
1933 node_lvs = node_lvs.payload
1935 delta = all_lvs.difference(node_lvs.keys())
1937 raise errors.OpPrereqError("Missing logical volume(s): %s" %
1938 utils.CommaJoin(delta),
1940 online_lvs = [lv for lv in all_lvs if node_lvs[lv][2]]
1942 raise errors.OpPrereqError("Online logical volumes found, cannot"
1943 " adopt: %s" % utils.CommaJoin(online_lvs),
1945 # update the size of each disk based on what is found
1946 for dsk in self.disks:
1947 dsk[constants.IDISK_SIZE] = \
1948 int(float(node_lvs["%s/%s" % (dsk[constants.IDISK_VG],
1949 dsk[constants.IDISK_ADOPT])][0]))
1951 elif self.op.disk_template == constants.DT_BLOCK:
1952 # Normalize and de-duplicate device paths
1953 all_disks = set([os.path.abspath(disk[constants.IDISK_ADOPT])
1954 for disk in self.disks])
1955 if len(all_disks) != len(self.disks):
1956 raise errors.OpPrereqError("Duplicate disk names given for adoption",
1958 baddisks = [d for d in all_disks
1959 if not d.startswith(constants.ADOPTABLE_BLOCKDEV_ROOT)]
1961 raise errors.OpPrereqError("Device node(s) %s lie outside %s and"
1962 " cannot be adopted" %
1963 (utils.CommaJoin(baddisks),
1964 constants.ADOPTABLE_BLOCKDEV_ROOT),
1967 node_disks = self.rpc.call_bdev_sizes([pnode.name],
1968 list(all_disks))[pnode.name]
1969 node_disks.Raise("Cannot get block device information from node %s" %
1971 node_disks = node_disks.payload
1972 delta = all_disks.difference(node_disks.keys())
1974 raise errors.OpPrereqError("Missing block device(s): %s" %
1975 utils.CommaJoin(delta),
1977 for dsk in self.disks:
1978 dsk[constants.IDISK_SIZE] = \
1979 int(float(node_disks[dsk[constants.IDISK_ADOPT]]))
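# At this point, for adopted disks (both plain LVs and block devices), the
# disk sizes given in the opcode have been replaced by the sizes actually
# reported by the primary node, so the instance spec checks below operate on
# the real values.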
1981 # Verify instance specs
1982 spindle_use = self.be_full.get(constants.BE_SPINDLE_USE, None)
1984 constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None),
1985 constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None),
1986 constants.ISPEC_DISK_COUNT: len(self.disks),
1987 constants.ISPEC_DISK_SIZE: [disk[constants.IDISK_SIZE]
1988 for disk in self.disks],
1989 constants.ISPEC_NIC_COUNT: len(self.nics),
1990 constants.ISPEC_SPINDLE_USE: spindle_use,
1993 group_info = self.cfg.GetNodeGroup(pnode.group)
1994 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info)
1995 res = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec,
1996 self.op.disk_template)
1997 if not self.op.ignore_ipolicy and res:
1998 msg = ("Instance allocation to group %s (%s) violates policy: %s" %
1999 (pnode.group, group_info.name, utils.CommaJoin(res)))
2000 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
2002 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
2004 _CheckNodeHasOS(self, pnode.name, self.op.os_type, self.op.force_variant)
2005 # check OS parameters (remotely)
2006 _CheckOSParams(self, True, nodenames, self.op.os_type, self.os_full)
2008 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
2010 #TODO: _CheckExtParams (remotely)
2011 # Check parameters for extstorage
2013 # memory check on primary node
2014 #TODO(dynmem): use MINMEM for checking
2016 _CheckNodeFreeMemory(self, self.pnode.name,
2017 "creating instance %s" % self.op.instance_name,
2018 self.be_full[constants.BE_MAXMEM],
2021 self.dry_run_result = list(nodenames)
2023 def Exec(self, feedback_fn):
2024 """Create and add the instance to the cluster.
2027 instance = self.op.instance_name
2028 pnode_name = self.pnode.name
2030 assert not (self.owned_locks(locking.LEVEL_NODE_RES) -
2031 self.owned_locks(locking.LEVEL_NODE)), \
2032 "Node locks differ from node resource locks"
2033 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2035 ht_kind = self.op.hypervisor
2036 if ht_kind in constants.HTS_REQ_PORT:
2037 network_port = self.cfg.AllocatePort()
2041 # This is ugly, but we have a chicken-and-egg problem here:
2042 # We can only take the group disk parameters, as the instance
2043 # has no disks yet (we are generating them right here).
2044 node = self.cfg.GetNodeInfo(pnode_name)
2045 nodegroup = self.cfg.GetNodeGroup(node.group)
2046 disks = _GenerateDiskTemplate(self,
2047 self.op.disk_template,
2048 instance, pnode_name,
2051 self.instance_file_storage_dir,
2052 self.op.file_driver,
2055 self.cfg.GetGroupDiskParams(nodegroup))
2057 iobj = objects.Instance(name=instance, os=self.op.os_type,
2058 primary_node=pnode_name,
2059 nics=self.nics, disks=disks,
2060 disk_template=self.op.disk_template,
2061 admin_state=constants.ADMINST_DOWN,
2062 network_port=network_port,
2063 beparams=self.op.beparams,
2064 hvparams=self.op.hvparams,
2065 hypervisor=self.op.hypervisor,
2066 osparams=self.op.osparams,
2070 for tag in self.op.tags:
2073 if self.adopt_disks:
2074 if self.op.disk_template == constants.DT_PLAIN:
2075 # rename LVs to the newly-generated names; we need to construct
2076 # 'fake' LV disks with the old data, plus the new unique_id
2077 tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks]
2079 for t_dsk, a_dsk in zip(tmp_disks, self.disks):
2080 rename_to.append(t_dsk.logical_id)
2081 t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk[constants.IDISK_ADOPT])
2082 self.cfg.SetDiskID(t_dsk, pnode_name)
2083 result = self.rpc.call_blockdev_rename(pnode_name,
2084 zip(tmp_disks, rename_to))
2085 result.Raise("Failed to rename adopted LVs")
2087 feedback_fn("* creating instance disks...")
2089 _CreateDisks(self, iobj)
2090 except errors.OpExecError:
2091 self.LogWarning("Device creation failed")
2092 self.cfg.ReleaseDRBDMinors(instance)
2095 feedback_fn("adding instance %s to cluster config" % instance)
2097 self.cfg.AddInstance(iobj, self.proc.GetECId())
2099 # Declare that we don't want to remove the instance lock anymore, as we've
2100 # added the instance to the config
2101 del self.remove_locks[locking.LEVEL_INSTANCE]
2103 if self.op.mode == constants.INSTANCE_IMPORT:
2104 # Release unused nodes
2105 _ReleaseLocks(self, locking.LEVEL_NODE, keep=[self.op.src_node])
2108 _ReleaseLocks(self, locking.LEVEL_NODE)
2111 if not self.adopt_disks and self.cfg.GetClusterInfo().prealloc_wipe_disks:
2112 feedback_fn("* wiping instance disks...")
2114 _WipeDisks(self, iobj)
2115 except errors.OpExecError, err:
2116 logging.exception("Wiping disks failed")
2117 self.LogWarning("Wiping instance disks failed (%s)", err)
2121 # Something is already wrong with the disks, don't do anything else
2123 elif self.op.wait_for_sync:
2124 disk_abort = not _WaitForSync(self, iobj)
2125 elif iobj.disk_template in constants.DTS_INT_MIRROR:
2126 # make sure the disks are not degraded (still sync-ing is ok)
2127 feedback_fn("* checking mirrors status")
2128 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
2133 _RemoveDisks(self, iobj)
2134 self.cfg.RemoveInstance(iobj.name)
2135 # Make sure the instance lock gets removed
2136 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
2137 raise errors.OpExecError("There are some degraded disks for"
2140 # Release all node resource locks
2141 _ReleaseLocks(self, locking.LEVEL_NODE_RES)
2143 if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks:
2144 # we need to set the disks' ID to the primary node, since the
2145 # preceding code might or might not have done it, depending on
2146 # disk template and other options
2147 for disk in iobj.disks:
2148 self.cfg.SetDiskID(disk, pnode_name)
2149 if self.op.mode == constants.INSTANCE_CREATE:
2150 if not self.op.no_install:
2151 pause_sync = (iobj.disk_template in constants.DTS_INT_MIRROR and
2152 not self.op.wait_for_sync)
2154 feedback_fn("* pausing disk sync to install instance OS")
2155 result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
2158 for idx, success in enumerate(result.payload):
2160 logging.warn("pause-sync of instance %s for disk %d failed",
2163 feedback_fn("* running the instance OS create scripts...")
2164 # FIXME: pass debug option from opcode to backend
2166 self.rpc.call_instance_os_add(pnode_name, (iobj, None), False,
2167 self.op.debug_level)
2169 feedback_fn("* resuming disk sync")
2170 result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
2173 for idx, success in enumerate(result.payload):
2175 logging.warn("resume-sync of instance %s for disk %d failed",
2178 os_add_result.Raise("Could not add os for instance %s"
2179 " on node %s" % (instance, pnode_name))
2182 if self.op.mode == constants.INSTANCE_IMPORT:
2183 feedback_fn("* running the instance OS import scripts...")
2187 for idx, image in enumerate(self.src_images):
2191 # FIXME: pass debug option from opcode to backend
2192 dt = masterd.instance.DiskTransfer("disk/%s" % idx,
2193 constants.IEIO_FILE, (image, ),
2194 constants.IEIO_SCRIPT,
2195 (iobj.disks[idx], idx),
2197 transfers.append(dt)
2200 masterd.instance.TransferInstanceData(self, feedback_fn,
2201 self.op.src_node, pnode_name,
2202 self.pnode.secondary_ip,
2204 if not compat.all(import_result):
2205 self.LogWarning("Some disks for instance %s on node %s were not"
2206 " imported successfully" % (instance, pnode_name))
2208 rename_from = self._old_instance_name
2210 elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
2211 feedback_fn("* preparing remote import...")
2212 # The source cluster will stop the instance before attempting to make
2213 # a connection. In some cases stopping an instance can take a long
2214 # time, hence the shutdown timeout is added to the connection
2216 connect_timeout = (constants.RIE_CONNECT_TIMEOUT +
2217 self.op.source_shutdown_timeout)
2218 timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
2220 assert iobj.primary_node == self.pnode.name
2222 masterd.instance.RemoteImport(self, feedback_fn, iobj, self.pnode,
2223 self.source_x509_ca,
2224 self._cds, timeouts)
2225 if not compat.all(disk_results):
2226 # TODO: Should the instance still be started, even if some disks
2227 # failed to import (valid for local imports, too)?
2228 self.LogWarning("Some disks for instance %s on node %s were not"
2229 " imported successfully" % (instance, pnode_name))
2231 rename_from = self.source_instance_name
2234 # also checked in the prereq part
2235 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
2238 # Run rename script on newly imported instance
2239 assert iobj.name == instance
2240 feedback_fn("Running rename script for %s" % instance)
2241 result = self.rpc.call_instance_run_rename(pnode_name, iobj,
2243 self.op.debug_level)
2245 self.LogWarning("Failed to run rename script for %s on node"
2246 " %s: %s" % (instance, pnode_name, result.fail_msg))
2248 assert not self.owned_locks(locking.LEVEL_NODE_RES)
2251 iobj.admin_state = constants.ADMINST_UP
2252 self.cfg.Update(iobj, feedback_fn)
2253 logging.info("Starting instance %s on node %s", instance, pnode_name)
2254 feedback_fn("* starting instance...")
2255 result = self.rpc.call_instance_start(pnode_name, (iobj, None, None),
2256 False, self.op.reason)
2257 result.Raise("Could not start instance")
2259 return list(iobj.all_nodes)
2262 def _GetInstanceInfoText(instance):
2263 """Compute the text that should be added to the disk's metadata.
2266 return "originstname+%s" % instance.name
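# For illustration: for an instance named "web1.example.com" (a hypothetical
# name), the metadata text produced by the helper above would be
# "originstname+web1.example.com".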
2269 class LUInstanceRename(LogicalUnit):
2270 """Rename an instance.
2273 HPATH = "instance-rename"
2274 HTYPE = constants.HTYPE_INSTANCE
2276 def CheckArguments(self):
2280 if self.op.ip_check and not self.op.name_check:
2281 # TODO: make the ip check more flexible and not depend on the name check
2282 raise errors.OpPrereqError("IP address check requires a name check",
2285 def BuildHooksEnv(self):
2288 This runs on master, primary and secondary nodes of the instance.
2291 env = _BuildInstanceHookEnvByObject(self, self.instance)
2292 env["INSTANCE_NEW_NAME"] = self.op.new_name
2295 def BuildHooksNodes(self):
2296 """Build hooks nodes.
2299 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2302 def CheckPrereq(self):
2303 """Check prerequisites.
2305 This checks that the instance is in the cluster and is not running.
2308 self.op.instance_name = _ExpandInstanceName(self.cfg,
2309 self.op.instance_name)
2310 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2311 assert instance is not None
2312 _CheckNodeOnline(self, instance.primary_node)
2313 _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
2314 msg="cannot rename")
2315 self.instance = instance
2317 new_name = self.op.new_name
2318 if self.op.name_check:
2319 hostname = _CheckHostnameSane(self, new_name)
2320 new_name = self.op.new_name = hostname.name
2321 if (self.op.ip_check and
2322 netutils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT)):
2323 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2324 (hostname.ip, new_name),
2325 errors.ECODE_NOTUNIQUE)
2327 instance_list = self.cfg.GetInstanceList()
2328 if new_name in instance_list and new_name != instance.name:
2329 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2330 new_name, errors.ECODE_EXISTS)
2332 def Exec(self, feedback_fn):
2333 """Rename the instance.
2336 inst = self.instance
2337 old_name = inst.name
2339 rename_file_storage = False
2340 if (inst.disk_template in constants.DTS_FILEBASED and
2341 self.op.new_name != inst.name):
2342 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2343 rename_file_storage = True
2345 self.cfg.RenameInstance(inst.name, self.op.new_name)
2346 # Change the instance lock. This is definitely safe while we hold the BGL.
2347 # Otherwise the new lock would have to be added in acquired mode.
2349 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
2350 self.glm.remove(locking.LEVEL_INSTANCE, old_name)
2351 self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2353 # re-read the instance from the configuration after rename
2354 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2356 if rename_file_storage:
2357 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2358 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2359 old_file_storage_dir,
2360 new_file_storage_dir)
2361 result.Raise("Could not rename on node %s directory '%s' to '%s'"
2362 " (but the instance has been renamed in Ganeti)" %
2363 (inst.primary_node, old_file_storage_dir,
2364 new_file_storage_dir))
2366 _StartInstanceDisks(self, inst, None)
2367 # update info on disks
2368 info = _GetInstanceInfoText(inst)
2369 for (idx, disk) in enumerate(inst.disks):
2370 for node in inst.all_nodes:
2371 self.cfg.SetDiskID(disk, node)
2372 result = self.rpc.call_blockdev_setinfo(node, disk, info)
2374 self.LogWarning("Error setting info on node %s for disk %s: %s",
2375 node, idx, result.fail_msg)
2377 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2378 old_name, self.op.debug_level)
2379 msg = result.fail_msg
2381 msg = ("Could not run OS rename script for instance %s on node %s"
2382 " (but the instance has been renamed in Ganeti): %s" %
2383 (inst.name, inst.primary_node, msg))
2384 self.LogWarning(msg)
2386 _ShutdownInstanceDisks(self, inst)
2391 class LUInstanceRemove(LogicalUnit):
2392 """Remove an instance.
2395 HPATH = "instance-remove"
2396 HTYPE = constants.HTYPE_INSTANCE
2399 def ExpandNames(self):
2400 self._ExpandAndLockInstance()
2401 self.needed_locks[locking.LEVEL_NODE] = []
2402 self.needed_locks[locking.LEVEL_NODE_RES] = []
2403 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2405 def DeclareLocks(self, level):
2406 if level == locking.LEVEL_NODE:
2407 self._LockInstancesNodes()
2408 elif level == locking.LEVEL_NODE_RES:
2410 self.needed_locks[locking.LEVEL_NODE_RES] = \
2411 _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
2413 def BuildHooksEnv(self):
2416 This runs on master, primary and secondary nodes of the instance.
2419 env = _BuildInstanceHookEnvByObject(self, self.instance)
2420 env["SHUTDOWN_TIMEOUT"] = self.op.shutdown_timeout
2423 def BuildHooksNodes(self):
2424 """Build hooks nodes.
2427 nl = [self.cfg.GetMasterNode()]
2428 nl_post = list(self.instance.all_nodes) + nl
2429 return (nl, nl_post)
2431 def CheckPrereq(self):
2432 """Check prerequisites.
2434 This checks that the instance is in the cluster.
2437 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2438 assert self.instance is not None, \
2439 "Cannot retrieve locked instance %s" % self.op.instance_name
2441 def Exec(self, feedback_fn):
2442 """Remove the instance.
2445 instance = self.instance
2446 logging.info("Shutting down instance %s on node %s",
2447 instance.name, instance.primary_node)
2449 result = self.rpc.call_instance_shutdown(instance.primary_node, instance,
2450 self.op.shutdown_timeout,
2452 msg = result.fail_msg
2454 if self.op.ignore_failures:
2455 feedback_fn("Warning: can't shutdown instance: %s" % msg)
2457 raise errors.OpExecError("Could not shutdown instance %s on"
2459 (instance.name, instance.primary_node, msg))
2461 assert (self.owned_locks(locking.LEVEL_NODE) ==
2462 self.owned_locks(locking.LEVEL_NODE_RES))
2463 assert not (set(instance.all_nodes) -
2464 self.owned_locks(locking.LEVEL_NODE)), \
2465 "Not owning correct locks"
2467 _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
2470 def _CheckInstanceBridgesExist(lu, instance, node=None):
2471 """Check that the bridges needed by an instance exist.
2475 node = instance.primary_node
2476 _CheckNicsBridgesExist(lu, instance.nics, node)
2479 def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group,
2481 _compute_fn=_ComputeIPolicyInstanceViolation):
2482 """Compute whether the instance meets the specs of the new target group.
2484 @param ipolicy: The ipolicy to verify
2485 @param instance: The instance object to verify
2486 @param current_group: The current group of the instance
2487 @param target_group: The new group of the instance
2488 @type cfg: L{config.ConfigWriter}
2489 @param cfg: Cluster configuration
2490 @param _compute_fn: The function to verify ipolicy (unittest only)
2491 @see: L{_ComputeIPolicySpecViolation}
2494 if current_group == target_group:
2497 return _compute_fn(ipolicy, instance, cfg)
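# Note on _ComputeIPolicyNodeViolation above: when the instance stays in the
# same node group the early branch skips the re-check entirely (presumably
# reporting no violations); only an actual group change runs
# _ComputeIPolicyInstanceViolation against the target group's policy. The
# _compute_fn argument exists solely so unit tests can inject a substitute
# verification function.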
2500 def _CheckTargetNodeIPolicy(lu, ipolicy, instance, node, cfg, ignore=False,
2501 _compute_fn=_ComputeIPolicyNodeViolation):
2502 """Checks that the target node is correct in terms of instance policy.
2504 @param ipolicy: The ipolicy to verify
2505 @param instance: The instance object to verify
2506 @param node: The new node to relocate
2507 @type cfg: L{config.ConfigWriter}
2508 @param cfg: Cluster configuration
2509 @param ignore: Ignore violations of the ipolicy
2510 @param _compute_fn: The function to verify ipolicy (unittest only)
2511 @see: L{_ComputeIPolicySpecViolation}
2514 primary_node = lu.cfg.GetNodeInfo(instance.primary_node)
2515 res = _compute_fn(ipolicy, instance, primary_node.group, node.group, cfg)
2518 msg = ("Instance does not meet target node group's (%s) instance"
2519 " policy: %s") % (node.group, utils.CommaJoin(res))
2523 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
2526 class LUInstanceMove(LogicalUnit):
2527 """Move an instance by data-copying.
2530 HPATH = "instance-move"
2531 HTYPE = constants.HTYPE_INSTANCE
2534 def ExpandNames(self):
2535 self._ExpandAndLockInstance()
2536 target_node = _ExpandNodeName(self.cfg, self.op.target_node)
2537 self.op.target_node = target_node
2538 self.needed_locks[locking.LEVEL_NODE] = [target_node]
2539 self.needed_locks[locking.LEVEL_NODE_RES] = []
2540 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
2542 def DeclareLocks(self, level):
2543 if level == locking.LEVEL_NODE:
2544 self._LockInstancesNodes(primary_only=True)
2545 elif level == locking.LEVEL_NODE_RES:
2547 self.needed_locks[locking.LEVEL_NODE_RES] = \
2548 _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
2550 def BuildHooksEnv(self):
2553 This runs on master, primary and secondary nodes of the instance.
2557 "TARGET_NODE": self.op.target_node,
2558 "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
2560 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2563 def BuildHooksNodes(self):
2564 """Build hooks nodes.
2568 self.cfg.GetMasterNode(),
2569 self.instance.primary_node,
2570 self.op.target_node,
2574 def CheckPrereq(self):
2575 """Check prerequisites.
2577 This checks that the instance is in the cluster.
2580 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2581 assert self.instance is not None, \
2582 "Cannot retrieve locked instance %s" % self.op.instance_name
2584 if instance.disk_template not in constants.DTS_COPYABLE:
2585 raise errors.OpPrereqError("Disk template %s not suitable for copying" %
2586 instance.disk_template, errors.ECODE_STATE)
2588 node = self.cfg.GetNodeInfo(self.op.target_node)
2589 assert node is not None, \
2590 "Cannot retrieve locked node %s" % self.op.target_node
2592 self.target_node = target_node = node.name
2594 if target_node == instance.primary_node:
2595 raise errors.OpPrereqError("Instance %s is already on the node %s" %
2596 (instance.name, target_node),
2599 bep = self.cfg.GetClusterInfo().FillBE(instance)
2601 for idx, dsk in enumerate(instance.disks):
2602 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
2603 raise errors.OpPrereqError("Instance disk %d has a complex layout,"
2604 " cannot copy" % idx, errors.ECODE_STATE)
2606 _CheckNodeOnline(self, target_node)
2607 _CheckNodeNotDrained(self, target_node)
2608 _CheckNodeVmCapable(self, target_node)
2609 cluster = self.cfg.GetClusterInfo()
2610 group_info = self.cfg.GetNodeGroup(node.group)
2611 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info)
2612 _CheckTargetNodeIPolicy(self, ipolicy, instance, node, self.cfg,
2613 ignore=self.op.ignore_ipolicy)
2615 if instance.admin_state == constants.ADMINST_UP:
2616 # check memory requirements on the target node
2617 _CheckNodeFreeMemory(self, target_node,
2618 "moving instance %s" %
2619 instance.name, bep[constants.BE_MAXMEM],
2620 instance.hypervisor)
2622 self.LogInfo("Not checking memory on the target node as"
2623 " the instance will not be started")
2625 # check bridge existence
2626 _CheckInstanceBridgesExist(self, instance, node=target_node)
2628 def Exec(self, feedback_fn):
2629 """Move an instance.
2631 The move is done by shutting it down on its present node, copying
2632 the data over (slow) and starting it on the new node.
2635 instance = self.instance
2637 source_node = instance.primary_node
2638 target_node = self.target_node
2640 self.LogInfo("Shutting down instance %s on source node %s",
2641 instance.name, source_node)
2643 assert (self.owned_locks(locking.LEVEL_NODE) ==
2644 self.owned_locks(locking.LEVEL_NODE_RES))
2646 result = self.rpc.call_instance_shutdown(source_node, instance,
2647 self.op.shutdown_timeout,
2649 msg = result.fail_msg
2651 if self.op.ignore_consistency:
2652 self.LogWarning("Could not shutdown instance %s on node %s."
2653 " Proceeding anyway. Please make sure node"
2654 " %s is down. Error details: %s",
2655 instance.name, source_node, source_node, msg)
2657 raise errors.OpExecError("Could not shutdown instance %s on"
2659 (instance.name, source_node, msg))
2661 # create the target disks
2663 _CreateDisks(self, instance, target_node=target_node)
2664 except errors.OpExecError:
2665 self.LogWarning("Device creation failed")
2666 self.cfg.ReleaseDRBDMinors(instance.name)
2669 cluster_name = self.cfg.GetClusterInfo().cluster_name
2672 # activate, get path, copy the data over
2673 for idx, disk in enumerate(instance.disks):
2674 self.LogInfo("Copying data for disk %d", idx)
2675 result = self.rpc.call_blockdev_assemble(target_node, (disk, instance),
2676 instance.name, True, idx)
2678 self.LogWarning("Can't assemble newly created disk %d: %s",
2679 idx, result.fail_msg)
2680 errs.append(result.fail_msg)
2682 dev_path = result.payload
2683 result = self.rpc.call_blockdev_export(source_node, (disk, instance),
2684 target_node, dev_path,
2687 self.LogWarning("Can't copy data over for disk %d: %s",
2688 idx, result.fail_msg)
2689 errs.append(result.fail_msg)
2693 self.LogWarning("Some disks failed to copy, aborting")
2695 _RemoveDisks(self, instance, target_node=target_node)
2697 self.cfg.ReleaseDRBDMinors(instance.name)
2698 raise errors.OpExecError("Errors during disk copy: %s" %
2701 instance.primary_node = target_node
2702 self.cfg.Update(instance, feedback_fn)
2704 self.LogInfo("Removing the disks on the original node")
2705 _RemoveDisks(self, instance, target_node=source_node)
2707 # Only start the instance if it's marked as up
2708 if instance.admin_state == constants.ADMINST_UP:
2709 self.LogInfo("Starting instance %s on node %s",
2710 instance.name, target_node)
2712 disks_ok, _ = _AssembleInstanceDisks(self, instance,
2713 ignore_secondaries=True)
2715 _ShutdownInstanceDisks(self, instance)
2716 raise errors.OpExecError("Can't activate the instance's disks")
2718 result = self.rpc.call_instance_start(target_node,
2719 (instance, None, None), False,
2721 msg = result.fail_msg
2723 _ShutdownInstanceDisks(self, instance)
2724 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
2725 (instance.name, target_node, msg))
2728 def _GetInstanceConsole(cluster, instance):
2729 """Returns console information for an instance.
2731 @type cluster: L{objects.Cluster}
2732 @type instance: L{objects.Instance}
2736 hyper = hypervisor.GetHypervisorClass(instance.hypervisor)
2737 # beparams and hvparams are passed separately, to avoid editing the
2738 # instance and then saving the defaults in the instance itself.
2739 hvparams = cluster.FillHV(instance)
2740 beparams = cluster.FillBE(instance)
2741 console = hyper.GetInstanceConsole(instance, hvparams, beparams)
2743 assert console.instance == instance.name
2744 assert console.Validate()
2746 return console.ToDict()
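# The dict returned by _GetInstanceConsole above is what the instance query
# code exposes when console information (IQ_CONSOLE) is requested; see
# _InstanceQuery._GetQueryData below, which calls this helper for every
# instance that is currently running.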
2749 class _InstanceQuery(_QueryBase):
2750 FIELDS = query.INSTANCE_FIELDS
2752 def ExpandNames(self, lu):
2753 lu.needed_locks = {}
2754 lu.share_locks = _ShareAll()
2757 self.wanted = _GetWantedInstances(lu, self.names)
2759 self.wanted = locking.ALL_SET
2761 self.do_locking = (self.use_locking and
2762 query.IQ_LIVE in self.requested_data)
2764 lu.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2765 lu.needed_locks[locking.LEVEL_NODEGROUP] = []
2766 lu.needed_locks[locking.LEVEL_NODE] = []
2767 lu.needed_locks[locking.LEVEL_NETWORK] = []
2768 lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2770 self.do_grouplocks = (self.do_locking and
2771 query.IQ_NODES in self.requested_data)
2773 def DeclareLocks(self, lu, level):
2775 if level == locking.LEVEL_NODEGROUP and self.do_grouplocks:
2776 assert not lu.needed_locks[locking.LEVEL_NODEGROUP]
2778 # Lock all groups used by instances optimistically; this requires going
2779 # via the node before it's locked, requiring verification later on
2780 lu.needed_locks[locking.LEVEL_NODEGROUP] = \
2782 for instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
2783 for group_uuid in lu.cfg.GetInstanceNodeGroups(instance_name))
2784 elif level == locking.LEVEL_NODE:
2785 lu._LockInstancesNodes() # pylint: disable=W0212
2787 elif level == locking.LEVEL_NETWORK:
2788 lu.needed_locks[locking.LEVEL_NETWORK] = \
2790 for instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
2791 for net_uuid in lu.cfg.GetInstanceNetworks(instance_name))
2794 def _CheckGroupLocks(lu):
2795 owned_instances = frozenset(lu.owned_locks(locking.LEVEL_INSTANCE))
2796 owned_groups = frozenset(lu.owned_locks(locking.LEVEL_NODEGROUP))
2798 # Check if node groups for locked instances are still correct
2799 for instance_name in owned_instances:
2800 _CheckInstanceNodeGroups(lu.cfg, instance_name, owned_groups)
2802 def _GetQueryData(self, lu):
2803 """Computes the list of instances and their attributes.
2806 if self.do_grouplocks:
2807 self._CheckGroupLocks(lu)
2809 cluster = lu.cfg.GetClusterInfo()
2810 all_info = lu.cfg.GetAllInstancesInfo()
2812 instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE)
2814 instance_list = [all_info[name] for name in instance_names]
2815 nodes = frozenset(itertools.chain(*(inst.all_nodes
2816 for inst in instance_list)))
2817 hv_list = list(set([inst.hypervisor for inst in instance_list]))
2820 wrongnode_inst = set()
2822 # Gather data as requested
2823 if self.requested_data & set([query.IQ_LIVE, query.IQ_CONSOLE]):
2825 node_data = lu.rpc.call_all_instances_info(nodes, hv_list)
2827 result = node_data[name]
2829 # offline nodes will be in both lists
2830 assert result.fail_msg
2831 offline_nodes.append(name)
2833 bad_nodes.append(name)
2834 elif result.payload:
2835 for inst in result.payload:
2836 if inst in all_info:
2837 if all_info[inst].primary_node == name:
2838 live_data.update(result.payload)
2840 wrongnode_inst.add(inst)
2842 # orphan instance; we don't list it here as we don't
2843 # handle this case yet in the output of instance listing
2844 logging.warning("Orphan instance '%s' found on node %s",
2846 # else no instance is alive
2850 if query.IQ_DISKUSAGE in self.requested_data:
2851 gmi = ganeti.masterd.instance
2852 disk_usage = dict((inst.name,
2853 gmi.ComputeDiskSize(inst.disk_template,
2854 [{constants.IDISK_SIZE: disk.size}
2855 for disk in inst.disks]))
2856 for inst in instance_list)
2860 if query.IQ_CONSOLE in self.requested_data:
2862 for inst in instance_list:
2863 if inst.name in live_data:
2864 # Instance is running
2865 consinfo[inst.name] = _GetInstanceConsole(cluster, inst)
2867 consinfo[inst.name] = None
2868 assert set(consinfo.keys()) == set(instance_names)
2872 if query.IQ_NODES in self.requested_data:
2873 node_names = set(itertools.chain(*map(operator.attrgetter("all_nodes"),
2875 nodes = dict(lu.cfg.GetMultiNodeInfo(node_names))
2876 groups = dict((uuid, lu.cfg.GetNodeGroup(uuid))
2877 for uuid in set(map(operator.attrgetter("group"),
2883 if query.IQ_NETWORKS in self.requested_data:
2884 net_uuids = itertools.chain(*(lu.cfg.GetInstanceNetworks(i.name)
2885 for i in instance_list))
2886 networks = dict((uuid, lu.cfg.GetNetwork(uuid)) for uuid in net_uuids)
2890 return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(),
2891 disk_usage, offline_nodes, bad_nodes,
2892 live_data, wrongnode_inst, consinfo,
2893 nodes, groups, networks)
2896 class LUInstanceQuery(NoHooksLU):
2897 """Logical unit for querying instances.
2900 # pylint: disable=W0142
2903 def CheckArguments(self):
2904 self.iq = _InstanceQuery(qlang.MakeSimpleFilter("name", self.op.names),
2905 self.op.output_fields, self.op.use_locking)
2907 def ExpandNames(self):
2908 self.iq.ExpandNames(self)
2910 def DeclareLocks(self, level):
2911 self.iq.DeclareLocks(self, level)
2913 def Exec(self, feedback_fn):
2914 return self.iq.OldStyleQuery(self)
2917 class LUInstanceQueryData(NoHooksLU):
2918 """Query runtime instance data.
2923 def ExpandNames(self):
2924 self.needed_locks = {}
2926 # Use locking if requested or when non-static information is wanted
2927 if not (self.op.static or self.op.use_locking):
2928 self.LogWarning("Non-static data requested, locks need to be acquired")
2929 self.op.use_locking = True
2931 if self.op.instances or not self.op.use_locking:
2932 # Expand instance names right here
2933 self.wanted_names = _GetWantedInstances(self, self.op.instances)
2935 # Will use acquired locks
2936 self.wanted_names = None
2938 if self.op.use_locking:
2939 self.share_locks = _ShareAll()
2941 if self.wanted_names is None:
2942 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
2944 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
2946 self.needed_locks[locking.LEVEL_NODEGROUP] = []
2947 self.needed_locks[locking.LEVEL_NODE] = []
2948 self.needed_locks[locking.LEVEL_NETWORK] = []
2949 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2951 def DeclareLocks(self, level):
2952 if self.op.use_locking:
2953 owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
2954 if level == locking.LEVEL_NODEGROUP:
2956 # Lock all groups used by instances optimistically; this requires going
2957 # via the node before it's locked, requiring verification later on
2958 self.needed_locks[locking.LEVEL_NODEGROUP] = \
2959 frozenset(group_uuid
2960 for instance_name in owned_instances
2962 self.cfg.GetInstanceNodeGroups(instance_name))
2964 elif level == locking.LEVEL_NODE:
2965 self._LockInstancesNodes()
2967 elif level == locking.LEVEL_NETWORK:
2968 self.needed_locks[locking.LEVEL_NETWORK] = \
2970 for instance_name in owned_instances
2972 self.cfg.GetInstanceNetworks(instance_name))
2974 def CheckPrereq(self):
2975 """Check prerequisites.
2977 This only checks the optional instance list against the existing names.
2980 owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
2981 owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
2982 owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
2983 owned_networks = frozenset(self.owned_locks(locking.LEVEL_NETWORK))
2985 if self.wanted_names is None:
2986 assert self.op.use_locking, "Locking was not used"
2987 self.wanted_names = owned_instances
2989 instances = dict(self.cfg.GetMultiInstanceInfo(self.wanted_names))
2991 if self.op.use_locking:
2992 _CheckInstancesNodeGroups(self.cfg, instances, owned_groups, owned_nodes,
2995 assert not (owned_instances or owned_groups or
2996 owned_nodes or owned_networks)
2998 self.wanted_instances = instances.values()
3000 def _ComputeBlockdevStatus(self, node, instance, dev):
3001 """Returns the status of a block device
3004 if self.op.static or not node:
3007 self.cfg.SetDiskID(dev, node)
3009 result = self.rpc.call_blockdev_find(node, dev)
3013 result.Raise("Can't compute disk status for %s" % instance.name)
3015 status = result.payload
3019 return (status.dev_path, status.major, status.minor,
3020 status.sync_percent, status.estimated_time,
3021 status.is_degraded, status.ldisk_status)
3023 def _ComputeDiskStatus(self, instance, snode, dev):
3024 """Compute block device status.
3027 (anno_dev,) = _AnnotateDiskParams(instance, [dev], self.cfg)
3029 return self._ComputeDiskStatusInner(instance, snode, anno_dev)
3031 def _ComputeDiskStatusInner(self, instance, snode, dev):
3032 """Compute block device status.
3034 @attention: The device has to be annotated already.
3037 if dev.dev_type in constants.LDS_DRBD:
3038 # we change the snode then (otherwise we use the one passed in)
3039 if dev.logical_id[0] == instance.primary_node:
3040 snode = dev.logical_id[1]
3042 snode = dev.logical_id[0]
3044 dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
3046 dev_sstatus = self._ComputeBlockdevStatus(snode, instance, dev)
3049 dev_children = map(compat.partial(self._ComputeDiskStatusInner,
3056 "iv_name": dev.iv_name,
3057 "dev_type": dev.dev_type,
3058 "logical_id": dev.logical_id,
3059 "physical_id": dev.physical_id,
3060 "pstatus": dev_pstatus,
3061 "sstatus": dev_sstatus,
3062 "children": dev_children,
3069 def Exec(self, feedback_fn):
3070 """Gather and return data"""
3073 cluster = self.cfg.GetClusterInfo()
3075 node_names = itertools.chain(*(i.all_nodes for i in self.wanted_instances))
3076 nodes = dict(self.cfg.GetMultiNodeInfo(node_names))
3078 groups = dict(self.cfg.GetMultiNodeGroupInfo(node.group
3079 for node in nodes.values()))
3081 group2name_fn = lambda uuid: groups[uuid].name
3082 for instance in self.wanted_instances:
3083 pnode = nodes[instance.primary_node]
3085 if self.op.static or pnode.offline:
3088 self.LogWarning("Primary node %s is marked offline, returning static"
3089 " information only for instance %s" %
3090 (pnode.name, instance.name))
3092 remote_info = self.rpc.call_instance_info(instance.primary_node,
3094 instance.hypervisor)
3095 remote_info.Raise("Error checking node %s" % instance.primary_node)
3096 remote_info = remote_info.payload
3097 if remote_info and "state" in remote_info:
3100 if instance.admin_state == constants.ADMINST_UP:
3101 remote_state = "down"
3103 remote_state = instance.admin_state
3105 disks = map(compat.partial(self._ComputeDiskStatus, instance, None),
3108 snodes_group_uuids = [nodes[snode_name].group
3109 for snode_name in instance.secondary_nodes]
3111 result[instance.name] = {
3112 "name": instance.name,
3113 "config_state": instance.admin_state,
3114 "run_state": remote_state,
3115 "pnode": instance.primary_node,
3116 "pnode_group_uuid": pnode.group,
3117 "pnode_group_name": group2name_fn(pnode.group),
3118 "snodes": instance.secondary_nodes,
3119 "snodes_group_uuids": snodes_group_uuids,
3120 "snodes_group_names": map(group2name_fn, snodes_group_uuids),
3122 # this happens to be the same format used for hooks
3123 "nics": _NICListToTuple(self, instance.nics),
3124 "disk_template": instance.disk_template,
3126 "hypervisor": instance.hypervisor,
3127 "network_port": instance.network_port,
3128 "hv_instance": instance.hvparams,
3129 "hv_actual": cluster.FillHV(instance, skip_globals=True),
3130 "be_instance": instance.beparams,
3131 "be_actual": cluster.FillBE(instance),
3132 "os_instance": instance.osparams,
3133 "os_actual": cluster.SimpleFillOS(instance.os, instance.osparams),
3134 "serial_no": instance.serial_no,
3135 "mtime": instance.mtime,
3136 "ctime": instance.ctime,
3137 "uuid": instance.uuid,
3143 class LUInstanceRecreateDisks(LogicalUnit):
3144 """Recreate an instance's missing disks.
3147 HPATH = "instance-recreate-disks"
3148 HTYPE = constants.HTYPE_INSTANCE
3151 _MODIFYABLE = compat.UniqueFrozenset([
3152 constants.IDISK_SIZE,
3153 constants.IDISK_MODE,
3156 # New or changed disk parameters may have different semantics
3157 assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
3158 constants.IDISK_ADOPT,
3160 # TODO: Implement support for changing the VG while recreating
3162 constants.IDISK_METAVG,
3163 constants.IDISK_PROVIDER,
3164 constants.IDISK_NAME,
3167 def _RunAllocator(self):
3168 """Run the allocator based on input opcode.
3171 be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
3174 # The allocator should actually run in "relocate" mode, but current
3175 # allocators don't support relocating all the nodes of an instance at
3176 # the same time. As a workaround we use "allocate" mode, but this is
3177 # suboptimal for two reasons:
3178 # - The instance name passed to the allocator is present in the list of
3179 # existing instances, so there could be a conflict within the
3180 # internal structures of the allocator. This doesn't happen with the
3181 # current allocators, but it's a liability.
3182 # - The allocator counts the resources used by the instance twice: once
3183 # because the instance exists already, and once because it tries to
3184 # allocate a new instance.
3185 # The allocator could choose some of the nodes on which the instance is
3186 # running, but that's not a problem. If the instance nodes are broken,
3187 # they should already be marked as drained or offline, and hence
3188 # skipped by the allocator. If instance disks have been lost for other
3189 # reasons, then recreating the disks on the same nodes should be fine.
3190 disk_template = self.instance.disk_template
3191 spindle_use = be_full[constants.BE_SPINDLE_USE]
3192 req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
3193 disk_template=disk_template,
3194 tags=list(self.instance.GetTags()),
3195 os=self.instance.os,
3197 vcpus=be_full[constants.BE_VCPUS],
3198 memory=be_full[constants.BE_MAXMEM],
3199 spindle_use=spindle_use,
3200 disks=[{constants.IDISK_SIZE: d.size,
3201 constants.IDISK_MODE: d.mode}
3202 for d in self.instance.disks],
3203 hypervisor=self.instance.hypervisor,
3204 node_whitelist=None)
3205 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
3207 ial.Run(self.op.iallocator)
3209 assert req.RequiredNodes() == len(self.instance.all_nodes)
3212 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
3213 " %s" % (self.op.iallocator, ial.info),
3216 self.op.nodes = ial.result
3217 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3218 self.op.instance_name, self.op.iallocator,
3219 utils.CommaJoin(ial.result))
3221 def CheckArguments(self):
3222 if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
3223 # Normalize and convert deprecated list of disk indices
3224 self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
3226 duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
3228 raise errors.OpPrereqError("Some disks have been specified more than"
3229 " once: %s" % utils.CommaJoin(duplicates),
3232 # We don't want _CheckIAllocatorOrNode selecting the default iallocator
3233 # when neither iallocator nor nodes are specified
3234 if self.op.iallocator or self.op.nodes:
3235 _CheckIAllocatorOrNode(self, "iallocator", "nodes")
3237 for (idx, params) in self.op.disks:
3238 utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
3239 unsupported = frozenset(params.keys()) - self._MODIFYABLE
3241 raise errors.OpPrereqError("Parameters for disk %s try to change"
3242 " unmodifiable parameter(s): %s" %
3243 (idx, utils.CommaJoin(unsupported)),
3246 def ExpandNames(self):
3247 self._ExpandAndLockInstance()
3248 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3251 self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes]
3252 self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
3254 self.needed_locks[locking.LEVEL_NODE] = []
3255 if self.op.iallocator:
3256 # iallocator will select a new node in the same group
3257 self.needed_locks[locking.LEVEL_NODEGROUP] = []
3258 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
3260 self.needed_locks[locking.LEVEL_NODE_RES] = []
3262 def DeclareLocks(self, level):
3263 if level == locking.LEVEL_NODEGROUP:
3264 assert self.op.iallocator is not None
3265 assert not self.op.nodes
3266 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
3267 self.share_locks[locking.LEVEL_NODEGROUP] = 1
3268 # Lock the primary group used by the instance optimistically; this
3269 # requires going via the node before it's locked, requiring
3270 # verification later on
3271 self.needed_locks[locking.LEVEL_NODEGROUP] = \
3272 self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
3274 elif level == locking.LEVEL_NODE:
3275 # If an allocator is used, then we lock all the nodes in the current
3276 # instance group, as we don't know yet which ones will be selected;
3277 # if we replace the nodes without using an allocator, locks are
3278 # already declared in ExpandNames; otherwise, we need to lock all the
3279 # instance nodes for disk re-creation
3280 if self.op.iallocator:
3281 assert not self.op.nodes
3282 assert not self.needed_locks[locking.LEVEL_NODE]
3283 assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
3285 # Lock member nodes of the group of the primary node
3286 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
3287 self.needed_locks[locking.LEVEL_NODE].extend(
3288 self.cfg.GetNodeGroup(group_uuid).members)
3290 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
3291 elif not self.op.nodes:
3292 self._LockInstancesNodes(primary_only=False)
3293 elif level == locking.LEVEL_NODE_RES:
3295 self.needed_locks[locking.LEVEL_NODE_RES] = \
3296 _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
3298 def BuildHooksEnv(self):
3301 This runs on master, primary and secondary nodes of the instance.
3304 return _BuildInstanceHookEnvByObject(self, self.instance)
3306 def BuildHooksNodes(self):
3307 """Build hooks nodes.
3310 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3313 def CheckPrereq(self):
3314 """Check prerequisites.
3316 This checks that the instance is in the cluster and is not running.
3319 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3320 assert instance is not None, \
3321 "Cannot retrieve locked instance %s" % self.op.instance_name
3323 if len(self.op.nodes) != len(instance.all_nodes):
3324 raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
3325 " %d replacement nodes were specified" %
3326 (instance.name, len(instance.all_nodes),
3327 len(self.op.nodes)),
3329 assert instance.disk_template != constants.DT_DRBD8 or \
3330 len(self.op.nodes) == 2
3331 assert instance.disk_template != constants.DT_PLAIN or \
3332 len(self.op.nodes) == 1
3333 primary_node = self.op.nodes[0]
3335 primary_node = instance.primary_node
3336 if not self.op.iallocator:
3337 _CheckNodeOnline(self, primary_node)
3339 if instance.disk_template == constants.DT_DISKLESS:
3340 raise errors.OpPrereqError("Instance '%s' has no disks" %
3341 self.op.instance_name, errors.ECODE_INVAL)
3343 # Verify if node group locks are still correct
3344 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
3346 # Node group locks are acquired only for the primary node (and only
3347 # when the allocator is used)
3348 _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
3351 # if we replace nodes *and* the old primary is offline, we don't
3352 # check the instance state
3353 old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
3354 if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
3355 _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
3356 msg="cannot recreate disks")
3359 self.disks = dict(self.op.disks)
3361 self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
3363 maxidx = max(self.disks.keys())
3364 if maxidx >= len(instance.disks):
3365 raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
3368 if ((self.op.nodes or self.op.iallocator) and
3369 sorted(self.disks.keys()) != range(len(instance.disks))):
3370 raise errors.OpPrereqError("Can't recreate disks partially and"
3371 " change the nodes at the same time",
3374 self.instance = instance
3376 if self.op.iallocator:
3377 self._RunAllocator()
3378 # Release unneeded node and node resource locks
3379 _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
3380 _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
3381 _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
3383 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
3385 def Exec(self, feedback_fn):
3386 """Recreate the disks.
3389 instance = self.instance
3391 assert (self.owned_locks(locking.LEVEL_NODE) ==
3392 self.owned_locks(locking.LEVEL_NODE_RES))
3395 mods = [] # keeps track of needed changes
3397 for idx, disk in enumerate(instance.disks):
3399 changes = self.disks[idx]
3401 # Disk should not be recreated
3405 # update secondaries for disks, if needed
3406 if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
3407 # need to update the nodes and minors
3408 assert len(self.op.nodes) == 2
3409 assert len(disk.logical_id) == 6 # otherwise disk internals
3411 (_, _, old_port, _, _, old_secret) = disk.logical_id
3412 new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
3413 new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
3414 new_minors[0], new_minors[1], old_secret)
3415 assert len(disk.logical_id) == len(new_id)
3419 mods.append((idx, new_id, changes))
3421 # now that we have passed all asserts above, we can apply the mods
3422 # in a single run (to avoid partial changes)
3423 for idx, new_id, changes in mods:
3424 disk = instance.disks[idx]
3425 if new_id is not None:
3426 assert disk.dev_type == constants.LD_DRBD8
3427 disk.logical_id = new_id
3429 disk.Update(size=changes.get(constants.IDISK_SIZE, None),
3430 mode=changes.get(constants.IDISK_MODE, None))
3432 # change primary node, if needed
3434 instance.primary_node = self.op.nodes[0]
3435 self.LogWarning("Changing the instance's nodes; you will have to"
3436 " remove any disks left on the old nodes manually")
3439 self.cfg.Update(instance, feedback_fn)
3441 # All touched nodes must be locked
3442 mylocks = self.owned_locks(locking.LEVEL_NODE)
3443 assert mylocks.issuperset(frozenset(instance.all_nodes))
3444 _CreateDisks(self, instance, to_skip=to_skip)
3447 def _SafeShutdownInstanceDisks(lu, instance, disks=None):
3448 """Shutdown block devices of an instance.
3450 This function checks if an instance is running, before calling
3451 _ShutdownInstanceDisks.
3454 _CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
3455 _ShutdownInstanceDisks(lu, instance, disks=disks)
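# Minimal usage sketch (illustrative): LUInstanceGrowDisk below calls
#   _SafeShutdownInstanceDisks(self, instance, disks=[disk])
# so the freshly grown disk is only deactivated once the instance is known to
# be down; if the instance is still running, _CheckInstanceState raises an
# error instead of silently shutting down its disks.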
3458 def _DiskSizeInBytesToMebibytes(lu, size):
3459 """Converts a disk size in bytes to mebibytes.
3461 Warns and rounds up if the size isn't an even multiple of 1 MiB.
3464 (mib, remainder) = divmod(size, 1024 * 1024)
3467 lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
3468 " to not overwrite existing data (%s bytes will not be"
3469 " wiped)", (1024 * 1024) - remainder)
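# Worked example (illustrative, relying on the round-up behaviour the
# docstring describes): for size = 5 * 1024 * 1024 + 100 bytes,
# divmod(size, 1024 * 1024) yields (5, 100), so a warning is logged that
# 1048476 bytes (1 MiB minus 100) will not be wiped and the result is
# rounded up to 6 MiB.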
3475 class LUInstanceGrowDisk(LogicalUnit):
3476 """Grow a disk of an instance.
3480 HTYPE = constants.HTYPE_INSTANCE
3483 def ExpandNames(self):
3484 self._ExpandAndLockInstance()
3485 self.needed_locks[locking.LEVEL_NODE] = []
3486 self.needed_locks[locking.LEVEL_NODE_RES] = []
3487 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3488 self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
3490 def DeclareLocks(self, level):
3491 if level == locking.LEVEL_NODE:
3492 self._LockInstancesNodes()
3493 elif level == locking.LEVEL_NODE_RES:
3495 self.needed_locks[locking.LEVEL_NODE_RES] = \
3496 _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
3498 def BuildHooksEnv(self):
3501 This runs on the master, the primary and all the secondaries.
3505 "DISK": self.op.disk,
3506 "AMOUNT": self.op.amount,
3507 "ABSOLUTE": self.op.absolute,
3509 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3512 def BuildHooksNodes(self):
3513 """Build hooks nodes.
3516 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3519 def CheckPrereq(self):
3520 """Check prerequisites.
3522 This checks that the instance is in the cluster.
3525 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3526 assert instance is not None, \
3527 "Cannot retrieve locked instance %s" % self.op.instance_name
3528 nodenames = list(instance.all_nodes)
3529 for node in nodenames:
3530 _CheckNodeOnline(self, node)
3532 self.instance = instance
3534 if instance.disk_template not in constants.DTS_GROWABLE:
3535 raise errors.OpPrereqError("Instance's disk layout does not support"
3536 " growing", errors.ECODE_INVAL)
3538 self.disk = instance.FindDisk(self.op.disk)
3540 if self.op.absolute:
3541 self.target = self.op.amount
3542 self.delta = self.target - self.disk.size
3544 raise errors.OpPrereqError("Requested size (%s) is smaller than "
3545 "current disk size (%s)" %
3546 (utils.FormatUnit(self.target, "h"),
3547 utils.FormatUnit(self.disk.size, "h")),
3550 self.delta = self.op.amount
3551 self.target = self.disk.size + self.delta
3553 raise errors.OpPrereqError("Requested increment (%s) is negative" %
3554 utils.FormatUnit(self.delta, "h"),
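# Illustrative numbers (hypothetical 10240 MiB disk): with absolute=False and
# amount=2048 the computed delta is 2048 and the target is 12288 MiB; with
# absolute=True and amount=20480 the target is 20480 MiB and the delta is
# 10240. A target below the current size (absolute mode) or a negative
# increment (relative mode) is rejected by the checks above.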
3557 self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
3559 def _CheckDiskSpace(self, nodenames, req_vgspace):
3560 template = self.instance.disk_template
3561 if template not in constants.DTS_NO_FREE_SPACE_CHECK:
3562 # TODO: check the free disk space for file, when that feature will be
3564 nodes = map(self.cfg.GetNodeInfo, nodenames)
3565 es_nodes = filter(lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n),
3568 # With exclusive storage we need to do something smarter than just
3569 # looking at free space; for now, let's simply abort the operation.
3570 raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
3571 " is enabled", errors.ECODE_STATE)
3572 _CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
3574 def Exec(self, feedback_fn):
3575 """Execute disk grow.
3578 instance = self.instance
3581 assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
3582 assert (self.owned_locks(locking.LEVEL_NODE) ==
3583 self.owned_locks(locking.LEVEL_NODE_RES))
3585 wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
3587 disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk])
3589 raise errors.OpExecError("Cannot activate block device to grow")
3591 feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
3592 (self.op.disk, instance.name,
3593 utils.FormatUnit(self.delta, "h"),
3594 utils.FormatUnit(self.target, "h")))
3596 # First run all grow ops in dry-run mode
3597 for node in instance.all_nodes:
3598 self.cfg.SetDiskID(disk, node)
3599 result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
3601 result.Raise("Dry-run grow request failed to node %s" % node)
3604 # Get disk size from primary node for wiping
3605 result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
3606 result.Raise("Failed to retrieve disk size from node '%s'" %
3607 instance.primary_node)
3609 (disk_size_in_bytes, ) = result.payload
3611 if disk_size_in_bytes is None:
3612 raise errors.OpExecError("Failed to retrieve disk size from primary"
3613 " node '%s'" % instance.primary_node)
3615 old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
3617 assert old_disk_size >= disk.size, \
3618 ("Retrieved disk size too small (got %s, should be at least %s)" %
3619 (old_disk_size, disk.size))
3621 old_disk_size = None
3623 # We know that (as far as we can test) operations across different
3624 # nodes will succeed; time to run it for real on the backing storage
3625 for node in instance.all_nodes:
3626 self.cfg.SetDiskID(disk, node)
3627 result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
3629 result.Raise("Grow request failed to node %s" % node)
3631 # And now execute it for logical storage, on the primary node
3632 node = instance.primary_node
3633 self.cfg.SetDiskID(disk, node)
3634 result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
3636 result.Raise("Grow request failed to node %s" % node)
3638 disk.RecordGrow(self.delta)
3639 self.cfg.Update(instance, feedback_fn)
3641 # Changes have been recorded, release node lock
3642 _ReleaseLocks(self, locking.LEVEL_NODE)
3644 # Downgrade lock while waiting for sync
3645 self.glm.downgrade(locking.LEVEL_INSTANCE)
3647 assert wipe_disks ^ (old_disk_size is None)
3650 assert instance.disks[self.op.disk] == disk
3652 # Wipe newly added disk space
3653 _WipeDisks(self, instance,
3654 disks=[(self.op.disk, disk, old_disk_size)])
3656 if self.op.wait_for_sync:
3657 disk_abort = not _WaitForSync(self, instance, disks=[disk])
3659 self.LogWarning("Disk syncing has not returned a good status; check"
3661 if instance.admin_state != constants.ADMINST_UP:
3662 _SafeShutdownInstanceDisks(self, instance, disks=[disk])
3663 elif instance.admin_state != constants.ADMINST_UP:
3664 self.LogWarning("Not shutting down the disk even if the instance is"
3665 " not supposed to be running because no wait for"
3666 " sync mode was requested")
3668 assert self.owned_locks(locking.LEVEL_NODE_RES)
3669 assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
3672 class LUInstanceReplaceDisks(LogicalUnit):
3673 """Replace the disks of an instance.
3676 HPATH = "mirrors-replace"
3677 HTYPE = constants.HTYPE_INSTANCE
3680 def CheckArguments(self):
3684 remote_node = self.op.remote_node
3685 ialloc = self.op.iallocator
3686 if self.op.mode == constants.REPLACE_DISK_CHG:
3687 if remote_node is None and ialloc is None:
3688 raise errors.OpPrereqError("When changing the secondary either an"
3689 " iallocator script must be used or the"
3690 " new node given", errors.ECODE_INVAL)
3692 _CheckIAllocatorOrNode(self, "iallocator", "remote_node")
3694 elif remote_node is not None or ialloc is not None:
3695 # Not replacing the secondary
3696 raise errors.OpPrereqError("The iallocator and new node options can"
3697 " only be used when changing the"
3698 " secondary node", errors.ECODE_INVAL)
3700 def ExpandNames(self):
3701 self._ExpandAndLockInstance()
3703 assert locking.LEVEL_NODE not in self.needed_locks
3704 assert locking.LEVEL_NODE_RES not in self.needed_locks
3705 assert locking.LEVEL_NODEGROUP not in self.needed_locks
3707 assert self.op.iallocator is None or self.op.remote_node is None, \
3708 "Conflicting options"
3710 if self.op.remote_node is not None:
3711 self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
3713 # Warning: do not remove the locking of the new secondary here
3714 # unless DRBD8.AddChildren is changed to work in parallel;
3715 # currently it doesn't since parallel invocations of
3716 # FindUnusedMinor will conflict
3717 self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
3718 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3720 self.needed_locks[locking.LEVEL_NODE] = []
3721 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3723 if self.op.iallocator is not None:
3724 # iallocator will select a new node in the same group
3725 self.needed_locks[locking.LEVEL_NODEGROUP] = []
3726 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
3728 self.needed_locks[locking.LEVEL_NODE_RES] = []
3730 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
3731 self.op.iallocator, self.op.remote_node,
3732 self.op.disks, self.op.early_release,
3733 self.op.ignore_ipolicy)
3735 self.tasklets = [self.replacer]
3737 def DeclareLocks(self, level):
3738 if level == locking.LEVEL_NODEGROUP:
3739 assert self.op.remote_node is None
3740 assert self.op.iallocator is not None
3741 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
3743 self.share_locks[locking.LEVEL_NODEGROUP] = 1
3744 # Lock all groups used by instance optimistically; this requires going
3745 # via the node before it's locked, requiring verification later on
3746 self.needed_locks[locking.LEVEL_NODEGROUP] = \
3747 self.cfg.GetInstanceNodeGroups(self.op.instance_name)
3749 elif level == locking.LEVEL_NODE:
3750 if self.op.iallocator is not None:
3751 assert self.op.remote_node is None
3752 assert not self.needed_locks[locking.LEVEL_NODE]
3753 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
3755 # Lock member nodes of all locked groups
3756 self.needed_locks[locking.LEVEL_NODE] = \
3758 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
3759 for node_name in self.cfg.GetNodeGroup(group_uuid).members]
3761 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
3763 self._LockInstancesNodes()
3765 elif level == locking.LEVEL_NODE_RES:
3767 self.needed_locks[locking.LEVEL_NODE_RES] = \
3768 self.needed_locks[locking.LEVEL_NODE]
3770 def BuildHooksEnv(self):
3773 This runs on the master, the primary and all the secondaries.
3776 instance = self.replacer.instance
3778 "MODE": self.op.mode,
3779 "NEW_SECONDARY": self.op.remote_node,
3780 "OLD_SECONDARY": instance.secondary_nodes[0],
3782 env.update(_BuildInstanceHookEnvByObject(self, instance))
3785 def BuildHooksNodes(self):
3786 """Build hooks nodes.
3789 instance = self.replacer.instance
3791 self.cfg.GetMasterNode(),
3792 instance.primary_node,
3794 if self.op.remote_node is not None:
3795 nl.append(self.op.remote_node)
3798 def CheckPrereq(self):
3799 """Check prerequisites.
3802 assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
3803 self.op.iallocator is None)
3805 # Verify if node group locks are still correct
3806 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
3808 _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
3810 return LogicalUnit.CheckPrereq(self)
3813 class LUInstanceActivateDisks(NoHooksLU):
3814 """Bring up an instance's disks.
3819 def ExpandNames(self):
3820 self._ExpandAndLockInstance()
3821 self.needed_locks[locking.LEVEL_NODE] = []
3822 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3824 def DeclareLocks(self, level):
3825 if level == locking.LEVEL_NODE:
3826 self._LockInstancesNodes()
3828 def CheckPrereq(self):
3829 """Check prerequisites.
3831 This checks that the instance is in the cluster.
3834 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3835 assert self.instance is not None, \
3836 "Cannot retrieve locked instance %s" % self.op.instance_name
3837 _CheckNodeOnline(self, self.instance.primary_node)
3839 def Exec(self, feedback_fn):
3840 """Activate the disks.
3843 disks_ok, disks_info = \
3844 _AssembleInstanceDisks(self, self.instance,
3845 ignore_size=self.op.ignore_size)
3847 raise errors.OpExecError("Cannot activate block devices")
3849 if self.op.wait_for_sync:
3850 if not _WaitForSync(self, self.instance):
3851 raise errors.OpExecError("Some disks of the instance are degraded!")
3856 class LUInstanceDeactivateDisks(NoHooksLU):
3857 """Shutdown an instance's disks.
3862 def ExpandNames(self):
3863 self._ExpandAndLockInstance()
3864 self.needed_locks[locking.LEVEL_NODE] = []
3865 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3867 def DeclareLocks(self, level):
3868 if level == locking.LEVEL_NODE:
3869 self._LockInstancesNodes()
3871 def CheckPrereq(self):
3872 """Check prerequisites.
3874 This checks that the instance is in the cluster.
3877 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3878 assert self.instance is not None, \
3879 "Cannot retrieve locked instance %s" % self.op.instance_name
3881 def Exec(self, feedback_fn):
3882 """Deactivate the disks
3885 instance = self.instance
3887 _ShutdownInstanceDisks(self, instance)
3889 _SafeShutdownInstanceDisks(self, instance)
3892 class LUInstanceStartup(LogicalUnit):
3893 """Starts an instance.
3896 HPATH = "instance-start"
3897 HTYPE = constants.HTYPE_INSTANCE
3900 def CheckArguments(self):
3902 if self.op.beparams:
3903 # fill the beparams dict
3904 objects.UpgradeBeParams(self.op.beparams)
3905 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
3907 def ExpandNames(self):
3908 self._ExpandAndLockInstance()
3909 self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
3911 def DeclareLocks(self, level):
3912 if level == locking.LEVEL_NODE_RES:
3913 self._LockInstancesNodes(primary_only=True, level=locking.LEVEL_NODE_RES)
3915 def BuildHooksEnv(self):
3918 This runs on master, primary and secondary nodes of the instance.
3922 "FORCE": self.op.force,
3925 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3929 def BuildHooksNodes(self):
3930 """Build hooks nodes.
3933 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3936 def CheckPrereq(self):
3937 """Check prerequisites.
3939 This checks that the instance is in the cluster.
3942 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3943 assert self.instance is not None, \
3944 "Cannot retrieve locked instance %s" % self.op.instance_name
3947 if self.op.hvparams:
3948 # check hypervisor parameter syntax (locally)
3949 cluster = self.cfg.GetClusterInfo()
3950 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
3951 filled_hvp = cluster.FillHV(instance)
3952 filled_hvp.update(self.op.hvparams)
3953 hv_type = hypervisor.GetHypervisorClass(instance.hypervisor)
3954 hv_type.CheckParameterSyntax(filled_hvp)
3955 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3957 _CheckInstanceState(self, instance, INSTANCE_ONLINE)
3959 self.primary_offline = self.cfg.GetNodeInfo(instance.primary_node).offline
3961 if self.primary_offline and self.op.ignore_offline_nodes:
3962 self.LogWarning("Ignoring offline primary node")
3964 if self.op.hvparams or self.op.beparams:
3965 self.LogWarning("Overridden parameters are ignored")
3967 _CheckNodeOnline(self, instance.primary_node)
3969 bep = self.cfg.GetClusterInfo().FillBE(instance)
3970 bep.update(self.op.beparams)
3972 # check bridges existence
3973 _CheckInstanceBridgesExist(self, instance)
3975 remote_info = self.rpc.call_instance_info(instance.primary_node,
3977 instance.hypervisor)
3978 remote_info.Raise("Error checking node %s" % instance.primary_node,
3979 prereq=True, ecode=errors.ECODE_ENVIRON)
3980 if not remote_info.payload: # not running already
3981 _CheckNodeFreeMemory(self, instance.primary_node,
3982 "starting instance %s" % instance.name,
3983 bep[constants.BE_MINMEM], instance.hypervisor)
3985 def Exec(self, feedback_fn):
3986 """Start the instance.
3989 instance = self.instance
3990 force = self.op.force
3991 reason = self.op.reason
3993 if not self.op.no_remember:
3994 self.cfg.MarkInstanceUp(instance.name)
3996 if self.primary_offline:
3997 assert self.op.ignore_offline_nodes
3998 self.LogInfo("Primary node offline, marked instance as started")
4000 node_current = instance.primary_node
4002 _StartInstanceDisks(self, instance, force)
4005 self.rpc.call_instance_start(node_current,
4006 (instance, self.op.hvparams,
4008 self.op.startup_paused, reason)
4009 msg = result.fail_msg
4011 _ShutdownInstanceDisks(self, instance)
4012 raise errors.OpExecError("Could not start instance: %s" % msg)
4015 class LUInstanceShutdown(LogicalUnit):
4016 """Shutdown an instance.
4019 HPATH = "instance-stop"
4020 HTYPE = constants.HTYPE_INSTANCE
4023 def ExpandNames(self):
4024 self._ExpandAndLockInstance()
4026 def BuildHooksEnv(self):
4029 This runs on master, primary and secondary nodes of the instance.
4032 env = _BuildInstanceHookEnvByObject(self, self.instance)
4033 env["TIMEOUT"] = self.op.timeout
4036 def BuildHooksNodes(self):
4037 """Build hooks nodes.
4040 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
4043 def CheckPrereq(self):
4044 """Check prerequisites.
4046 This checks that the instance is in the cluster.
4049 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4050 assert self.instance is not None, \
4051 "Cannot retrieve locked instance %s" % self.op.instance_name
4053 if not self.op.force:
4054 _CheckInstanceState(self, self.instance, INSTANCE_ONLINE)
4056 self.LogWarning("Ignoring offline instance check")
4058 self.primary_offline = \
4059 self.cfg.GetNodeInfo(self.instance.primary_node).offline
4061 if self.primary_offline and self.op.ignore_offline_nodes:
4062 self.LogWarning("Ignoring offline primary node")
4064 _CheckNodeOnline(self, self.instance.primary_node)
4066 def Exec(self, feedback_fn):
4067 """Shutdown the instance.
4070 instance = self.instance
4071 node_current = instance.primary_node
4072 timeout = self.op.timeout
4073 reason = self.op.reason
4075 # If the instance is offline we shouldn't mark it as down, as that
4076 # resets the offline flag.
4077 if not self.op.no_remember and instance.admin_state in INSTANCE_ONLINE:
4078 self.cfg.MarkInstanceDown(instance.name)
4080 if self.primary_offline:
4081 assert self.op.ignore_offline_nodes
4082 self.LogInfo("Primary node offline, marked instance as stopped")
4084 result = self.rpc.call_instance_shutdown(node_current, instance, timeout,
4086 msg = result.fail_msg
4088 self.LogWarning("Could not shutdown instance: %s", msg)
4090 _ShutdownInstanceDisks(self, instance)
4093 class LUInstanceReinstall(LogicalUnit):
4094 """Reinstall an instance.
4097 HPATH = "instance-reinstall"
4098 HTYPE = constants.HTYPE_INSTANCE
4101 def ExpandNames(self):
4102 self._ExpandAndLockInstance()
4104 def BuildHooksEnv(self):
4107 This runs on master, primary and secondary nodes of the instance.
4110 return _BuildInstanceHookEnvByObject(self, self.instance)
4112 def BuildHooksNodes(self):
4113 """Build hooks nodes.
4116 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
4119 def CheckPrereq(self):
4120 """Check prerequisites.
4122 This checks that the instance is in the cluster and is not running.
4125 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4126 assert instance is not None, \
4127 "Cannot retrieve locked instance %s" % self.op.instance_name
4128 _CheckNodeOnline(self, instance.primary_node, "Instance primary node"
4129 " offline, cannot reinstall")
4131 if instance.disk_template == constants.DT_DISKLESS:
4132 raise errors.OpPrereqError("Instance '%s' has no disks" %
4133 self.op.instance_name,
4135 _CheckInstanceState(self, instance, INSTANCE_DOWN, msg="cannot reinstall")
4137 if self.op.os_type is not None:
4139 pnode = _ExpandNodeName(self.cfg, instance.primary_node)
4140 _CheckNodeHasOS(self, pnode, self.op.os_type, self.op.force_variant)
4141 instance_os = self.op.os_type
4143 instance_os = instance.os
4145 nodelist = list(instance.all_nodes)
4147 if self.op.osparams:
4148 i_osdict = _GetUpdatedParams(instance.osparams, self.op.osparams)
4149 _CheckOSParams(self, True, nodelist, instance_os, i_osdict)
4150 self.os_inst = i_osdict # the new dict (without defaults)
4154 self.instance = instance
4156 def Exec(self, feedback_fn):
4157 """Reinstall the instance.
4160 inst = self.instance
4162 if self.op.os_type is not None:
4163 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
4164 inst.os = self.op.os_type
4165 # Write to configuration
4166 self.cfg.Update(inst, feedback_fn)
4168 _StartInstanceDisks(self, inst, None)
4170 feedback_fn("Running the instance OS create scripts...")
4171 # FIXME: pass debug option from opcode to backend
4172 result = self.rpc.call_instance_os_add(inst.primary_node,
4173 (inst, self.os_inst), True,
4174 self.op.debug_level)
4175 result.Raise("Could not install OS for instance %s on node %s" %
4176 (inst.name, inst.primary_node))
4178 _ShutdownInstanceDisks(self, inst)
4181 class LUInstanceReboot(LogicalUnit):
4182 """Reboot an instance.
4185 HPATH = "instance-reboot"
4186 HTYPE = constants.HTYPE_INSTANCE
4189 def ExpandNames(self):
4190 self._ExpandAndLockInstance()
4192 def BuildHooksEnv(self):
4195 This runs on master, primary and secondary nodes of the instance.
4199 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
4200 "REBOOT_TYPE": self.op.reboot_type,
4201 "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
4204 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4208 def BuildHooksNodes(self):
4209 """Build hooks nodes.
4212 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
4215 def CheckPrereq(self):
4216 """Check prerequisites.
4218 This checks that the instance is in the cluster.
4221 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4222 assert self.instance is not None, \
4223 "Cannot retrieve locked instance %s" % self.op.instance_name
4224 _CheckInstanceState(self, instance, INSTANCE_ONLINE)
4225 _CheckNodeOnline(self, instance.primary_node)
4227 # check bridges existence
4228 _CheckInstanceBridgesExist(self, instance)
4230 def Exec(self, feedback_fn):
4231 """Reboot the instance.
4234 instance = self.instance
4235 ignore_secondaries = self.op.ignore_secondaries
4236 reboot_type = self.op.reboot_type
4237 reason = self.op.reason
4239 remote_info = self.rpc.call_instance_info(instance.primary_node,
4241 instance.hypervisor)
4242 remote_info.Raise("Error checking node %s" % instance.primary_node)
4243 instance_running = bool(remote_info.payload)
4245 node_current = instance.primary_node
4247 if instance_running and reboot_type in [constants.INSTANCE_REBOOT_SOFT,
4248 constants.INSTANCE_REBOOT_HARD]:
4249 for disk in instance.disks:
4250 self.cfg.SetDiskID(disk, node_current)
4251 result = self.rpc.call_instance_reboot(node_current, instance,
4253 self.op.shutdown_timeout, reason)
4254 result.Raise("Could not reboot instance")
4256 if instance_running:
4257 result = self.rpc.call_instance_shutdown(node_current, instance,
4258 self.op.shutdown_timeout,
4260 result.Raise("Could not shutdown instance for full reboot")
4261 _ShutdownInstanceDisks(self, instance)
4263 self.LogInfo("Instance %s was already stopped, starting now",
4265 _StartInstanceDisks(self, instance, ignore_secondaries)
4266 result = self.rpc.call_instance_start(node_current,
4267 (instance, None, None), False,
4269 msg = result.fail_msg
4271 _ShutdownInstanceDisks(self, instance)
4272 raise errors.OpExecError("Could not start instance for"
4273 " full reboot: %s" % msg)
4275 self.cfg.MarkInstanceUp(instance.name)
4278 class LUInstanceConsole(NoHooksLU):
4279 """Connect to an instance's console.
4281 This is somewhat special in that it returns the command line that
4282 you need to run on the master node in order to connect to the console.
4288 def ExpandNames(self):
4289 self.share_locks = _ShareAll()
4290 self._ExpandAndLockInstance()
4292 def CheckPrereq(self):
4293 """Check prerequisites.
4295 This checks that the instance is in the cluster.
4298 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4299 assert self.instance is not None, \
4300 "Cannot retrieve locked instance %s" % self.op.instance_name
4301 _CheckNodeOnline(self, self.instance.primary_node)
4303 def Exec(self, feedback_fn):
4304 """Connect to the console of an instance
4307 instance = self.instance
4308 node = instance.primary_node
4310 node_insts = self.rpc.call_instance_list([node],
4311 [instance.hypervisor])[node]
4312 node_insts.Raise("Can't get node information from %s" % node)
4314 if instance.name not in node_insts.payload:
4315 if instance.admin_state == constants.ADMINST_UP:
4316 state = constants.INSTST_ERRORDOWN
4317 elif instance.admin_state == constants.ADMINST_DOWN:
4318 state = constants.INSTST_ADMINDOWN
4320 state = constants.INSTST_ADMINOFFLINE
4321 raise errors.OpExecError("Instance %s is not running (state %s)" %
4322 (instance.name, state))
4324 logging.debug("Connecting to console of %s on %s", instance.name, node)
4326 return _GetInstanceConsole(self.cfg.GetClusterInfo(), instance)
4329 def _DeclareLocksForMigration(lu, level):
4330 """Declares locks for L{TLMigrateInstance}.
4332 @type lu: L{LogicalUnit}
4333 @param level: Lock level
4336 if level == locking.LEVEL_NODE_ALLOC:
4337 assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
4339 instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)
4341 # Node locks are already declared here rather than at LEVEL_NODE as we need
4342 # the instance object anyway to declare the node allocation lock.
4343 if instance.disk_template in constants.DTS_EXT_MIRROR:
4344 if lu.op.target_node is None:
4345 lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4346 lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
4348 lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
4350 del lu.recalculate_locks[locking.LEVEL_NODE]
4352 lu._LockInstancesNodes() # pylint: disable=W0212
4354 elif level == locking.LEVEL_NODE:
4355 # Node locks are declared together with the node allocation lock
4356 assert (lu.needed_locks[locking.LEVEL_NODE] or
4357 lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)
4359 elif level == locking.LEVEL_NODE_RES:
4361 lu.needed_locks[locking.LEVEL_NODE_RES] = \
4362 _CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
4365 def _ExpandNamesForMigration(lu):
4366 """Expands names for use with L{TLMigrateInstance}.
4368 @type lu: L{LogicalUnit}
4371 if lu.op.target_node is not None:
4372 lu.op.target_node = _ExpandNodeName(lu.cfg, lu.op.target_node)
4374 lu.needed_locks[locking.LEVEL_NODE] = []
4375 lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4377 lu.needed_locks[locking.LEVEL_NODE_RES] = []
4378 lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
4380 # The node allocation lock is actually only needed for externally replicated
4381 # instances (e.g. sharedfile or RBD) and if an iallocator is used.
4382 lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []
4385 class LUInstanceFailover(LogicalUnit):
4386 """Failover an instance.
4389 HPATH = "instance-failover"
4390 HTYPE = constants.HTYPE_INSTANCE
4393 def CheckArguments(self):
4394 """Check the arguments.
4397 self.iallocator = getattr(self.op, "iallocator", None)
4398 self.target_node = getattr(self.op, "target_node", None)
4400 def ExpandNames(self):
4401 self._ExpandAndLockInstance()
4402 _ExpandNamesForMigration(self)
4405 TLMigrateInstance(self, self.op.instance_name, False, True, False,
4406 self.op.ignore_consistency, True,
4407 self.op.shutdown_timeout, self.op.ignore_ipolicy)
4409 self.tasklets = [self._migrater]
4411 def DeclareLocks(self, level):
4412 _DeclareLocksForMigration(self, level)
4414 def BuildHooksEnv(self):
4417 This runs on master, primary and secondary nodes of the instance.
4420 instance = self._migrater.instance
4421 source_node = instance.primary_node
4422 target_node = self.op.target_node
4424 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
4425 "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
4426 "OLD_PRIMARY": source_node,
4427 "NEW_PRIMARY": target_node,
4430 if instance.disk_template in constants.DTS_INT_MIRROR:
4431 env["OLD_SECONDARY"] = instance.secondary_nodes[0]
4432 env["NEW_SECONDARY"] = source_node
4434 env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""
4436 env.update(_BuildInstanceHookEnvByObject(self, instance))
4440 def BuildHooksNodes(self):
4441 """Build hooks nodes.
4444 instance = self._migrater.instance
4445 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4446 return (nl, nl + [instance.primary_node])
4449 class LUInstanceMigrate(LogicalUnit):
4450 """Migrate an instance.
4452 This is migration without shutting down, compared to the failover,
4453 which is done with shutdown.
4456 HPATH = "instance-migrate"
4457 HTYPE = constants.HTYPE_INSTANCE
4460 def ExpandNames(self):
4461 self._ExpandAndLockInstance()
4462 _ExpandNamesForMigration(self)
4465 TLMigrateInstance(self, self.op.instance_name, self.op.cleanup,
4466 False, self.op.allow_failover, False,
4467 self.op.allow_runtime_changes,
4468 constants.DEFAULT_SHUTDOWN_TIMEOUT,
4469 self.op.ignore_ipolicy)
4471 self.tasklets = [self._migrater]
4473 def DeclareLocks(self, level):
4474 _DeclareLocksForMigration(self, level)
4476 def BuildHooksEnv(self):
4479 This runs on master, primary and secondary nodes of the instance.
4482 instance = self._migrater.instance
4483 source_node = instance.primary_node
4484 target_node = self.op.target_node
4485 env = _BuildInstanceHookEnvByObject(self, instance)
4487 "MIGRATE_LIVE": self._migrater.live,
4488 "MIGRATE_CLEANUP": self.op.cleanup,
4489 "OLD_PRIMARY": source_node,
4490 "NEW_PRIMARY": target_node,
4491 "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
4494 if instance.disk_template in constants.DTS_INT_MIRROR:
4495 env["OLD_SECONDARY"] = target_node
4496 env["NEW_SECONDARY"] = source_node
4498 env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None
4502 def BuildHooksNodes(self):
4503 """Build hooks nodes.
4506 instance = self._migrater.instance
4507 snodes = list(instance.secondary_nodes)
4508 nl = [self.cfg.GetMasterNode(), instance.primary_node] + snodes
4512 class LUInstanceMultiAlloc(NoHooksLU):
4513 """Allocates multiple instances at the same time.
4518 def CheckArguments(self):
4523 for inst in self.op.instances:
4524 if inst.iallocator is not None:
4525 raise errors.OpPrereqError("iallocator is not allowed to be set on"
4526 " instance objects", errors.ECODE_INVAL)
4527 nodes.append(bool(inst.pnode))
4528 if inst.disk_template in constants.DTS_INT_MIRROR:
4529 nodes.append(bool(inst.snode))
4531 has_nodes = compat.any(nodes)
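# The XOR below fires exactly when the collected node flags are mixed: for
# nodes == [True, False], compat.all(nodes) is False while has_nodes is
# True, so an error is raised; for [True, True] or [False, False] both
# sides agree and the per-instance node specifications are consistent.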
4532 if compat.all(nodes) ^ has_nodes:
4533 raise errors.OpPrereqError("There are instance objects providing"
4534 " pnode/snode while others do not",
4537 if self.op.iallocator is None:
4538 default_iallocator = self.cfg.GetDefaultIAllocator()
4539 if default_iallocator and has_nodes:
4540 self.op.iallocator = default_iallocator
4542 raise errors.OpPrereqError("No iallocator or nodes on the instances"
4543 " given and no cluster-wide default"
4544 " iallocator found; please specify either"
4545 " an iallocator or nodes on the instances"
4546 " or set a cluster-wide default iallocator",
4549 _CheckOpportunisticLocking(self.op)
4551 dups = utils.FindDuplicates([op.instance_name for op in self.op.instances])
4553 raise errors.OpPrereqError("There are duplicate instance names: %s" %
4554 utils.CommaJoin(dups), errors.ECODE_INVAL)
4556 def ExpandNames(self):
4557 """Calculate the locks.
4560 self.share_locks = _ShareAll()
4561 self.needed_locks = {
4562 # iallocator will select nodes and even if no iallocator is used,
4563 # collisions with LUInstanceCreate should be avoided
4564 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
4567 if self.op.iallocator:
4568 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4569 self.needed_locks[locking.LEVEL_NODE_RES] = locking.ALL_SET
4571 if self.op.opportunistic_locking:
4572 self.opportunistic_locks[locking.LEVEL_NODE] = True
4573 self.opportunistic_locks[locking.LEVEL_NODE_RES] = True
4576 for inst in self.op.instances:
4577 inst.pnode = _ExpandNodeName(self.cfg, inst.pnode)
4578 nodeslist.append(inst.pnode)
4579 if inst.snode is not None:
4580 inst.snode = _ExpandNodeName(self.cfg, inst.snode)
4581 nodeslist.append(inst.snode)
4583 self.needed_locks[locking.LEVEL_NODE] = nodeslist
4584 # Lock resources of instance's primary and secondary nodes (copy to
4585 # prevent accidental modification)
4586 self.needed_locks[locking.LEVEL_NODE_RES] = list(nodeslist)
4588 def CheckPrereq(self):
4589 """Check prerequisite.
4592 cluster = self.cfg.GetClusterInfo()
4593 default_vg = self.cfg.GetVGName()
4594 ec_id = self.proc.GetECId()
4596 if self.op.opportunistic_locking:
4597 # Only consider nodes for which a lock is held
4598 node_whitelist = list(self.owned_locks(locking.LEVEL_NODE))
4600 node_whitelist = None
4602 insts = [_CreateInstanceAllocRequest(op, _ComputeDisks(op, default_vg),
4603 _ComputeNics(op, cluster, None,
4605 _ComputeFullBeParams(op, cluster),
4607 for op in self.op.instances]
4609 req = iallocator.IAReqMultiInstanceAlloc(instances=insts)
4610 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
4612 ial.Run(self.op.iallocator)
4615 raise errors.OpPrereqError("Can't compute nodes using"
4616 " iallocator '%s': %s" %
4617 (self.op.iallocator, ial.info),
4620 self.ia_result = ial.result
4623 self.dry_run_result = objects.FillDict(self._ConstructPartialResult(), {
4624 constants.JOB_IDS_KEY: [],
4627 def _ConstructPartialResult(self):
4628 """Contructs the partial result.
4631 (allocatable, failed) = self.ia_result
4633 opcodes.OpInstanceMultiAlloc.ALLOCATABLE_KEY:
4634 map(compat.fst, allocatable),
4635 opcodes.OpInstanceMultiAlloc.FAILED_KEY: failed,
4638 def Exec(self, feedback_fn):
4639 """Executes the opcode.
4642 op2inst = dict((op.instance_name, op) for op in self.op.instances)
4643 (allocatable, failed) = self.ia_result
4646 for (name, nodes) in allocatable:
4647 op = op2inst.pop(name)
4650 (op.pnode, op.snode) = nodes
4656 missing = set(op2inst.keys()) - set(failed)
4657 assert not missing, \
4658 "Iallocator did return incomplete result: %s" % utils.CommaJoin(missing)
4660 return ResultWithJobs(jobs, **self._ConstructPartialResult())
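# Rough shape of the combined result (a sketch, not verbatim output, with
# hypothetical instance names): the submitted job IDs plus the partial
# result built above, e.g.
#   {constants.JOB_IDS_KEY: [...],
#    opcodes.OpInstanceMultiAlloc.ALLOCATABLE_KEY: ["inst1", "inst2"],
#    opcodes.OpInstanceMultiAlloc.FAILED_KEY: ["inst3"]}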
4663 class _InstNicModPrivate:
4664 """Data structure for network interface modifications.
4666 Used by L{LUInstanceSetParams}.
4674 def PrepareContainerMods(mods, private_fn):
4675 """Prepares a list of container modifications by adding a private data field.
4677 @type mods: list of tuples; (operation, index, parameters)
4678 @param mods: List of modifications
4679 @type private_fn: callable or None
4680 @param private_fn: Callable for constructing a private data field for a
4685 if private_fn is None:
4690 return [(op, idx, params, fn()) for (op, idx, params) in mods]
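# Example of the transformation performed here (hypothetical parameters):
#   PrepareContainerMods([(constants.DDM_ADD, -1, {constants.INIC_MAC: "auto"})],
#                        _InstNicModPrivate)
#     -> [(constants.DDM_ADD, -1, {...}, <_InstNicModPrivate instance>)]
# With private_fn=None the appended private field is simply None.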
4693 def _CheckNodesPhysicalCPUs(lu, nodenames, requested, hypervisor_name):
4694 """Checks if nodes have enough physical CPUs
4696 This function checks if all given nodes have the needed number of
4697 physical CPUs. In case any node has fewer CPUs or we cannot get the
4698 information from the node, this function raises an OpPrereqError
4701 @type lu: C{LogicalUnit}
4702 @param lu: a logical unit from which we get configuration data
4703 @type nodenames: C{list}
4704 @param nodenames: the list of node names to check
4705 @type requested: C{int}
4706 @param requested: the minimum acceptable number of physical CPUs
4707 @raise errors.OpPrereqError: if the node doesn't have enough CPUs,
4708 or we cannot check the node
4711 nodeinfo = lu.rpc.call_node_info(nodenames, None, [hypervisor_name], None)
4712 for node in nodenames:
4713 info = nodeinfo[node]
4714 info.Raise("Cannot get current information from node %s" % node,
4715 prereq=True, ecode=errors.ECODE_ENVIRON)
4716 (_, _, (hv_info, )) = info.payload
4717 num_cpus = hv_info.get("cpu_total", None)
4718 if not isinstance(num_cpus, int):
4719 raise errors.OpPrereqError("Can't compute the number of physical CPUs"
4720 " on node %s, result was '%s'" %
4721 (node, num_cpus), errors.ECODE_ENVIRON)
4722 if requested > num_cpus:
4723 raise errors.OpPrereqError("Node %s has %s physical CPUs, but %s are "
4724 "required" % (node, num_cpus, requested),
4728 def GetItemFromContainer(identifier, kind, container):
4729 """Return the item refered by the identifier.
4731 @type identifier: string
4732 @param identifier: Item index or name or UUID
4734 @param kind: One-word item description
4735 @type container: list
4736 @param container: Container to get the item from
4741 idx = int(identifier)
4744 absidx = len(container) - 1
4746 raise IndexError("Not accepting negative indices other than -1")
4747 elif idx > len(container):
4748 raise IndexError("Got %s index %s, but there are only %s" %
4749 (kind, idx, len(container)))
4752 return (absidx, container[idx])
4756 for idx, item in enumerate(container):
4757 if item.uuid == identifier or item.name == identifier:
4760 raise errors.OpPrereqError("Cannot find %s with identifier %s" %
4761 (kind, identifier), errors.ECODE_NOENT)
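# Resolution examples (hypothetical container of three NICs): "1" returns
# (1, container[1]), "-1" returns the last item, and a non-integer
# identifier is matched against each item's uuid or name attribute; an
# unknown identifier raises OpPrereqError with ECODE_NOENT.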
4764 def ApplyContainerMods(kind, container, chgdesc, mods,
4765 create_fn, modify_fn, remove_fn):
4766 """Applies descriptions in C{mods} to C{container}.
4769 @param kind: One-word item description
4770 @type container: list
4771 @param container: Container to modify
4772 @type chgdesc: None or list
4773 @param chgdesc: List of applied changes
4775 @param mods: Modifications as returned by L{PrepareContainerMods}
4776 @type create_fn: callable
4777 @param create_fn: Callback for creating a new item (L{constants.DDM_ADD});
4778 receives absolute item index, parameters and private data object as added
4779 by L{PrepareContainerMods}, returns tuple containing new item and changes
4781 @type modify_fn: callable
4782 @param modify_fn: Callback for modifying an existing item
4783 (L{constants.DDM_MODIFY}); receives absolute item index, item, parameters
4784 and private data object as added by L{PrepareContainerMods}, returns
4786 @type remove_fn: callable
4787 @param remove_fn: Callback on removing item; receives absolute item index,
4788 item and private data object as added by L{PrepareContainerMods}
4791 for (op, identifier, params, private) in mods:
4794 if op == constants.DDM_ADD:
4795 # Calculate where item will be added
4796 # When adding an item, identifier can only be an index
4798 idx = int(identifier)
4800 raise errors.OpPrereqError("Only positive integer or -1 is accepted as"
4801 " identifier for %s" % constants.DDM_ADD,
4804 addidx = len(container)
4807 raise IndexError("Not accepting negative indices other than -1")
4808 elif idx > len(container):
4809 raise IndexError("Got %s index %s, but there are only %s" %
4810 (kind, idx, len(container)))
4813 if create_fn is None:
4816 (item, changes) = create_fn(addidx, params, private)
4819 container.append(item)
4822 assert idx <= len(container)
4823 # list.insert does so before the specified index
4824 container.insert(idx, item)
4826 # Retrieve existing item
4827 (absidx, item) = GetItemFromContainer(identifier, kind, container)
4829 if op == constants.DDM_REMOVE:
4832 if remove_fn is not None:
4833 remove_fn(absidx, item, private)
4835 changes = [("%s/%s" % (kind, absidx), "remove")]
4837 assert container[absidx] == item
4838 del container[absidx]
4839 elif op == constants.DDM_MODIFY:
4840 if modify_fn is not None:
4841 changes = modify_fn(absidx, item, params, private)
4843 raise errors.ProgrammerError("Unhandled operation '%s'" % op)
4845 assert _TApplyContModsCbChanges(changes)
4847 if not (chgdesc is None or changes is None):
4848 chgdesc.extend(changes)
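# Minimal usage sketch (callbacks and parameter dicts are hypothetical):
# appending a NIC and then modifying the first one could be driven by
#   mods = PrepareContainerMods([(constants.DDM_ADD, -1, add_params),
#                                (constants.DDM_MODIFY, 0, mod_params)], None)
#   ApplyContainerMods("NIC", nics, chgdesc, mods,
#                      create_cb, modify_cb, remove_cb)
# where each callback follows the contract documented above and chgdesc
# collects the per-item change tuples.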
4851 def _UpdateIvNames(base_index, disks):
4852 """Updates the C{iv_name} attribute of disks.
4854 @type disks: list of L{objects.Disk}
4857 for (idx, disk) in enumerate(disks):
4858 disk.iv_name = "disk/%s" % (base_index + idx, )
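# Example: _UpdateIvNames(1, [disk_a, disk_b]) relabels the two (assumed)
# disk objects as "disk/1" and "disk/2" respectively.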
4861 class LUInstanceSetParams(LogicalUnit):
4862 """Modifies an instances's parameters.
4865 HPATH = "instance-modify"
4866 HTYPE = constants.HTYPE_INSTANCE
4870 def _UpgradeDiskNicMods(kind, mods, verify_fn):
4871 assert ht.TList(mods)
4872 assert not mods or len(mods[0]) in (2, 3)
4874 if mods and len(mods[0]) == 2:
4878 for op, params in mods:
4879 if op in (constants.DDM_ADD, constants.DDM_REMOVE):
4880 result.append((op, -1, params))
4884 raise errors.OpPrereqError("Only one %s add or remove operation is"
4885 " supported at a time" % kind,
4888 result.append((constants.DDM_MODIFY, op, params))
4890 assert verify_fn(result)
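# Example of the legacy two-element form being upgraded (parameter dicts
# are hypothetical):
#   [(constants.DDM_ADD, {...}), (0, {...})]
#     -> [(constants.DDM_ADD, -1, {...}), (constants.DDM_MODIFY, 0, {...})]
# i.e. add/remove entries gain an implicit index of -1 and bare indices
# become explicit modify operations.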
4897 def _CheckMods(kind, mods, key_types, item_fn):
4898 """Ensures requested disk/NIC modifications are valid.
4901 for (op, _, params) in mods:
4902 assert ht.TDict(params)
4904 # If 'key_types' is an empty dict, we assume we have an
4905 # 'ext' template and thus do not ForceDictType
4907 utils.ForceDictType(params, key_types)
4909 if op == constants.DDM_REMOVE:
4911 raise errors.OpPrereqError("No settings should be passed when"
4912 " removing a %s" % kind,
4914 elif op in (constants.DDM_ADD, constants.DDM_MODIFY):
4917 raise errors.ProgrammerError("Unhandled operation '%s'" % op)
4920 def _VerifyDiskModification(op, params):
4921 """Verifies a disk modification.
4924 if op == constants.DDM_ADD:
4925 mode = params.setdefault(constants.IDISK_MODE, constants.DISK_RDWR)
4926 if mode not in constants.DISK_ACCESS_SET:
4927 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode,
4930 size = params.get(constants.IDISK_SIZE, None)
4932 raise errors.OpPrereqError("Required disk parameter '%s' missing" %
4933 constants.IDISK_SIZE, errors.ECODE_INVAL)
4937 except (TypeError, ValueError), err:
4938 raise errors.OpPrereqError("Invalid disk size parameter: %s" % err,
4941 params[constants.IDISK_SIZE] = size
4942 name = params.get(constants.IDISK_NAME, None)
4943 if name is not None and name.lower() == constants.VALUE_NONE:
4944 params[constants.IDISK_NAME] = None
4946 elif op == constants.DDM_MODIFY:
4947 if constants.IDISK_SIZE in params:
4948 raise errors.OpPrereqError("Disk size change not possible, use"
4949 " grow-disk", errors.ECODE_INVAL)
4951 raise errors.OpPrereqError("Disk modification doesn't support"
4952 " additional arbitrary parameters",
4954 name = params.get(constants.IDISK_NAME, None)
4955 if name is not None and name.lower() == constants.VALUE_NONE:
4956 params[constants.IDISK_NAME] = None
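# Hypothetical examples of modifications that pass this check: an add such
# as {constants.IDISK_SIZE: 1024} (mode defaults to constants.DISK_RDWR),
# or a modify that only touches {constants.IDISK_NAME: "data"}; size
# changes are rejected here and must go through grow-disk instead.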
4959 def _VerifyNicModification(op, params):
4960 """Verifies a network interface modification.
4963 if op in (constants.DDM_ADD, constants.DDM_MODIFY):
4964 ip = params.get(constants.INIC_IP, None)
4965 name = params.get(constants.INIC_NAME, None)
4966 req_net = params.get(constants.INIC_NETWORK, None)
4967 link = params.get(constants.NIC_LINK, None)
4968 mode = params.get(constants.NIC_MODE, None)
4969 if name is not None and name.lower() == constants.VALUE_NONE:
4970 params[constants.INIC_NAME] = None
4971 if req_net is not None:
4972 if req_net.lower() == constants.VALUE_NONE:
4973 params[constants.INIC_NETWORK] = None
4975 elif link is not None or mode is not None:
4976 raise errors.OpPrereqError("If a network is given, the mode or link"
4977 " should not be set",
4980 if op == constants.DDM_ADD:
4981 macaddr = params.get(constants.INIC_MAC, None)
4983 params[constants.INIC_MAC] = constants.VALUE_AUTO
4986 if ip.lower() == constants.VALUE_NONE:
4987 params[constants.INIC_IP] = None
4989 if ip.lower() == constants.NIC_IP_POOL:
4990 if op == constants.DDM_ADD and req_net is None:
4991 raise errors.OpPrereqError("If ip=pool, parameter network"
4995 if not netutils.IPAddress.IsValid(ip):
4996 raise errors.OpPrereqError("Invalid IP address '%s'" % ip,
4999 if constants.INIC_MAC in params:
5000 macaddr = params[constants.INIC_MAC]
5001 if macaddr not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5002 macaddr = utils.NormalizeAndValidateMac(macaddr)
5004 if op == constants.DDM_MODIFY and macaddr == constants.VALUE_AUTO:
5005 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5006 " modifying an existing NIC",
5009 def CheckArguments(self):
5010 if not (self.op.nics or self.op.disks or self.op.disk_template or
5011 self.op.hvparams or self.op.beparams or self.op.os_name or
5012 self.op.offline is not None or self.op.runtime_mem or
5014 raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)
5016 if self.op.hvparams:
5017 _CheckParamsNotGlobal(self.op.hvparams, constants.HVC_GLOBALS,
5018 "hypervisor", "instance", "cluster")
5020 self.op.disks = self._UpgradeDiskNicMods(
5021 "disk", self.op.disks, opcodes.OpInstanceSetParams.TestDiskModifications)
5022 self.op.nics = self._UpgradeDiskNicMods(
5023 "NIC", self.op.nics, opcodes.OpInstanceSetParams.TestNicModifications)
5025 if self.op.disks and self.op.disk_template is not None:
5026 raise errors.OpPrereqError("Disk template conversion and other disk"
5027 " changes not supported at the same time",
5030 if (self.op.disk_template and
5031 self.op.disk_template in constants.DTS_INT_MIRROR and
5032 self.op.remote_node is None):
5033 raise errors.OpPrereqError("Changing the disk template to a mirrored"
5034 " one requires specifying a secondary node",
5037 # Check NIC modifications
5038 self._CheckMods("NIC", self.op.nics, constants.INIC_PARAMS_TYPES,
5039 self._VerifyNicModification)
5042 self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode)
5044 def ExpandNames(self):
5045 self._ExpandAndLockInstance()
5046 self.needed_locks[locking.LEVEL_NODEGROUP] = []
5047 # Can't even acquire node locks in shared mode as upcoming changes in
5048 # Ganeti 2.6 will start to modify the node object on disk conversion
5049 self.needed_locks[locking.LEVEL_NODE] = []
5050 self.needed_locks[locking.LEVEL_NODE_RES] = []
5051 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5052 # Lock the node group to look up the ipolicy
5053 self.share_locks[locking.LEVEL_NODEGROUP] = 1
5055 def DeclareLocks(self, level):
5056 if level == locking.LEVEL_NODEGROUP:
5057 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
5058 # Acquire locks for the instance's nodegroups optimistically. Needs
5059 # to be verified in CheckPrereq
5060 self.needed_locks[locking.LEVEL_NODEGROUP] = \
5061 self.cfg.GetInstanceNodeGroups(self.op.instance_name)
5062 elif level == locking.LEVEL_NODE:
5063 self._LockInstancesNodes()
5064 if self.op.disk_template and self.op.remote_node:
5065 self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
5066 self.needed_locks[locking.LEVEL_NODE].append(self.op.remote_node)
5067 elif level == locking.LEVEL_NODE_RES and self.op.disk_template:
5069 self.needed_locks[locking.LEVEL_NODE_RES] = \
5070 _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
5072 def BuildHooksEnv(self):
5075 This runs on the master, primary and secondaries.
5079 if constants.BE_MINMEM in self.be_new:
5080 args["minmem"] = self.be_new[constants.BE_MINMEM]
5081 if constants.BE_MAXMEM in self.be_new:
5082 args["maxmem"] = self.be_new[constants.BE_MAXMEM]
5083 if constants.BE_VCPUS in self.be_new:
5084 args["vcpus"] = self.be_new[constants.BE_VCPUS]
5085 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5086 # information at all.
5088 if self._new_nics is not None:
5091 for nic in self._new_nics:
5092 n = copy.deepcopy(nic)
5093 nicparams = self.cluster.SimpleFillNIC(n.nicparams)
5094 n.nicparams = nicparams
5095 nics.append(_NICToTuple(self, n))
5099 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5100 if self.op.disk_template:
5101 env["NEW_DISK_TEMPLATE"] = self.op.disk_template
5102 if self.op.runtime_mem:
5103 env["RUNTIME_MEMORY"] = self.op.runtime_mem
5107 def BuildHooksNodes(self):
5108 """Build hooks nodes.
5111 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5114 def _PrepareNicModification(self, params, private, old_ip, old_net_uuid,
5115 old_params, cluster, pnode):
5117 update_params_dict = dict([(key, params[key])
5118 for key in constants.NICS_PARAMETERS
5121 req_link = update_params_dict.get(constants.NIC_LINK, None)
5122 req_mode = update_params_dict.get(constants.NIC_MODE, None)
5125 new_net_uuid_or_name = params.get(constants.INIC_NETWORK, old_net_uuid)
5126 if new_net_uuid_or_name:
5127 new_net_uuid = self.cfg.LookupNetwork(new_net_uuid_or_name)
5128 new_net_obj = self.cfg.GetNetwork(new_net_uuid)
5131 old_net_obj = self.cfg.GetNetwork(old_net_uuid)
5134 netparams = self.cfg.GetGroupNetParams(new_net_uuid, pnode)
5136 raise errors.OpPrereqError("No netparams found for the network"
5137 " %s, probably not connected" %
5138 new_net_obj.name, errors.ECODE_INVAL)
5139 new_params = dict(netparams)
5141 new_params = _GetUpdatedParams(old_params, update_params_dict)
5143 utils.ForceDictType(new_params, constants.NICS_PARAMETER_TYPES)
5145 new_filled_params = cluster.SimpleFillNIC(new_params)
5146 objects.NIC.CheckParameterSyntax(new_filled_params)
5148 new_mode = new_filled_params[constants.NIC_MODE]
5149 if new_mode == constants.NIC_MODE_BRIDGED:
5150 bridge = new_filled_params[constants.NIC_LINK]
5151 msg = self.rpc.call_bridges_exist(pnode, [bridge]).fail_msg
5153 msg = "Error checking bridges on node '%s': %s" % (pnode, msg)
5155 self.warn.append(msg)
5157 raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
5159 elif new_mode == constants.NIC_MODE_ROUTED:
5160 ip = params.get(constants.INIC_IP, old_ip)
5162 raise errors.OpPrereqError("Cannot set the NIC IP address to None"
5163 " on a routed NIC", errors.ECODE_INVAL)
5165 elif new_mode == constants.NIC_MODE_OVS:
5166 # TODO: check OVS link
5167 self.LogInfo("OVS links are currently not checked for correctness")
5169 if constants.INIC_MAC in params:
5170 mac = params[constants.INIC_MAC]
5172 raise errors.OpPrereqError("Cannot unset the NIC MAC address",
5174 elif mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5175 # otherwise generate the MAC address
5176 params[constants.INIC_MAC] = \
5177 self.cfg.GenerateMAC(new_net_uuid, self.proc.GetECId())
5179 # or validate/reserve the current one
5181 self.cfg.ReserveMAC(mac, self.proc.GetECId())
5182 except errors.ReservationError:
5183 raise errors.OpPrereqError("MAC address '%s' already in use"
5184 " in cluster" % mac,
5185 errors.ECODE_NOTUNIQUE)
5186 elif new_net_uuid != old_net_uuid:
5188 def get_net_prefix(net_uuid):
5191 nobj = self.cfg.GetNetwork(net_uuid)
5192 mac_prefix = nobj.mac_prefix
5196 new_prefix = get_net_prefix(new_net_uuid)
5197 old_prefix = get_net_prefix(old_net_uuid)
5198 if old_prefix != new_prefix:
5199 params[constants.INIC_MAC] = \
5200 self.cfg.GenerateMAC(new_net_uuid, self.proc.GetECId())
5202 # if there is a change in (ip, network) tuple
5203 new_ip = params.get(constants.INIC_IP, old_ip)
5204 if (new_ip, new_net_uuid) != (old_ip, old_net_uuid):
5206 # if IP is pool then require a network and generate one IP
5207 if new_ip.lower() == constants.NIC_IP_POOL:
5210 new_ip = self.cfg.GenerateIp(new_net_uuid, self.proc.GetECId())
5211 except errors.ReservationError:
5212 raise errors.OpPrereqError("Unable to get a free IP"
5213 " from the address pool",
5215 self.LogInfo("Chose IP %s from network %s",
5218 params[constants.INIC_IP] = new_ip
5220 raise errors.OpPrereqError("ip=pool, but no network found",
5222 # Reserve the new IP in the new network, if any
5225 self.cfg.ReserveIp(new_net_uuid, new_ip, self.proc.GetECId())
5226 self.LogInfo("Reserving IP %s in network %s",
5227 new_ip, new_net_obj.name)
5228 except errors.ReservationError:
5229 raise errors.OpPrereqError("IP %s not available in network %s" %
5230 (new_ip, new_net_obj.name),
5231 errors.ECODE_NOTUNIQUE)
5232 # new network is None so check if new IP is a conflicting IP
5233 elif self.op.conflicts_check:
5234 _CheckForConflictingIp(self, new_ip, pnode)
5236 # release old IP if old network is not None
5237 if old_ip and old_net_uuid:
5239 self.cfg.ReleaseIp(old_net_uuid, old_ip, self.proc.GetECId())
5240 except errors.AddressPoolError:
5241 logging.warning("Release IP %s not contained in network %s",
5242 old_ip, old_net_obj.name)
5244 # there are no changes in (ip, network) tuple and old network is not None
5245 elif (old_net_uuid is not None and
5246 (req_link is not None or req_mode is not None)):
5247 raise errors.OpPrereqError("Not allowed to change link or mode of"
5248 " a NIC that is connected to a network",
5251 private.params = new_params
5252 private.filled = new_filled_params
5254 def _PreCheckDiskTemplate(self, pnode_info):
5255 """CheckPrereq checks related to a new disk template."""
5256 # Arguments are passed to avoid configuration lookups
5257 instance = self.instance
5258 pnode = instance.primary_node
5259 cluster = self.cluster
5260 if instance.disk_template == self.op.disk_template:
5261 raise errors.OpPrereqError("Instance already has disk template %s" %
5262 instance.disk_template, errors.ECODE_INVAL)
5264 if (instance.disk_template,
5265 self.op.disk_template) not in self._DISK_CONVERSIONS:
5266 raise errors.OpPrereqError("Unsupported disk template conversion from"
5267 " %s to %s" % (instance.disk_template,
5268 self.op.disk_template),
5270 _CheckInstanceState(self, instance, INSTANCE_DOWN,
5271 msg="cannot change disk template")
5272 if self.op.disk_template in constants.DTS_INT_MIRROR:
5273 if self.op.remote_node == pnode:
5274 raise errors.OpPrereqError("Given new secondary node %s is the same"
5275 " as the primary node of the instance" %
5276 self.op.remote_node, errors.ECODE_STATE)
5277 _CheckNodeOnline(self, self.op.remote_node)
5278 _CheckNodeNotDrained(self, self.op.remote_node)
5279 # FIXME: here we assume that the old instance type is DT_PLAIN
5280 assert instance.disk_template == constants.DT_PLAIN
5281 disks = [{constants.IDISK_SIZE: d.size,
5282 constants.IDISK_VG: d.logical_id[0]}
5283 for d in instance.disks]
5284 required = _ComputeDiskSizePerVG(self.op.disk_template, disks)
5285 _CheckNodesFreeDiskPerVG(self, [self.op.remote_node], required)
5287 snode_info = self.cfg.GetNodeInfo(self.op.remote_node)
5288 snode_group = self.cfg.GetNodeGroup(snode_info.group)
5289 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
5291 _CheckTargetNodeIPolicy(self, ipolicy, instance, snode_info, self.cfg,
5292 ignore=self.op.ignore_ipolicy)
5293 if pnode_info.group != snode_info.group:
5294 self.LogWarning("The primary and secondary nodes are in two"
5295 " different node groups; the disk parameters"
5296 " from the first disk's node group will be"
5299 if self.op.disk_template not in constants.DTS_EXCL_STORAGE:
5300 # Make sure none of the nodes require exclusive storage
5301 nodes = [pnode_info]
5302 if self.op.disk_template in constants.DTS_INT_MIRROR:
5304 nodes.append(snode_info)
5305 has_es = lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n)
5306 if compat.any(map(has_es, nodes)):
5307 errmsg = ("Cannot convert disk template from %s to %s when exclusive"
5308 " storage is enabled" % (instance.disk_template,
5309 self.op.disk_template))
5310 raise errors.OpPrereqError(errmsg, errors.ECODE_STATE)
5312 def CheckPrereq(self):
5313 """Check prerequisites.
5315 This only checks the instance list against the existing names.
5318 assert self.op.instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
5319 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5321 cluster = self.cluster = self.cfg.GetClusterInfo()
5322 assert self.instance is not None, \
5323 "Cannot retrieve locked instance %s" % self.op.instance_name
5325 pnode = instance.primary_node
5329 if (self.op.pnode is not None and self.op.pnode != pnode and
5331 # verify that the instance is not up
5332 instance_info = self.rpc.call_instance_info(pnode, instance.name,
5333 instance.hypervisor)
5334 if instance_info.fail_msg:
5335 self.warn.append("Can't get instance runtime information: %s" %
5336 instance_info.fail_msg)
5337 elif instance_info.payload:
5338 raise errors.OpPrereqError("Instance is still running on %s" % pnode,
5341 assert pnode in self.owned_locks(locking.LEVEL_NODE)
5342 nodelist = list(instance.all_nodes)
5343 pnode_info = self.cfg.GetNodeInfo(pnode)
5344 self.diskparams = self.cfg.GetInstanceDiskParams(instance)
5346 #_CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
5347 assert pnode_info.group in self.owned_locks(locking.LEVEL_NODEGROUP)
5348 group_info = self.cfg.GetNodeGroup(pnode_info.group)
5350 # dictionary with instance information after the modification
5353 # Check disk modifications. This is done here and not in CheckArguments
5354 # (as with NICs), because we need to know the instance's disk template
5355 if instance.disk_template == constants.DT_EXT:
5356 self._CheckMods("disk", self.op.disks, {},
5357 self._VerifyDiskModification)
5359 self._CheckMods("disk", self.op.disks, constants.IDISK_PARAMS_TYPES,
5360 self._VerifyDiskModification)
5362 # Prepare disk/NIC modifications
5363 self.diskmod = PrepareContainerMods(self.op.disks, None)
5364 self.nicmod = PrepareContainerMods(self.op.nics, _InstNicModPrivate)
5366 # Check the validity of the `provider' parameter
5367 if instance.disk_template == constants.DT_EXT:
5368 for mod in self.diskmod:
5369 ext_provider = mod[2].get(constants.IDISK_PROVIDER, None)
5370 if mod[0] == constants.DDM_ADD:
5371 if ext_provider is None:
5372 raise errors.OpPrereqError("Instance template is '%s' and parameter"
5373 " '%s' missing, during disk add" %
5375 constants.IDISK_PROVIDER),
5377 elif mod[0] == constants.DDM_MODIFY:
5379 raise errors.OpPrereqError("Parameter '%s' is invalid during disk"
5381 constants.IDISK_PROVIDER,
5384 for mod in self.diskmod:
5385 ext_provider = mod[2].get(constants.IDISK_PROVIDER, None)
5386 if ext_provider is not None:
5387 raise errors.OpPrereqError("Parameter '%s' is only valid for"
5388 " instances of type '%s'" %
5389 (constants.IDISK_PROVIDER,
5394 if self.op.os_name and not self.op.force:
5395 _CheckNodeHasOS(self, instance.primary_node, self.op.os_name,
5396 self.op.force_variant)
5397 instance_os = self.op.os_name
5399 instance_os = instance.os
5401 assert not (self.op.disk_template and self.op.disks), \
5402 "Can't modify disk template and apply disk changes at the same time"
5404 if self.op.disk_template:
5405 self._PreCheckDiskTemplate(pnode_info)
5407 # hvparams processing
5408 if self.op.hvparams:
5409 hv_type = instance.hypervisor
5410 i_hvdict = _GetUpdatedParams(instance.hvparams, self.op.hvparams)
5411 utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5412 hv_new = cluster.SimpleFillHV(hv_type, instance.os, i_hvdict)
5415 hypervisor.GetHypervisorClass(hv_type).CheckParameterSyntax(hv_new)
5416 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5417 self.hv_proposed = self.hv_new = hv_new # the new actual values
5418 self.hv_inst = i_hvdict # the new dict (without defaults)
5420 self.hv_proposed = cluster.SimpleFillHV(instance.hypervisor, instance.os,
5422 self.hv_new = self.hv_inst = {}
5424 # beparams processing
5425 if self.op.beparams:
5426 i_bedict = _GetUpdatedParams(instance.beparams, self.op.beparams,
5428 objects.UpgradeBeParams(i_bedict)
5429 utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
5430 be_new = cluster.SimpleFillBE(i_bedict)
5431 self.be_proposed = self.be_new = be_new # the new actual values
5432 self.be_inst = i_bedict # the new dict (without defaults)
5434 self.be_new = self.be_inst = {}
5435 self.be_proposed = cluster.SimpleFillBE(instance.beparams)
5436 be_old = cluster.FillBE(instance)
5438 # CPU param validation -- checking every time a parameter is
5439 # changed to cover all cases where either CPU mask or vcpus have been changed
5441 if (constants.BE_VCPUS in self.be_proposed and
5442 constants.HV_CPU_MASK in self.hv_proposed):
5444 utils.ParseMultiCpuMask(self.hv_proposed[constants.HV_CPU_MASK])
5445 # Verify mask is consistent with number of vCPUs. Can skip this
5446 # test if only 1 entry in the CPU mask, which means same mask
5447 # is applied to all vCPUs.
5448 if (len(cpu_list) > 1 and
5449 len(cpu_list) != self.be_proposed[constants.BE_VCPUS]):
5450 raise errors.OpPrereqError("Number of vCPUs [%d] does not match the"
5452 (self.be_proposed[constants.BE_VCPUS],
5453 self.hv_proposed[constants.HV_CPU_MASK]),
5456 # Only perform this test if a new CPU mask is given
5457 if constants.HV_CPU_MASK in self.hv_new:
5458 # Calculate the largest CPU number requested
5459 max_requested_cpu = max(map(max, cpu_list))
5460 # Check that all of the instance's nodes have enough physical CPUs to
5461 # satisfy the requested CPU mask
5462 _CheckNodesPhysicalCPUs(self, instance.all_nodes,
5463 max_requested_cpu + 1, instance.hypervisor)
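# Worked example for the two checks above (assuming the ':'-separated
# multi-mask syntax of ParseMultiCpuMask; values are hypothetical): with
# BE_VCPUS=4, a mask such as "0:1:2:3" has four entries and is accepted,
# "0-1" has a single entry applied to every vCPU, while "0:1" (two entries)
# is rejected; the highest CPU index named (here 3) also requires at least
# four physical CPUs on every node of the instance.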
5465 # osparams processing
5466 if self.op.osparams:
5467 i_osdict = _GetUpdatedParams(instance.osparams, self.op.osparams)
5468 _CheckOSParams(self, True, nodelist, instance_os, i_osdict)
5469 self.os_inst = i_osdict # the new dict (without defaults)
5473 #TODO(dynmem): do the appropriate check involving MINMEM
5474 if (constants.BE_MAXMEM in self.op.beparams and not self.op.force and
5475 be_new[constants.BE_MAXMEM] > be_old[constants.BE_MAXMEM]):
5476 mem_check_list = [pnode]
5477 if be_new[constants.BE_AUTO_BALANCE]:
5478 # either we changed auto_balance to yes or it was from before
5479 mem_check_list.extend(instance.secondary_nodes)
5480 instance_info = self.rpc.call_instance_info(pnode, instance.name,
5481 instance.hypervisor)
5482 nodeinfo = self.rpc.call_node_info(mem_check_list, None,
5483 [instance.hypervisor], False)
5484 pninfo = nodeinfo[pnode]
5485 msg = pninfo.fail_msg
5487 # Assume the primary node is unreachable and go ahead
5488 self.warn.append("Can't get info from primary node %s: %s" %
5491 (_, _, (pnhvinfo, )) = pninfo.payload
5492 if not isinstance(pnhvinfo.get("memory_free", None), int):
5493 self.warn.append("Node data from primary node %s doesn't contain"
5494 " free memory information" % pnode)
5495 elif instance_info.fail_msg:
5496 self.warn.append("Can't get instance runtime information: %s" %
5497 instance_info.fail_msg)
5499 if instance_info.payload:
5500 current_mem = int(instance_info.payload["memory"])
5502 # Assume instance not running
5503 # (there is a slight race condition here, but it's not very
5504 # probable, and we have no other way to check)
5505 # TODO: Describe race condition
5507 #TODO(dynmem): do the appropriate check involving MINMEM
5508 miss_mem = (be_new[constants.BE_MAXMEM] - current_mem -
5509 pnhvinfo["memory_free"])
5511 raise errors.OpPrereqError("This change will prevent the instance"
5512 " from starting, due to %d MB of memory"
5513 " missing on its primary node" %
5514 miss_mem, errors.ECODE_NORES)
5516 if be_new[constants.BE_AUTO_BALANCE]:
5517 for node, nres in nodeinfo.items():
5518 if node not in instance.secondary_nodes:
5519 continue
5520 nres.Raise("Can't get info from secondary node %s" % node,
5521 prereq=True, ecode=errors.ECODE_STATE)
5522 (_, _, (nhvinfo, )) = nres.payload
5523 if not isinstance(nhvinfo.get("memory_free", None), int):
5524 raise errors.OpPrereqError("Secondary node %s didn't return free"
5525 " memory information" % node,
5527 #TODO(dynmem): do the appropriate check involving MINMEM
5528 elif be_new[constants.BE_MAXMEM] > nhvinfo["memory_free"]:
5529 raise errors.OpPrereqError("This change will prevent the instance"
5530 " from failover to its secondary node"
5531 " %s, due to not enough memory" % node,
5534 if self.op.runtime_mem:
5535 remote_info = self.rpc.call_instance_info(instance.primary_node,
5537 instance.hypervisor)
5538 remote_info.Raise("Error checking node %s" % instance.primary_node)
5539 if not remote_info.payload: # not running already
5540 raise errors.OpPrereqError("Instance %s is not running" %
5541 instance.name, errors.ECODE_STATE)
5543 current_memory = remote_info.payload["memory"]
5544 if (not self.op.force and
5545 (self.op.runtime_mem > self.be_proposed[constants.BE_MAXMEM] or
5546 self.op.runtime_mem < self.be_proposed[constants.BE_MINMEM])):
5547 raise errors.OpPrereqError("Instance %s must have memory between %d"
5548 " and %d MB of memory unless --force is"
5551 self.be_proposed[constants.BE_MINMEM],
5552 self.be_proposed[constants.BE_MAXMEM]),
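5554 # the primary node must have enough free memory to cover the difference between the current and the requested runtime memory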
5555 delta = self.op.runtime_mem - current_memory
5557 _CheckNodeFreeMemory(self, instance.primary_node,
5558 "ballooning memory for instance %s" %
5559 instance.name, delta, instance.hypervisor)
5561 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5562 raise errors.OpPrereqError("Disk operations not supported for"
5563 " diskless instances", errors.ECODE_INVAL)
5565 def _PrepareNicCreate(_, params, private):
5566 self._PrepareNicModification(params, private, None, None,
5570 def _PrepareNicMod(_, nic, params, private):
5571 self._PrepareNicModification(params, private, nic.ip, nic.network,
5572 nic.nicparams, cluster, pnode)
5575 def _PrepareNicRemove(_, params, __):
5576 ip = params.ip
5577 net = params.network
5578 if net is not None and ip is not None:
5579 self.cfg.ReleaseIp(net, ip, self.proc.GetECId())
5581 # Verify NIC changes (operating on copy)
5582 nics = instance.nics[:]
5583 ApplyContainerMods("NIC", nics, None, self.nicmod,
5584 _PrepareNicCreate, _PrepareNicMod, _PrepareNicRemove)
5585 if len(nics) > constants.MAX_NICS:
5586 raise errors.OpPrereqError("Instance has too many network interfaces"
5587 " (%d), cannot add more" % constants.MAX_NICS,
5590 def _PrepareDiskMod(_, disk, params, __):
5591 disk.name = params.get(constants.IDISK_NAME, None)
5593 # Verify disk changes (operating on a copy)
5594 disks = copy.deepcopy(instance.disks)
5595 ApplyContainerMods("disk", disks, None, self.diskmod, None, _PrepareDiskMod,
5597 utils.ValidateDeviceNames("disk", disks)
5598 if len(disks) > constants.MAX_DISKS:
5599 raise errors.OpPrereqError("Instance has too many disks (%d), cannot add"
5600 " more" % constants.MAX_DISKS,
5602 disk_sizes = [disk.size for disk in instance.disks]
5603 disk_sizes.extend(params["size"] for (op, idx, params, private) in
5604 self.diskmod if op == constants.DDM_ADD)
5605 ispec[constants.ISPEC_DISK_COUNT] = len(disk_sizes)
5606 ispec[constants.ISPEC_DISK_SIZE] = disk_sizes
5608 if self.op.offline is not None and self.op.offline:
5609 _CheckInstanceState(self, instance, CAN_CHANGE_INSTANCE_OFFLINE,
5610 msg="can't change to offline")
5612 # Pre-compute NIC changes (necessary to use result in hooks)
5613 self._nic_chgdesc = []
5615 # Operate on copies as this is still in prereq
5616 nics = [nic.Copy() for nic in instance.nics]
5617 ApplyContainerMods("NIC", nics, self._nic_chgdesc, self.nicmod,
5618 self._CreateNewNic, self._ApplyNicMods, None)
5619 # Verify that NIC names are unique and valid
5620 utils.ValidateDeviceNames("NIC", nics)
5621 self._new_nics = nics
5622 ispec[constants.ISPEC_NIC_COUNT] = len(self._new_nics)
5623 else:
5624 self._new_nics = None
5625 ispec[constants.ISPEC_NIC_COUNT] = len(instance.nics)
5627 if not self.op.ignore_ipolicy:
5628 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
5631 # Fill ispec with backend parameters
5632 ispec[constants.ISPEC_SPINDLE_USE] = \
5633 self.be_new.get(constants.BE_SPINDLE_USE, None)
5634 ispec[constants.ISPEC_CPU_COUNT] = self.be_new.get(constants.BE_VCPUS,
5637 # Copy ispec to verify parameters with min/max values separately
5638 if self.op.disk_template:
5639 new_disk_template = self.op.disk_template
5640 else:
5641 new_disk_template = instance.disk_template
5642 ispec_max = ispec.copy()
5643 ispec_max[constants.ISPEC_MEM_SIZE] = \
5644 self.be_new.get(constants.BE_MAXMEM, None)
5645 res_max = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec_max,
5647 ispec_min = ispec.copy()
5648 ispec_min[constants.ISPEC_MEM_SIZE] = \
5649 self.be_new.get(constants.BE_MINMEM, None)
5650 res_min = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec_min,
5653 if (res_max or res_min):
5654 # FIXME: Improve error message by including information about whether
5655 # the upper or lower limit of the parameter fails the ipolicy.
5656 msg = ("Instance allocation to group %s (%s) violates policy: %s" %
5657 (group_info, group_info.name,
5658 utils.CommaJoin(set(res_max + res_min))))
5659 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
5661 def _ConvertPlainToDrbd(self, feedback_fn):
5662 """Converts an instance from plain to drbd.
5665 feedback_fn("Converting template to drbd")
5666 instance = self.instance
5667 pnode = instance.primary_node
5668 snode = self.op.remote_node
5670 assert instance.disk_template == constants.DT_PLAIN
5672 # create a fake disk info for _GenerateDiskTemplate
5673 disk_info = [{constants.IDISK_SIZE: d.size, constants.IDISK_MODE: d.mode,
5674 constants.IDISK_VG: d.logical_id[0],
5675 constants.IDISK_NAME: d.name}
5676 for d in instance.disks]
5677 new_disks = _GenerateDiskTemplate(self, self.op.disk_template,
5678 instance.name, pnode, [snode],
5679 disk_info, None, None, 0, feedback_fn,
5681 anno_disks = rpc.AnnotateDiskParams(constants.DT_DRBD8, new_disks,
5683 p_excl_stor = _IsExclusiveStorageEnabledNodeName(self.cfg, pnode)
5684 s_excl_stor = _IsExclusiveStorageEnabledNodeName(self.cfg, snode)
5685 info = _GetInstanceInfoText(instance)
5686 feedback_fn("Creating additional volumes...")
5687 # first, create the missing data and meta devices
5688 for disk in anno_disks:
5689 # unfortunately this is... not too nice
5690 _CreateSingleBlockDev(self, pnode, instance, disk.children[1],
5691 info, True, p_excl_stor)
5692 for child in disk.children:
5693 _CreateSingleBlockDev(self, snode, instance, child, info, True,
5695 # at this stage, all new LVs have been created; we can rename the old ones
5697 feedback_fn("Renaming original volumes...")
5698 rename_list = [(o, n.children[0].logical_id)
5699 for (o, n) in zip(instance.disks, new_disks)]
5700 result = self.rpc.call_blockdev_rename(pnode, rename_list)
5701 result.Raise("Failed to rename original LVs")
5703 feedback_fn("Initializing DRBD devices...")
5704 # all child devices are in place, we can now create the DRBD devices
5705 try:
5706 for disk in anno_disks:
5707 for (node, excl_stor) in [(pnode, p_excl_stor), (snode, s_excl_stor)]:
5708 f_create = node == pnode
5709 _CreateSingleBlockDev(self, node, instance, disk, info, f_create,
5711 except errors.GenericError, e:
5712 feedback_fn("Initializing of DRBD devices failed;"
5713 " renaming back original volumes...")
5714 for disk in new_disks:
5715 self.cfg.SetDiskID(disk, pnode)
5716 rename_back_list = [(n.children[0], o.logical_id)
5717 for (n, o) in zip(new_disks, instance.disks)]
5718 result = self.rpc.call_blockdev_rename(pnode, rename_back_list)
5719 result.Raise("Failed to rename LVs back after error %s" % str(e))
5722 # at this point, the instance has been modified
5723 instance.disk_template = constants.DT_DRBD8
5724 instance.disks = new_disks
5725 self.cfg.Update(instance, feedback_fn)
5727 # Release node locks while waiting for sync
5728 _ReleaseLocks(self, locking.LEVEL_NODE)
5730 # disks are created, waiting for sync
5731 disk_abort = not _WaitForSync(self, instance,
5732 oneshot=not self.op.wait_for_sync)
5734 raise errors.OpExecError("There are some degraded disks for"
5735 " this instance, please cleanup manually")
5737 # Node resource locks will be released by caller
5739 def _ConvertDrbdToPlain(self, feedback_fn):
5740 """Converts an instance from drbd to plain.
5743 instance = self.instance
5745 assert len(instance.secondary_nodes) == 1
5746 assert instance.disk_template == constants.DT_DRBD8
5748 pnode = instance.primary_node
5749 snode = instance.secondary_nodes[0]
5750 feedback_fn("Converting template to plain")
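5751 # for DRBD8, each disk's first child is the data LV; those LVs become the plain disks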
5752 old_disks = _AnnotateDiskParams(instance, instance.disks, self.cfg)
5753 new_disks = [d.children[0] for d in instance.disks]
5755 # copy over size, mode and name
5756 for parent, child in zip(old_disks, new_disks):
5757 child.size = parent.size
5758 child.mode = parent.mode
5759 child.name = parent.name
5761 # this is a DRBD disk, return its port to the pool
5762 # NOTE: this must be done right before the call to cfg.Update!
5763 for disk in old_disks:
5764 tcp_port = disk.logical_id[2]
5765 self.cfg.AddTcpUdpPort(tcp_port)
5767 # update instance structure
5768 instance.disks = new_disks
5769 instance.disk_template = constants.DT_PLAIN
5770 _UpdateIvNames(0, instance.disks)
5771 self.cfg.Update(instance, feedback_fn)
5773 # Release locks in case removing disks takes a while
5774 _ReleaseLocks(self, locking.LEVEL_NODE)
5776 feedback_fn("Removing volumes on the secondary node...")
5777 for disk in old_disks:
5778 self.cfg.SetDiskID(disk, snode)
5779 msg = self.rpc.call_blockdev_remove(snode, disk).fail_msg
5780 if msg:
5781 self.LogWarning("Could not remove block device %s on node %s,"
5782 " continuing anyway: %s", disk.iv_name, snode, msg)
5784 feedback_fn("Removing unneeded volumes on the primary node...")
5785 for idx, disk in enumerate(old_disks):
5786 meta = disk.children[1]
5787 self.cfg.SetDiskID(meta, pnode)
5788 msg = self.rpc.call_blockdev_remove(pnode, meta).fail_msg
5789 if msg:
5790 self.LogWarning("Could not remove metadata for disk %d on node %s,"
5791 " continuing anyway: %s", idx, pnode, msg)
5793 def _CreateNewDisk(self, idx, params, _):
5794 """Creates a new disk.
5797 instance = self.instance
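5799 # add a new disk; file-based templates keep all of an instance's disks in one directory, so reuse the path of the first disk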
5800 if instance.disk_template in constants.DTS_FILEBASED:
5801 (file_driver, file_path) = instance.disks[0].logical_id
5802 file_path = os.path.dirname(file_path)
5803 else:
5804 file_driver = file_path = None
5807 _GenerateDiskTemplate(self, instance.disk_template, instance.name,
5808 instance.primary_node, instance.secondary_nodes,
5809 [params], file_path, file_driver, idx,
5810 self.Log, self.diskparams)[0]
5812 info = _GetInstanceInfoText(instance)
5814 logging.info("Creating volume %s for instance %s",
5815 disk.iv_name, instance.name)
5816 # Note: this needs to be kept in sync with _CreateDisks
5818 for node in instance.all_nodes:
5819 f_create = (node == instance.primary_node)
5820 try:
5821 _CreateBlockDev(self, node, instance, disk, f_create, info, f_create)
5822 except errors.OpExecError, err:
5823 self.LogWarning("Failed to create volume %s (%s) on node '%s': %s",
5824 disk.iv_name, disk, node, err)
5826 if self.cluster.prealloc_wipe_disks:
5828 _WipeDisks(self, instance,
5829 disks=[(idx, disk, 0)])
5832 ("disk/%d" % idx, "add:size=%s,mode=%s" % (disk.size, disk.mode)),
5836 def _ModifyDisk(idx, disk, params, _):
5837 """Modifies a disk.
5840 changes = []
5841 mode = params.get(constants.IDISK_MODE, None)
5842 if mode:
5843 disk.mode = mode
5844 changes.append(("disk.mode/%d" % idx, disk.mode))
5846 name = params.get(constants.IDISK_NAME, None)
5847 disk.name = name
5848 changes.append(("disk.name/%d" % idx, disk.name))
5850 return changes
5852 def _RemoveDisk(self, idx, root, _):
5856 (anno_disk,) = _AnnotateDiskParams(self.instance, [root], self.cfg)
5857 for node, disk in anno_disk.ComputeNodeTree(self.instance.primary_node):
5858 self.cfg.SetDiskID(disk, node)
5859 msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
5860 if msg:
5861 self.LogWarning("Could not remove disk/%d on node '%s': %s,"
5862 " continuing anyway", idx, node, msg)
5864 # if this is a DRBD disk, return its port to the pool
5865 if root.dev_type in constants.LDS_DRBD:
5866 self.cfg.AddTcpUdpPort(root.logical_id[2])
5868 def _CreateNewNic(self, idx, params, private):
5869 """Creates data structure for a new network interface.
5872 mac = params[constants.INIC_MAC]
5873 ip = params.get(constants.INIC_IP, None)
5874 net = params.get(constants.INIC_NETWORK, None)
5875 name = params.get(constants.INIC_NAME, None)
5876 net_uuid = self.cfg.LookupNetwork(net)
5877 #TODO: not private.filled?? can a nic have no nicparams??
5878 nicparams = private.filled
5879 nobj = objects.NIC(mac=mac, ip=ip, network=net_uuid, name=name,
5880 nicparams=nicparams)
5881 nobj.uuid = self.cfg.GenerateUniqueID(self.proc.GetECId())
5885 "add:mac=%s,ip=%s,mode=%s,link=%s,network=%s" %
5886 (mac, ip, private.filled[constants.NIC_MODE],
5887 private.filled[constants.NIC_LINK],
5891 def _ApplyNicMods(self, idx, nic, params, private):
5892 """Modifies a network interface.
5897 for key in [constants.INIC_MAC, constants.INIC_IP, constants.INIC_NAME]:
5899 changes.append(("nic.%s/%d" % (key, idx), params[key]))
5900 setattr(nic, key, params[key])
5902 new_net = params.get(constants.INIC_NETWORK, nic.network)
5903 new_net_uuid = self.cfg.LookupNetwork(new_net)
5904 if new_net_uuid != nic.network:
5905 changes.append(("nic.network/%d" % idx, new_net))
5906 nic.network = new_net_uuid
5909 nic.nicparams = private.filled
5911 for (key, val) in nic.nicparams.items():
5912 changes.append(("nic.%s/%d" % (key, idx), val))
5916 def Exec(self, feedback_fn):
5917 """Modifies an instance.
5919 All parameters take effect only at the next restart of the instance.
5922 # Process here the warnings from CheckPrereq, as we don't have a
5923 # feedback_fn there.
5924 # TODO: Replace with self.LogWarning
5925 for warn in self.warn:
5926 feedback_fn("WARNING: %s" % warn)
5928 assert ((self.op.disk_template is None) ^
5929 bool(self.owned_locks(locking.LEVEL_NODE_RES))), \
5930 "Not owning any node resource locks"
5933 instance = self.instance
5937 instance.primary_node = self.op.pnode
5940 if self.op.runtime_mem:
5941 rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
5943 self.op.runtime_mem)
5944 rpcres.Raise("Cannot modify instance runtime memory")
5945 result.append(("runtime_memory", self.op.runtime_mem))
5947 # Apply disk changes
5948 ApplyContainerMods("disk", instance.disks, result, self.diskmod,
5949 self._CreateNewDisk, self._ModifyDisk, self._RemoveDisk)
5950 _UpdateIvNames(0, instance.disks)
5952 if self.op.disk_template:
5954 check_nodes = set(instance.all_nodes)
5955 if self.op.remote_node:
5956 check_nodes.add(self.op.remote_node)
5957 for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
5958 owned = self.owned_locks(level)
5959 assert not (check_nodes - owned), \
5960 ("Not owning the correct locks, owning %r, expected at least %r" %
5961 (owned, check_nodes))
5963 r_shut = _ShutdownInstanceDisks(self, instance)
5965 raise errors.OpExecError("Cannot shutdown instance disks, unable to"
5966 " proceed with disk template conversion")
5967 mode = (instance.disk_template, self.op.disk_template)
5969 self._DISK_CONVERSIONS[mode](self, feedback_fn)
5971 self.cfg.ReleaseDRBDMinors(instance.name)
5973 result.append(("disk_template", self.op.disk_template))
5975 assert instance.disk_template == self.op.disk_template, \
5976 ("Expected disk template '%s', found '%s'" %
5977 (self.op.disk_template, instance.disk_template))
5979 # Release node and resource locks if there are any (they might already have
5980 # been released during disk conversion)
5981 _ReleaseLocks(self, locking.LEVEL_NODE)
5982 _ReleaseLocks(self, locking.LEVEL_NODE_RES)
5985 if self._new_nics is not None:
5986 instance.nics = self._new_nics
5987 result.extend(self._nic_chgdesc)
5990 if self.op.hvparams:
5991 instance.hvparams = self.hv_inst
5992 for key, val in self.op.hvparams.iteritems():
5993 result.append(("hv/%s" % key, val))
5996 if self.op.beparams:
5997 instance.beparams = self.be_inst
5998 for key, val in self.op.beparams.iteritems():
5999 result.append(("be/%s" % key, val))
6003 instance.os = self.op.os_name
6006 if self.op.osparams:
6007 instance.osparams = self.os_inst
6008 for key, val in self.op.osparams.iteritems():
6009 result.append(("os/%s" % key, val))
6011 if self.op.offline is None:
6014 elif self.op.offline:
6015 # Mark instance as offline
6016 self.cfg.MarkInstanceOffline(instance.name)
6017 result.append(("admin_state", constants.ADMINST_OFFLINE))
6019 # Mark instance as online, but stopped
6020 self.cfg.MarkInstanceDown(instance.name)
6021 result.append(("admin_state", constants.ADMINST_DOWN))
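6022 # persist all accumulated instance changes in the configuration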
6023 self.cfg.Update(instance, feedback_fn, self.proc.GetECId())
6025 assert not (self.owned_locks(locking.LEVEL_NODE_RES) or
6026 self.owned_locks(locking.LEVEL_NODE)), \
6027 "All node locks should have been released by now"
6031 _DISK_CONVERSIONS = {
6032 (constants.DT_PLAIN, constants.DT_DRBD8): _ConvertPlainToDrbd,
6033 (constants.DT_DRBD8, constants.DT_PLAIN): _ConvertDrbdToPlain,
6037 class LUInstanceChangeGroup(LogicalUnit):
6038 HPATH = "instance-change-group"
6039 HTYPE = constants.HTYPE_INSTANCE
6042 def ExpandNames(self):
6043 self.share_locks = _ShareAll()
6045 self.needed_locks = {
6046 locking.LEVEL_NODEGROUP: [],
6047 locking.LEVEL_NODE: [],
6048 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
6051 self._ExpandAndLockInstance()
6053 if self.op.target_groups:
6054 self.req_target_uuids = map(self.cfg.LookupNodeGroup,
6055 self.op.target_groups)
6056 else:
6057 self.req_target_uuids = None
6059 self.op.iallocator = _GetDefaultIAllocator(self.cfg, self.op.iallocator)
6061 def DeclareLocks(self, level):
6062 if level == locking.LEVEL_NODEGROUP:
6063 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
6065 if self.req_target_uuids:
6066 lock_groups = set(self.req_target_uuids)
6068 # Lock all groups used by instance optimistically; this requires going
6069 # via the node before it's locked, requiring verification later on
6070 instance_groups = self.cfg.GetInstanceNodeGroups(self.op.instance_name)
6071 lock_groups.update(instance_groups)
6072 else:
6073 # No target groups, need to lock all of them
6074 lock_groups = locking.ALL_SET
6076 self.needed_locks[locking.LEVEL_NODEGROUP] = lock_groups
6078 elif level == locking.LEVEL_NODE:
6079 if self.req_target_uuids:
6080 # Lock all nodes used by instances
6081 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6082 self._LockInstancesNodes()
6084 # Lock all nodes in all potential target groups
6085 lock_groups = (frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) -
6086 self.cfg.GetInstanceNodeGroups(self.op.instance_name))
6087 member_nodes = [node_name
6088 for group in lock_groups
6089 for node_name in self.cfg.GetNodeGroup(group).members]
6090 self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
6091 else:
6092 # Lock all nodes as all groups are potential targets
6093 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6095 def CheckPrereq(self):
6096 owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
6097 owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
6098 owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
6100 assert (self.req_target_uuids is None or
6101 owned_groups.issuperset(self.req_target_uuids))
6102 assert owned_instances == set([self.op.instance_name])
6104 # Get instance information
6105 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6107 # Check if node groups for locked instance are still correct
6108 assert owned_nodes.issuperset(self.instance.all_nodes), \
6109 ("Instance %s's nodes changed while we kept the lock" %
6110 self.op.instance_name)
6112 inst_groups = _CheckInstanceNodeGroups(self.cfg, self.op.instance_name,
6115 if self.req_target_uuids:
6116 # User requested specific target groups
6117 self.target_uuids = frozenset(self.req_target_uuids)
6118 else:
6119 # All groups except those used by the instance are potential targets
6120 self.target_uuids = owned_groups - inst_groups
6122 conflicting_groups = self.target_uuids & inst_groups
6123 if conflicting_groups:
6124 raise errors.OpPrereqError("Can't use group(s) '%s' as targets, they are"
6125 " used by the instance '%s'" %
6126 (utils.CommaJoin(conflicting_groups),
6127 self.op.instance_name),
6130 if not self.target_uuids:
6131 raise errors.OpPrereqError("There are no possible target groups",
6134 def BuildHooksEnv(self):
6138 assert self.target_uuids
6141 "TARGET_GROUPS": " ".join(self.target_uuids),
6144 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6148 def BuildHooksNodes(self):
6149 """Build hooks nodes.
6152 mn = self.cfg.GetMasterNode()
6155 def Exec(self, feedback_fn):
6156 instances = list(self.owned_locks(locking.LEVEL_INSTANCE))
6158 assert instances == [self.op.instance_name], "Instance not locked"
6160 req = iallocator.IAReqGroupChange(instances=instances,
6161 target_groups=list(self.target_uuids))
6162 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
6164 ial.Run(self.op.iallocator)
6167 raise errors.OpPrereqError("Can't compute solution for changing group of"
6168 " instance '%s' using iallocator '%s': %s" %
6169 (self.op.instance_name, self.op.iallocator,
6170 ial.info), errors.ECODE_NORES)
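6171 # translate the iallocator's evacuation result into the jobs that will actually move the instance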
6172 jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, False)
6174 self.LogInfo("Iallocator returned %s job(s) for changing group of"
6175 " instance '%s'", len(jobs), self.op.instance_name)
6177 return ResultWithJobs(jobs)
6180 def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
6182 """Check that mirrors are not degraded.
6184 @attention: The device has to be annotated already.
6186 The ldisk parameter, if True, will change the test from the
6187 is_degraded attribute (which represents overall non-ok status for
6188 the device(s)) to the ldisk (representing the local storage status).
6191 lu.cfg.SetDiskID(dev, node)
6195 if on_primary or dev.AssembleOnSecondary():
6196 rstats = lu.rpc.call_blockdev_find(node, dev)
6197 msg = rstats.fail_msg
6199 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
6201 elif not rstats.payload:
6202 lu.LogWarning("Can't find disk on node %s", node)
6206 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
6208 result = result and not rstats.payload.is_degraded
6211 for child in dev.children:
6212 result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
6218 def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
6219 """Wrapper around L{_CheckDiskConsistencyInner}.
6222 (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
6223 return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
6227 class TLMigrateInstance(Tasklet):
6228 """Tasklet class for instance migration.
6231 @ivar live: whether the migration will be done live or non-live;
6232 this variable is initialized only after CheckPrereq has run
6233 @type cleanup: boolean
6234 @ivar cleanup: Whether we clean up after a failed migration
6235 @type iallocator: string
6236 @ivar iallocator: The iallocator used to determine target_node
6237 @type target_node: string
6238 @ivar target_node: If given, the target_node to reallocate the instance to
6239 @type failover: boolean
6240 @ivar failover: Whether operation results in failover or migration
6241 @type fallback: boolean
6242 @ivar fallback: Whether fallback to failover is allowed if migration is not possible
6244 @type ignore_consistency: boolean
6245 @ivar ignore_consistency: Whether we should ignore consistency between the source and the target node
6247 @type shutdown_timeout: int
6248 @ivar shutdown_timeout: In case of failover, the timeout for the instance shutdown
6249 @type ignore_ipolicy: bool
6250 @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating
6255 _MIGRATION_POLL_INTERVAL = 1 # seconds
6256 _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds
6258 def __init__(self, lu, instance_name, cleanup, failover, fallback,
6259 ignore_consistency, allow_runtime_changes, shutdown_timeout,
6261 """Initializes this class.
6264 Tasklet.__init__(self, lu)
6267 self.instance_name = instance_name
6268 self.cleanup = cleanup
6269 self.live = False # will be overridden later
6270 self.failover = failover
6271 self.fallback = fallback
6272 self.ignore_consistency = ignore_consistency
6273 self.shutdown_timeout = shutdown_timeout
6274 self.ignore_ipolicy = ignore_ipolicy
6275 self.allow_runtime_changes = allow_runtime_changes
6277 def CheckPrereq(self):
6278 """Check prerequisites.
6280 This checks that the instance is in the cluster.
6283 instance_name = _ExpandInstanceName(self.lu.cfg, self.instance_name)
6284 instance = self.cfg.GetInstanceInfo(instance_name)
6285 assert instance is not None
6286 self.instance = instance
6287 cluster = self.cfg.GetClusterInfo()
6289 if (not self.cleanup and
6290 not instance.admin_state == constants.ADMINST_UP and
6291 not self.failover and self.fallback):
6292 self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
6293 " switching to failover")
6294 self.failover = True
6296 if instance.disk_template not in constants.DTS_MIRRORED:
6297 if self.failover:
6298 text = "failover"
6299 else:
6300 text = "migration"
6301 raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
6302 " %s" % (instance.disk_template, text),
6305 if instance.disk_template in constants.DTS_EXT_MIRROR:
6306 _CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
6308 if self.lu.op.iallocator:
6309 assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
6310 self._RunAllocator()
6312 # We set self.target_node as it is required by
6314 self.target_node = self.lu.op.target_node
6316 # Check that the target node is correct in terms of instance policy
6317 nodeinfo = self.cfg.GetNodeInfo(self.target_node)
6318 group_info = self.cfg.GetNodeGroup(nodeinfo.group)
6319 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
6321 _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
6322 ignore=self.ignore_ipolicy)
6324 # self.target_node is already populated, either directly or by the iallocator run
6326 target_node = self.target_node
6327 if self.target_node == instance.primary_node:
6328 raise errors.OpPrereqError("Cannot migrate instance %s"
6329 " to its primary (%s)" %
6330 (instance.name, instance.primary_node),
6333 if len(self.lu.tasklets) == 1:
6334 # It is safe to release locks only when we're the only tasklet
6336 _ReleaseLocks(self.lu, locking.LEVEL_NODE,
6337 keep=[instance.primary_node, self.target_node])
6338 _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
6341 assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
6343 secondary_nodes = instance.secondary_nodes
6344 if not secondary_nodes:
6345 raise errors.ConfigurationError("No secondary node but using"
6346 " %s disk template" %
6347 instance.disk_template)
6348 target_node = secondary_nodes[0]
6349 if self.lu.op.iallocator or (self.lu.op.target_node and
6350 self.lu.op.target_node != target_node):
6351 if self.failover:
6352 text = "failed over"
6353 else:
6354 text = "migrated"
6355 raise errors.OpPrereqError("Instances with disk template %s cannot"
6356 " be %s to arbitrary nodes"
6357 " (neither an iallocator nor a target"
6358 " node can be passed)" %
6359 (instance.disk_template, text),
6361 nodeinfo = self.cfg.GetNodeInfo(target_node)
6362 group_info = self.cfg.GetNodeGroup(nodeinfo.group)
6363 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
6365 _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
6366 ignore=self.ignore_ipolicy)
6368 i_be = cluster.FillBE(instance)
6370 # check memory requirements on the secondary node
6371 if (not self.cleanup and
6372 (not self.failover or instance.admin_state == constants.ADMINST_UP)):
6373 self.tgt_free_mem = _CheckNodeFreeMemory(self.lu, target_node,
6374 "migrating instance %s" %
6376 i_be[constants.BE_MINMEM],
6377 instance.hypervisor)
6379 self.lu.LogInfo("Not checking memory on the secondary node as"
6380 " instance will not be started")
6382 # check if failover must be forced instead of migration
6383 if (not self.cleanup and not self.failover and
6384 i_be[constants.BE_ALWAYS_FAILOVER]):
6385 self.lu.LogInfo("Instance configured to always failover; fallback"
6387 self.failover = True
6389 # check bridge existence
6390 _CheckInstanceBridgesExist(self.lu, instance, node=target_node)
6392 if not self.cleanup:
6393 _CheckNodeNotDrained(self.lu, target_node)
6394 if not self.failover:
6395 result = self.rpc.call_instance_migratable(instance.primary_node,
6397 if result.fail_msg and self.fallback:
6398 self.lu.LogInfo("Can't migrate, instance offline, fallback to"
6400 self.failover = True
6402 result.Raise("Can't migrate, please use failover",
6403 prereq=True, ecode=errors.ECODE_STATE)
6405 assert not (self.failover and self.cleanup)
6407 if not self.failover:
6408 if self.lu.op.live is not None and self.lu.op.mode is not None:
6409 raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
6410 " parameters are accepted",
6412 if self.lu.op.live is not None:
6413 if self.lu.op.live:
6414 self.lu.op.mode = constants.HT_MIGRATION_LIVE
6415 else:
6416 self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
6417 # reset the 'live' parameter to None so that repeated
6418 # invocations of CheckPrereq do not raise an exception
6419 self.lu.op.live = None
6420 elif self.lu.op.mode is None:
6421 # read the default value from the hypervisor
6422 i_hv = cluster.FillHV(self.instance, skip_globals=False)
6423 self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
6425 self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
6426 else:
6427 # Failover is never live
6428 self.live = False
6430 if not (self.failover or self.cleanup):
6431 remote_info = self.rpc.call_instance_info(instance.primary_node,
6433 instance.hypervisor)
6434 remote_info.Raise("Error checking instance on node %s" %
6435 instance.primary_node)
6436 instance_running = bool(remote_info.payload)
6437 if instance_running:
6438 self.current_mem = int(remote_info.payload["memory"])
6440 def _RunAllocator(self):
6441 """Run the allocator based on input opcode.
6444 assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
6446 # FIXME: add a self.ignore_ipolicy option
6447 req = iallocator.IAReqRelocate(name=self.instance_name,
6448 relocate_from=[self.instance.primary_node])
6449 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
6451 ial.Run(self.lu.op.iallocator)
6454 raise errors.OpPrereqError("Can't compute nodes using"
6455 " iallocator '%s': %s" %
6456 (self.lu.op.iallocator, ial.info),
6458 self.target_node = ial.result[0]
6459 self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
6460 self.instance_name, self.lu.op.iallocator,
6461 utils.CommaJoin(ial.result))
6463 def _WaitUntilSync(self):
6464 """Poll with custom rpc for disk sync.
6466 This uses our own step-based rpc call.
6469 self.feedback_fn("* wait until resync is done")
6473 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
6475 (self.instance.disks,
6478 for node, nres in result.items():
6479 nres.Raise("Cannot resync disks on node %s" % node)
6480 node_done, node_percent = nres.payload
6481 all_done = all_done and node_done
6482 if node_percent is not None:
6483 min_percent = min(min_percent, node_percent)
6485 if min_percent < 100:
6486 self.feedback_fn(" - progress: %.1f%%" % min_percent)
6489 def _EnsureSecondary(self, node):
6490 """Demote a node to secondary.
6493 self.feedback_fn("* switching node %s to secondary mode" % node)
6495 for dev in self.instance.disks:
6496 self.cfg.SetDiskID(dev, node)
6498 result = self.rpc.call_blockdev_close(node, self.instance.name,
6499 self.instance.disks)
6500 result.Raise("Cannot change disk to secondary on node %s" % node)
6502 def _GoStandalone(self):
6503 """Disconnect from the network.
6506 self.feedback_fn("* changing into standalone mode")
6507 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
6508 self.instance.disks)
6509 for node, nres in result.items():
6510 nres.Raise("Cannot disconnect disks on node %s" % node)
6512 def _GoReconnect(self, multimaster):
6513 """Reconnect to the network.
6516 if multimaster:
6517 msg = "dual-master"
6518 else:
6519 msg = "single-master"
6520 self.feedback_fn("* changing disks into %s mode" % msg)
6521 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
6522 (self.instance.disks, self.instance),
6523 self.instance.name, multimaster)
6524 for node, nres in result.items():
6525 nres.Raise("Cannot change disks config on node %s" % node)
6527 def _ExecCleanup(self):
6528 """Try to cleanup after a failed migration.
6530 The cleanup is done by:
6531 - check that the instance is running only on one node
6532 (and update the config if needed)
6533 - change disks on its secondary node to secondary
6534 - wait until disks are fully synchronized
6535 - disconnect from the network
6536 - change disks into single-master mode
6537 - wait again until disks are fully synchronized
6540 instance = self.instance
6541 target_node = self.target_node
6542 source_node = self.source_node
6544 # check running on only one node
6545 self.feedback_fn("* checking where the instance actually runs"
6546 " (if this hangs, the hypervisor might be in"
6548 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
6549 for node, result in ins_l.items():
6550 result.Raise("Can't contact node %s" % node)
6552 runningon_source = instance.name in ins_l[source_node].payload
6553 runningon_target = instance.name in ins_l[target_node].payload
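6554 # the instance must be running on exactly one of the two nodes; anything else requires manual intervention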
6555 if runningon_source and runningon_target:
6556 raise errors.OpExecError("Instance seems to be running on two nodes,"
6557 " or the hypervisor is confused; you will have"
6558 " to ensure manually that it runs only on one"
6559 " and restart this operation")
6561 if not (runningon_source or runningon_target):
6562 raise errors.OpExecError("Instance does not seem to be running at all;"
6563 " in this case it's safer to repair by"
6564 " running 'gnt-instance stop' to ensure disk"
6565 " shutdown, and then restarting it")
6567 if runningon_target:
6568 # the migration has actually succeeded, we need to update the config
6569 self.feedback_fn("* instance running on secondary node (%s),"
6570 " updating config" % target_node)
6571 instance.primary_node = target_node
6572 self.cfg.Update(instance, self.feedback_fn)
6573 demoted_node = source_node
6575 self.feedback_fn("* instance confirmed to be running on its"
6576 " primary node (%s)" % source_node)
6577 demoted_node = target_node
6579 if instance.disk_template in constants.DTS_INT_MIRROR:
6580 self._EnsureSecondary(demoted_node)
6582 self._WaitUntilSync()
6583 except errors.OpExecError:
6584 # we ignore errors here, since if the device is standalone, it
6585 # won't be able to sync
6587 self._GoStandalone()
6588 self._GoReconnect(False)
6589 self._WaitUntilSync()
6591 self.feedback_fn("* done")
6593 def _RevertDiskStatus(self):
6594 """Try to revert the disk status after a failed migration.
6597 target_node = self.target_node
6598 if self.instance.disk_template in constants.DTS_EXT_MIRROR:
6602 self._EnsureSecondary(target_node)
6603 self._GoStandalone()
6604 self._GoReconnect(False)
6605 self._WaitUntilSync()
6606 except errors.OpExecError, err:
6607 self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
6608 " please try to recover the instance manually;"
6609 " error '%s'" % str(err))
6611 def _AbortMigration(self):
6612 """Call the hypervisor code to abort a started migration.
6615 instance = self.instance
6616 target_node = self.target_node
6617 source_node = self.source_node
6618 migration_info = self.migration_info
6620 abort_result = self.rpc.call_instance_finalize_migration_dst(target_node,
6624 abort_msg = abort_result.fail_msg
6626 logging.error("Aborting migration failed on target node %s: %s",
6627 target_node, abort_msg)
6628 # Don't raise an exception here, as we still have to try to revert the
6629 # disk status, even if this step failed.
6631 abort_result = self.rpc.call_instance_finalize_migration_src(
6632 source_node, instance, False, self.live)
6633 abort_msg = abort_result.fail_msg
6635 logging.error("Aborting migration failed on source node %s: %s",
6636 source_node, abort_msg)
6638 def _ExecMigration(self):
6639 """Migrate an instance.
6641 The migrate is done by:
6642 - change the disks into dual-master mode
6643 - wait until disks are fully synchronized again
6644 - migrate the instance
6645 - change disks on the new secondary node (the old primary) to secondary
6646 - wait until disks are fully synchronized
6647 - change disks into single-master mode
6650 instance = self.instance
6651 target_node = self.target_node
6652 source_node = self.source_node
6654 # Check for hypervisor version mismatch and warn the user.
6655 nodeinfo = self.rpc.call_node_info([source_node, target_node],
6656 None, [self.instance.hypervisor], False)
6657 for ninfo in nodeinfo.values():
6658 ninfo.Raise("Unable to retrieve node information from node '%s'" %
6660 (_, _, (src_info, )) = nodeinfo[source_node].payload
6661 (_, _, (dst_info, )) = nodeinfo[target_node].payload
6663 if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
6664 (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
6665 src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
6666 dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
6667 if src_version != dst_version:
6668 self.feedback_fn("* warning: hypervisor version mismatch between"
6669 " source (%s) and target (%s) node" %
6670 (src_version, dst_version))
6672 self.feedback_fn("* checking disk consistency between source and target")
6673 for (idx, dev) in enumerate(instance.disks):
6674 if not _CheckDiskConsistency(self.lu, instance, dev, target_node, False):
6675 raise errors.OpExecError("Disk %s is degraded or not fully"
6676 " synchronized on target node,"
6677 " aborting migration" % idx)
6679 if self.current_mem > self.tgt_free_mem:
6680 if not self.allow_runtime_changes:
6681 raise errors.OpExecError("Memory ballooning not allowed and not enough"
6682 " free memory to fit instance %s on target"
6683 " node %s (have %dMB, need %dMB)" %
6684 (instance.name, target_node,
6685 self.tgt_free_mem, self.current_mem))
6686 self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
6687 rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
6690 rpcres.Raise("Cannot modify instance runtime memory")
6692 # First get the migration information from the remote node
6693 result = self.rpc.call_migration_info(source_node, instance)
6694 msg = result.fail_msg
6696 log_err = ("Failed fetching source migration information from %s: %s" %
6698 logging.error(log_err)
6699 raise errors.OpExecError(log_err)
6701 self.migration_info = migration_info = result.payload
6703 if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
6704 # Then switch the disks to master/master mode
6705 self._EnsureSecondary(target_node)
6706 self._GoStandalone()
6707 self._GoReconnect(True)
6708 self._WaitUntilSync()
6710 self.feedback_fn("* preparing %s to accept the instance" % target_node)
6711 result = self.rpc.call_accept_instance(target_node,
6714 self.nodes_ip[target_node])
6716 msg = result.fail_msg
6718 logging.error("Instance pre-migration failed, trying to revert"
6719 " disk status: %s", msg)
6720 self.feedback_fn("Pre-migration failed, aborting")
6721 self._AbortMigration()
6722 self._RevertDiskStatus()
6723 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
6724 (instance.name, msg))
6726 self.feedback_fn("* migrating instance to %s" % target_node)
6727 result = self.rpc.call_instance_migrate(source_node, instance,
6728 self.nodes_ip[target_node],
6730 msg = result.fail_msg
6732 logging.error("Instance migration failed, trying to revert"
6733 " disk status: %s", msg)
6734 self.feedback_fn("Migration failed, aborting")
6735 self._AbortMigration()
6736 self._RevertDiskStatus()
6737 raise errors.OpExecError("Could not migrate instance %s: %s" %
6738 (instance.name, msg))
6740 self.feedback_fn("* starting memory transfer")
6741 last_feedback = time.time()
6743 result = self.rpc.call_instance_get_migration_status(source_node,
6745 msg = result.fail_msg
6746 ms = result.payload # MigrationStatus instance
6747 if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
6748 logging.error("Instance migration failed, trying to revert"
6749 " disk status: %s", msg)
6750 self.feedback_fn("Migration failed, aborting")
6751 self._AbortMigration()
6752 self._RevertDiskStatus()
6754 msg = "hypervisor returned failure"
6755 raise errors.OpExecError("Could not migrate instance %s: %s" %
6756 (instance.name, msg))
6758 if result.payload.status != constants.HV_MIGRATION_ACTIVE:
6759 self.feedback_fn("* memory transfer complete")
6762 if (utils.TimeoutExpired(last_feedback,
6763 self._MIGRATION_FEEDBACK_INTERVAL) and
6764 ms.transferred_ram is not None):
6765 mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
6766 self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
6767 last_feedback = time.time()
6769 time.sleep(self._MIGRATION_POLL_INTERVAL)
6771 result = self.rpc.call_instance_finalize_migration_src(source_node,
6775 msg = result.fail_msg
6777 logging.error("Instance migration succeeded, but finalization failed"
6778 " on the source node: %s", msg)
6779 raise errors.OpExecError("Could not finalize instance migration: %s" %
6782 instance.primary_node = target_node
6784 # distribute new instance config to the other nodes
6785 self.cfg.Update(instance, self.feedback_fn)
6787 result = self.rpc.call_instance_finalize_migration_dst(target_node,
6791 msg = result.fail_msg
6793 logging.error("Instance migration succeeded, but finalization failed"
6794 " on the target node: %s", msg)
6795 raise errors.OpExecError("Could not finalize instance migration: %s" %
6798 if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
6799 self._EnsureSecondary(source_node)
6800 self._WaitUntilSync()
6801 self._GoStandalone()
6802 self._GoReconnect(False)
6803 self._WaitUntilSync()
6805 # If the instance's disk template is `rbd' or `ext' and there was a
6806 # successful migration, unmap the device from the source node.
6807 if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
6808 disks = _ExpandCheckDisks(instance, instance.disks)
6809 self.feedback_fn("* unmapping instance's disks from %s" % source_node)
6811 result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance))
6812 msg = result.fail_msg
6814 logging.error("Migration was successful, but couldn't unmap the"
6815 " block device %s on source node %s: %s",
6816 disk.iv_name, source_node, msg)
6817 logging.error("You need to unmap the device %s manually on %s",
6818 disk.iv_name, source_node)
6820 self.feedback_fn("* done")
6822 def _ExecFailover(self):
6823 """Failover an instance.
6825 The failover is done by shutting it down on its present node and
6826 starting it on the secondary.
6829 instance = self.instance
6830 primary_node = self.cfg.GetNodeInfo(instance.primary_node)
6832 source_node = instance.primary_node
6833 target_node = self.target_node
6835 if instance.admin_state == constants.ADMINST_UP:
6836 self.feedback_fn("* checking disk consistency between source and target")
6837 for (idx, dev) in enumerate(instance.disks):
6838 # for drbd, these are drbd over lvm
6839 if not _CheckDiskConsistency(self.lu, instance, dev, target_node,
6841 if primary_node.offline:
6842 self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
6844 (primary_node.name, idx, target_node))
6845 elif not self.ignore_consistency:
6846 raise errors.OpExecError("Disk %s is degraded on target node,"
6847 " aborting failover" % idx)
6849 self.feedback_fn("* not checking disk consistency as instance is not"
6852 self.feedback_fn("* shutting down instance on source node")
6853 logging.info("Shutting down instance %s on node %s",
6854 instance.name, source_node)
6856 result = self.rpc.call_instance_shutdown(source_node, instance,
6857 self.shutdown_timeout,
6859 msg = result.fail_msg
6861 if self.ignore_consistency or primary_node.offline:
6862 self.lu.LogWarning("Could not shutdown instance %s on node %s,"
6863 " proceeding anyway; please make sure node"
6864 " %s is down; error details: %s",
6865 instance.name, source_node, source_node, msg)
6866 else:
6867 raise errors.OpExecError("Could not shutdown instance %s on"
6869 (instance.name, source_node, msg))
6871 self.feedback_fn("* deactivating the instance's disks on source node")
6872 if not _ShutdownInstanceDisks(self.lu, instance, ignore_primary=True):
6873 raise errors.OpExecError("Can't shut down the instance's disks")
6875 instance.primary_node = target_node
6876 # distribute new instance config to the other nodes
6877 self.cfg.Update(instance, self.feedback_fn)
6879 # Only start the instance if it's marked as up
6880 if instance.admin_state == constants.ADMINST_UP:
6881 self.feedback_fn("* activating the instance's disks on target node %s" %
6883 logging.info("Starting instance %s on node %s",
6884 instance.name, target_node)
6886 disks_ok, _ = _AssembleInstanceDisks(self.lu, instance,
6887 ignore_secondaries=True)
6889 _ShutdownInstanceDisks(self.lu, instance)
6890 raise errors.OpExecError("Can't activate the instance's disks")
6892 self.feedback_fn("* starting the instance on the target node %s" %
6894 result = self.rpc.call_instance_start(target_node, (instance, None, None),
6895 False, self.lu.op.reason)
6896 msg = result.fail_msg
6898 _ShutdownInstanceDisks(self.lu, instance)
6899 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
6900 (instance.name, target_node, msg))
6902 def Exec(self, feedback_fn):
6903 """Perform the migration.
6906 self.feedback_fn = feedback_fn
6907 self.source_node = self.instance.primary_node
6909 # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
6910 if self.instance.disk_template in constants.DTS_INT_MIRROR:
6911 self.target_node = self.instance.secondary_nodes[0]
6912 # Otherwise self.target_node has been populated either
6913 # directly, or through an iallocator.
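6914 # the disk and migration RPCs below address the nodes via their secondary (replication) IPs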
6915 self.all_nodes = [self.source_node, self.target_node]
6916 self.nodes_ip = dict((name, node.secondary_ip) for (name, node)
6917 in self.cfg.GetMultiNodeInfo(self.all_nodes))
6920 feedback_fn("Failover instance %s" % self.instance.name)
6921 self._ExecFailover()
6923 feedback_fn("Migrating instance %s" % self.instance.name)
6926 return self._ExecCleanup()
6928 return self._ExecMigration()
6931 def _BlockdevFind(lu, node, dev, instance):
6932 """Wrapper around call_blockdev_find to annotate diskparams.
6934 @param lu: A reference to the lu object
6935 @param node: The node to call out
6936 @param dev: The device to find
6937 @param instance: The instance object the device belongs to
6938 @returns The result of the rpc call
6941 (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
6942 return lu.rpc.call_blockdev_find(node, disk)
6945 class TLReplaceDisks(Tasklet):
6946 """Replaces disks for an instance.
6948 Note: Locking is not within the scope of this class.
6951 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
6952 disks, early_release, ignore_ipolicy):
6953 """Initializes this class.
6956 Tasklet.__init__(self, lu)
6959 self.instance_name = instance_name
6961 self.iallocator_name = iallocator_name
6962 self.remote_node = remote_node
6964 self.early_release = early_release
6965 self.ignore_ipolicy = ignore_ipolicy
6968 self.instance = None
6969 self.new_node = None
6970 self.target_node = None
6971 self.other_node = None
6972 self.remote_node_info = None
6973 self.node_secondary_ip = None
6976 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
6977 """Compute a new secondary node using an IAllocator.
6980 req = iallocator.IAReqRelocate(name=instance_name,
6981 relocate_from=list(relocate_from))
6982 ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
6984 ial.Run(iallocator_name)
6987 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
6988 " %s" % (iallocator_name, ial.info),
6991 remote_node_name = ial.result[0]
6993 lu.LogInfo("Selected new secondary for instance '%s': %s",
6994 instance_name, remote_node_name)
6996 return remote_node_name
6998 def _FindFaultyDisks(self, node_name):
6999 """Wrapper for L{_FindFaultyInstanceDisks}.
7002 return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
7005 def _CheckDisksActivated(self, instance):
7006 """Checks if the instance disks are activated.
7008 @param instance: The instance to check disks
7009 @return: True if they are activated, False otherwise
7012 nodes = instance.all_nodes
7014 for idx, dev in enumerate(instance.disks):
7016 self.lu.LogInfo("Checking disk/%d on %s", idx, node)
7017 self.cfg.SetDiskID(dev, node)
7019 result = _BlockdevFind(self, node, dev, instance)
7023 elif result.fail_msg or not result.payload:
7028 def CheckPrereq(self):
7029 """Check prerequisites.
7031 This checks that the instance is in the cluster.
7034 self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
7035 assert instance is not None, \
7036 "Cannot retrieve locked instance %s" % self.instance_name
7038 if instance.disk_template != constants.DT_DRBD8:
7039 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
7040 " instances", errors.ECODE_INVAL)
7042 if len(instance.secondary_nodes) != 1:
7043 raise errors.OpPrereqError("The instance has a strange layout,"
7044 " expected one secondary but found %d" %
7045 len(instance.secondary_nodes),
7048 instance = self.instance
7049 secondary_node = instance.secondary_nodes[0]
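7050 # the replacement secondary is either given explicitly or computed by the iallocator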
7051 if self.iallocator_name is None:
7052 remote_node = self.remote_node
7054 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
7055 instance.name, instance.secondary_nodes)
7057 if remote_node is None:
7058 self.remote_node_info = None
7060 assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
7061 "Remote node '%s' is not locked" % remote_node
7063 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
7064 assert self.remote_node_info is not None, \
7065 "Cannot retrieve locked node %s" % remote_node
7067 if remote_node == self.instance.primary_node:
7068 raise errors.OpPrereqError("The specified node is the primary node of"
7069 " the instance", errors.ECODE_INVAL)
7071 if remote_node == secondary_node:
7072 raise errors.OpPrereqError("The specified node is already the"
7073 " secondary node of the instance",
7076 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
7077 constants.REPLACE_DISK_CHG):
7078 raise errors.OpPrereqError("Cannot specify disks to be replaced",
7081 if self.mode == constants.REPLACE_DISK_AUTO:
7082 if not self._CheckDisksActivated(instance):
7083 raise errors.OpPrereqError("Please run activate-disks on instance %s"
7084 " first" % self.instance_name,
7086 faulty_primary = self._FindFaultyDisks(instance.primary_node)
7087 faulty_secondary = self._FindFaultyDisks(secondary_node)
7089 if faulty_primary and faulty_secondary:
7090 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
7091 " one node and can not be repaired"
7092 " automatically" % self.instance_name,
7096 self.disks = faulty_primary
7097 self.target_node = instance.primary_node
7098 self.other_node = secondary_node
7099 check_nodes = [self.target_node, self.other_node]
7100 elif faulty_secondary:
7101 self.disks = faulty_secondary
7102 self.target_node = secondary_node
7103 self.other_node = instance.primary_node
7104 check_nodes = [self.target_node, self.other_node]
7110 # Non-automatic modes
7111 if self.mode == constants.REPLACE_DISK_PRI:
7112 self.target_node = instance.primary_node
7113 self.other_node = secondary_node
7114 check_nodes = [self.target_node, self.other_node]
7116 elif self.mode == constants.REPLACE_DISK_SEC:
7117 self.target_node = secondary_node
7118 self.other_node = instance.primary_node
7119 check_nodes = [self.target_node, self.other_node]
7121 elif self.mode == constants.REPLACE_DISK_CHG:
7122 self.new_node = remote_node
7123 self.other_node = instance.primary_node
7124 self.target_node = secondary_node
7125 check_nodes = [self.new_node, self.other_node]
7127 _CheckNodeNotDrained(self.lu, remote_node)
7128 _CheckNodeVmCapable(self.lu, remote_node)
7130 old_node_info = self.cfg.GetNodeInfo(secondary_node)
7131 assert old_node_info is not None
7132 if old_node_info.offline and not self.early_release:
7133 # doesn't make sense to delay the release
7134 self.early_release = True
7135 self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
7136 " early-release mode", secondary_node)
7139 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
7142 # If not specified all disks should be replaced
7144 self.disks = range(len(self.instance.disks))
7146 # TODO: This is ugly, but right now we can't distinguish between an
7147 # internally submitted opcode and an external one. We should fix that.
7148 if self.remote_node_info:
7149 # We change the node, so let's verify that it still meets the instance policy
7150 new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
7151 cluster = self.cfg.GetClusterInfo()
7152 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
7154 _CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
7155 self.cfg, ignore=self.ignore_ipolicy)
7157 for node in check_nodes:
7158 _CheckNodeOnline(self.lu, node)
7160 touched_nodes = frozenset(node_name for node_name in [self.new_node,
7163 if node_name is not None)
7165 # Release unneeded node and node resource locks
7166 _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
7167 _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
7168 _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
7170 # Release any owned node group
7171 _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
7173 # Check whether disks are valid
7174 for disk_idx in self.disks:
7175 instance.FindDisk(disk_idx)
7177 # Get secondary node IP addresses
7178 self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
7179 in self.cfg.GetMultiNodeInfo(touched_nodes))
7181 def Exec(self, feedback_fn):
7182 """Execute disk replacement.
7184 This dispatches the disk replacement to the appropriate handler.
7188 # Verify owned locks before starting operation
7189 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
7190 assert set(owned_nodes) == set(self.node_secondary_ip), \
7191 ("Incorrect node locks, owning %s, expected %s" %
7192 (owned_nodes, self.node_secondary_ip.keys()))
7193 assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
7194 self.lu.owned_locks(locking.LEVEL_NODE_RES))
7195 assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
7197 owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
7198 assert list(owned_instances) == [self.instance_name], \
7199 "Instance '%s' not locked" % self.instance_name
7201 assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
7202 "Should not own any node group lock at this point"
7205 feedback_fn("No disks need replacement for instance '%s'" %
7209 feedback_fn("Replacing disk(s) %s for instance '%s'" %
7210 (utils.CommaJoin(self.disks), self.instance.name))
7211 feedback_fn("Current primary node: %s" % self.instance.primary_node)
7212 feedback_fn("Current seconary node: %s" %
7213 utils.CommaJoin(self.instance.secondary_nodes))
7215 activate_disks = (self.instance.admin_state != constants.ADMINST_UP)
7217 # Activate the instance disks if we're replacing them on a down instance
7219 _StartInstanceDisks(self.lu, self.instance, True)
7222 # Should we replace the secondary node?
7223 if self.new_node is not None:
7224 fn = self._ExecDrbd8Secondary
7226 fn = self._ExecDrbd8DiskOnly
7228 result = fn(feedback_fn)
7230 # Deactivate the instance disks if we're replacing them on a
7233 _SafeShutdownInstanceDisks(self.lu, self.instance)
7235 assert not self.lu.owned_locks(locking.LEVEL_NODE)
7238 # Verify owned locks
7239 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
7240 nodes = frozenset(self.node_secondary_ip)
7241 assert ((self.early_release and not owned_nodes) or
7242 (not self.early_release and not (set(owned_nodes) - nodes))), \
7243 ("Not owning the correct locks, early_release=%s, owned=%r,"
7244 " nodes=%r" % (self.early_release, owned_nodes, nodes))

  def _CheckVolumeGroup(self, nodes):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(nodes)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node in nodes:
      res = results[node]
      res.Raise("Error checking node %s" % node)
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, node))

  def _CheckDisksExistence(self, nodes):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
        self.cfg.SetDiskID(dev, node)

        result = _BlockdevFind(self, node, dev, self.instance)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, node, msg))

  def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, node_name))

      if not _CheckDiskConsistency(self.lu, self.instance, dev, node_name,
                                   on_primary, ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (node_name, self.instance.name))

  def _CreateNewStorage(self, node_name):
    """Create new storage on the primary or secondary node.

    This is only used for same-node replaces, not for changing the
    secondary node, hence we don't want to modify the existing disk.

    """
    iv_names = {}

    disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    for idx, dev in enumerate(disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)

      self.cfg.SetDiskID(dev, node_name)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)
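      # For illustration: _GenerateUniqueNames prefixes each suffix with a
      # freshly generated unique ID, so the new LVs get names along the
      # lines of "<uuid>.disk0_data" and "<uuid>.disk0_meta".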

      (data_disk, meta_disk) = dev.children
      vg_data = data_disk.logical_id[0]
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                             logical_id=(vg_data, names[0]),
                             params=data_disk.params)
      vg_meta = meta_disk.logical_id[0]
      lv_meta = objects.Disk(dev_type=constants.LD_LV,
                             size=constants.DRBD_META_SIZE,
                             logical_id=(vg_meta, names[1]),
                             params=meta_disk.params)
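      # Note that the data LV is sized like the DRBD device itself, while the
      # metadata LV uses the fixed constants.DRBD_META_SIZE (128 MiB in stock
      # Ganeti).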

      new_lvs = [lv_data, lv_meta]
      old_lvs = [child.Copy() for child in dev.children]
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
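      # For illustration: iv_names ends up as e.g.
      #   {"disk/0": (drbd_dev, [old_data_lv, old_meta_lv],
      #               [new_data_lv, new_meta_lv])}
      # which _ExecDrbd8DiskOnly's rename/attach and cleanup steps consume.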
      excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
                             _GetInstanceInfoText(self.instance), False,
                             excl_stor)

    return iv_names

  def _CheckDevices(self, node_name, iv_names):
    for name, (dev, _, _) in iv_names.iteritems():
      self.cfg.SetDiskID(dev, node_name)

      result = _BlockdevFind(self, node_name, dev, self.instance)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_name, iv_names):
    for name, (_, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s", name)

      for lv in old_lvs:
        self.cfg.SetDiskID(lv, node_name)

        msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s", msg,
                             hint="remove unused LVs manually")

  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
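
    # For illustration (hypothetical names): if disk/0's data LV is
    # "xenvg/aaaa.disk0_data", step 4 conceptually does, per disk:
    #   1. detach the old LVs from the DRBD device
    #   2. rename "aaaa.disk0_data" -> "aaaa.disk0_data_replaced-<time_t>"
    #   3. rename the new LV "bbbb.disk0_data" -> "aaaa.disk0_data"
    #   4. re-attach, so DRBD ends up backed by fresh storage under the
    #      old names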

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node, self.target_node])
    self._CheckVolumeGroup([self.target_node, self.other_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.other_node,
                                self.other_node == self.instance.primary_node,
                                False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
                                                     old_lvs)
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" % (self.target_node, dev.iv_name))
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
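      # For illustration (hypothetical values): for an LV with physical_id
      # ("xenvg", "aaaa.disk0_data"), ren_fn returns
      # ("xenvg", "aaaa.disk0_data_replaced-1508000000"); the VG component
      # is left untouched.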

      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node, to_ren)
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" % self.target_node)

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.physical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" % self.target_node)

      # Intermediate steps of in memory modifications
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        self.cfg.SetDiskID(new, self.target_node)

      # We need to modify old_lvs so that removal later removes the
      # right LVs, not the newly added ones; note that old_lvs is a
      # copy here
      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        self.cfg.SetDiskID(disk, self.target_node)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
      result = self.rpc.call_blockdev_addchildren(self.target_node,
                                                  (dev, self.instance), new_lvs)
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node,
                                               new_lv).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)
      # TODO: Check if releasing locks early still makes sense
      _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                    keep=self.node_secondary_ip.keys())

    # Release all node locks while waiting for sync
    _ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    _WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)

  def _ExecDrbd8Secondary(self, feedback_fn):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
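
    # For illustration: each DRBD pair conceptually moves from
    #   primary <-> old secondary (connected)
    # through
    #   primary (standalone) | new secondary (standalone, local disks only)
    # to
    #   primary <-> new secondary (connected), reusing the Attach() trick
    # described above.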

    pnode = self.instance.primary_node

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
    for idx, dev in enumerate(disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.new_node, idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
                             True, _GetInstanceInfoText(self.instance), False,
                             excl_stor)

    # Step 4: drbd minors and drbd setups changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node
                                         for dev in self.instance.disks],
                                        self.instance.name)
    logging.debug("Allocated minors %r", minors)

    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.new_node, idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        assert self.instance.primary_node == o_node2, "Three-node instance?"
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node, o_port,
                    p_minor, new_minor, o_secret)
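      # For illustration: a DRBD8 logical_id is the 6-tuple
      # (node_a, node_b, port, minor_a, minor_b, secret); new_alone_id drops
      # the port so the device first comes up on the new node without
      # networking, while new_net_id is what gets stored once it's attached.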

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size,
                              params={})
      (anno_new_drbd,) = _AnnotateDiskParams(self.instance, [new_drbd],
                                             self.cfg)
      try:
        _CreateSingleBlockDev(self.lu, self.new_node, self.instance,
                              anno_new_drbd,
                              _GetInstanceInfoText(self.instance), False,
                              excl_stor)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.name)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
      self.cfg.SetDiskID(dev, self.target_node)
      msg = self.rpc.call_blockdev_shutdown(self.target_node,
                                            (dev, self.instance)).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))
7618 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
7619 result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
7620 self.instance.disks)[pnode]
7622 msg = result.fail_msg
7624 # detaches didn't succeed (unlikely)
7625 self.cfg.ReleaseDRBDMinors(self.instance.name)
7626 raise errors.OpExecError("Can't detach the disks from the network on"
7627 " old node: %s" % (msg,))
7629 # if we managed to detach at least one, we update all the disks of
7630 # the instance to point to the new secondary
7631 self.lu.LogInfo("Updating instance configuration")
7632 for dev, _, new_logical_id in iv_names.itervalues():
7633 dev.logical_id = new_logical_id
7634 self.cfg.SetDiskID(dev, self.instance.primary_node)
7636 self.cfg.Update(self.instance, feedback_fn)
7638 # Release all node locks (the configuration has been updated)
7639 _ReleaseLocks(self.lu, locking.LEVEL_NODE)
7641 # and now perform the drbd attach
7642 self.lu.LogInfo("Attaching primary drbds to new secondary"
7643 " (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node],
                                           self.node_secondary_ip,
                                           (self.instance.disks, self.instance),
                                           self.instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                           to_node, msg,
                           hint=("please do a gnt-instance info to see the"
                                 " status of disks"))

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)
      # TODO: Check if releasing locks early still makes sense
      _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                    keep=self.node_secondary_ip.keys())

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    _WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)