4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Logical units dealing with instances."""
32 from ganeti import compat
33 from ganeti import constants
34 from ganeti import errors
36 from ganeti import hypervisor
37 from ganeti import locking
38 from ganeti.masterd import iallocator
39 from ganeti import masterd
40 from ganeti import netutils
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 from ganeti import qlang
45 from ganeti import rpc
46 from ganeti import utils
47 from ganeti import query
49 from ganeti.cmdlib.base import NoHooksLU, LogicalUnit, _QueryBase, \
50 ResultWithJobs, Tasklet
52 from ganeti.cmdlib.common import INSTANCE_ONLINE, INSTANCE_DOWN, \
53 INSTANCE_NOT_RUNNING, CAN_CHANGE_INSTANCE_OFFLINE, _CheckNodeOnline, \
54 _ShareAll, _GetDefaultIAllocator, _CheckInstanceNodeGroups, \
55 _LoadNodeEvacResult, _CheckIAllocatorOrNode, _CheckParamsNotGlobal, \
56 _IsExclusiveStorageEnabledNode, _CheckHVParams, _CheckOSParams, \
57 _GetWantedInstances, _CheckInstancesNodeGroups, _AnnotateDiskParams, \
58 _GetUpdatedParams, _ExpandInstanceName, _ComputeIPolicySpecViolation, \
59 _CheckInstanceState, _ExpandNodeName
60 from ganeti.cmdlib.instance_storage import _CreateDisks, \
61 _CheckNodesFreeDiskPerVG, _WipeDisks, _WaitForSync, _CheckDiskConsistency, \
62 _IsExclusiveStorageEnabledNodeName, _CreateSingleBlockDev, _ComputeDisks, \
63 _CheckRADOSFreeSpace, _ComputeDiskSizePerVG, _GenerateDiskTemplate, \
64 _CreateBlockDev, _StartInstanceDisks, _ShutdownInstanceDisks, \
65 _AssembleInstanceDisks, _ExpandCheckDisks
66 from ganeti.cmdlib.instance_utils import _BuildInstanceHookEnvByObject, \
67 _GetClusterDomainSecret, _BuildInstanceHookEnv, _NICListToTuple, \
68 _NICToTuple, _CheckNodeNotDrained, _RemoveInstance, _CopyLockList, \
69 _ReleaseLocks, _CheckNodeVmCapable, _CheckTargetNodeIPolicy, \
70 _GetInstanceInfoText, _RemoveDisks
72 import ganeti.masterd.instance
75 #: Type description for changes as returned by L{ApplyContainerMods}'s
77 _TApplyContModsCbChanges = \
78 ht.TMaybeListOf(ht.TAnd(ht.TIsLength(2), ht.TItems([
84 def _CheckHostnameSane(lu, name):
85 """Ensures that a given hostname resolves to a 'sane' name.
87 The given name is required to be a prefix of the resolved hostname,
88 to prevent accidental mismatches.
90 @param lu: the logical unit on behalf of which we're checking
91 @param name: the name we should resolve and check
92 @return: the resolved hostname object
95 hostname = netutils.GetHostname(name=name)
96 if hostname.name != name:
97 lu.LogInfo("Resolved given name '%s' to '%s'", name, hostname.name)
98 if not utils.MatchNameComponent(name, [hostname.name]):
99 raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
100 " same as given hostname '%s'") %
101 (hostname.name, name), errors.ECODE_INVAL)
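# Hedged usage sketch (hostnames below are made up, not from the original
# module): the check accepts a short name that is a dot-separated prefix of
# the name it resolves to, and rejects anything else.
#
#   _CheckHostnameSane(lu, "web1")     # resolver returns "web1.example.com": OK
#   _CheckHostnameSane(lu, "oldweb")   # resolver returns "web1.example.com":
#                                      # raises OpPrereqError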
105 def _CheckOpportunisticLocking(op):
106 """Generate error if opportunistic locking is not possible.
109 if op.opportunistic_locking and not op.iallocator:
110 raise errors.OpPrereqError("Opportunistic locking is only available in"
111 " combination with an instance allocator",
115 def _CreateInstanceAllocRequest(op, disks, nics, beparams, node_whitelist):
116 """Wrapper around IAReqInstanceAlloc.
118 @param op: The instance opcode
119 @param disks: The computed disks
120 @param nics: The computed nics
121 @param beparams: The fully filled beparams
122 @param node_whitelist: List of nodes which should appear as online to the
123 allocator (unless the node is already marked offline)
125 @returns: A filled L{iallocator.IAReqInstanceAlloc}
128 spindle_use = beparams[constants.BE_SPINDLE_USE]
129 return iallocator.IAReqInstanceAlloc(name=op.instance_name,
130 disk_template=op.disk_template,
133 vcpus=beparams[constants.BE_VCPUS],
134 memory=beparams[constants.BE_MAXMEM],
135 spindle_use=spindle_use,
137 nics=[n.ToDict() for n in nics],
138 hypervisor=op.hypervisor,
139 node_whitelist=node_whitelist)
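# Minimal sketch of how the wrapper above is used later in this module (see
# LUInstanceCreate._RunAllocator); the inputs are assumed to be the already
# computed opcode pieces:
#
#   req = _CreateInstanceAllocRequest(self.op, self.disks, self.nics,
#                                     self.be_full, node_whitelist)
#   ial = iallocator.IAllocator(self.cfg, self.rpc, req)
#   ial.Run(self.op.iallocator)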
142 def _ComputeFullBeParams(op, cluster):
143 """Computes the full beparams.
145 @param op: The instance opcode
146 @param cluster: The cluster config object
148 @return: The fully filled beparams
151 default_beparams = cluster.beparams[constants.PP_DEFAULT]
152 for param, value in op.beparams.iteritems():
153 if value == constants.VALUE_AUTO:
154 op.beparams[param] = default_beparams[param]
155 objects.UpgradeBeParams(op.beparams)
156 utils.ForceDictType(op.beparams, constants.BES_PARAMETER_TYPES)
157 return cluster.SimpleFillBE(op.beparams)
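# Worked example of the fill logic above, with made-up numbers: if the
# cluster default beparams are {"maxmem": 1024, "minmem": 1024, "vcpus": 1}
# and the opcode passes {"maxmem": "auto", "vcpus": 2}, the "auto" value is
# first replaced by the cluster default (1024) and SimpleFillBE then returns
# the fully merged dict {"maxmem": 1024, "minmem": 1024, "vcpus": 2}.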
160 def _ComputeNics(op, cluster, default_ip, cfg, ec_id):
161 """Computes the nics.
163 @param op: The instance opcode
164 @param cluster: Cluster configuration object
165 @param default_ip: The default ip to assign
166 @param cfg: An instance of the configuration object
167 @param ec_id: Execution context ID
169 @returns: The built up nics
174 nic_mode_req = nic.get(constants.INIC_MODE, None)
175 nic_mode = nic_mode_req
176 if nic_mode is None or nic_mode == constants.VALUE_AUTO:
177 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
179 net = nic.get(constants.INIC_NETWORK, None)
180 link = nic.get(constants.NIC_LINK, None)
181 ip = nic.get(constants.INIC_IP, None)
183 if net is None or net.lower() == constants.VALUE_NONE:
186 if nic_mode_req is not None or link is not None:
187 raise errors.OpPrereqError("If network is given, no mode or link"
188 " is allowed to be passed",
192 if ip is None or ip.lower() == constants.VALUE_NONE:
194 elif ip.lower() == constants.VALUE_AUTO:
195 if not op.name_check:
196 raise errors.OpPrereqError("IP address set to auto but name checks"
197 " have been skipped",
201 # We defer pool operations until later, so that the iallocator has
202 # filled in the instance's node(s)
203 if ip.lower() == constants.NIC_IP_POOL:
205 raise errors.OpPrereqError("if ip=pool, parameter network"
206 " must be passed too",
209 elif not netutils.IPAddress.IsValid(ip):
210 raise errors.OpPrereqError("Invalid IP address '%s'" % ip,
215 # TODO: check the ip address for uniqueness
216 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
217 raise errors.OpPrereqError("Routed nic mode requires an ip address",
220 # MAC address verification
221 mac = nic.get(constants.INIC_MAC, constants.VALUE_AUTO)
222 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
223 mac = utils.NormalizeAndValidateMac(mac)
226 # TODO: We need to factor this out
227 cfg.ReserveMAC(mac, ec_id)
228 except errors.ReservationError:
229 raise errors.OpPrereqError("MAC address %s already in use"
231 errors.ECODE_NOTUNIQUE)
233 # Build nic parameters
236 nicparams[constants.NIC_MODE] = nic_mode
238 nicparams[constants.NIC_LINK] = link
240 check_params = cluster.SimpleFillNIC(nicparams)
241 objects.NIC.CheckParameterSyntax(check_params)
242 net_uuid = cfg.LookupNetwork(net)
243 name = nic.get(constants.INIC_NAME, None)
244 if name is not None and name.lower() == constants.VALUE_NONE:
246 nic_obj = objects.NIC(mac=mac, ip=nic_ip, name=name,
247 network=net_uuid, nicparams=nicparams)
248 nic_obj.uuid = cfg.GenerateUniqueID(ec_id)
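# Hedged example of the per-NIC input dict handled above (keys are the
# constants.INIC_* values, the concrete strings are illustrative):
#
#   {"mode": "bridged", "link": "br0", "ip": "auto", "mac": "auto"}
#
# "auto" and "pool" values are only resolved later in CheckPrereq (name
# check, IP pool and MAC generation), which is why reservation failures are
# mapped to OpPrereqError here.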
254 def _CheckForConflictingIp(lu, ip, node):
255 """Raise an error in case of a conflicting IP address.
258 @param ip: IP address
260 @param node: node name
263 (conf_net, _) = lu.cfg.CheckIPInNodeGroup(ip, node)
264 if conf_net is not None:
265 raise errors.OpPrereqError(("The requested IP address (%s) belongs to"
266 " network %s, but the target NIC does not." %
273 def _ComputeIPolicyInstanceSpecViolation(
274 ipolicy, instance_spec, disk_template,
275 _compute_fn=_ComputeIPolicySpecViolation):
276 """Compute if instance specs meet the specs of ipolicy.
279 @param ipolicy: The ipolicy to verify against
280 @type instance_spec: dict
281 @param instance_spec: The instance spec to verify
282 @type disk_template: string
283 @param disk_template: the disk template of the instance
284 @param _compute_fn: The function to verify ipolicy (unittest only)
285 @see: L{_ComputeIPolicySpecViolation}
288 mem_size = instance_spec.get(constants.ISPEC_MEM_SIZE, None)
289 cpu_count = instance_spec.get(constants.ISPEC_CPU_COUNT, None)
290 disk_count = instance_spec.get(constants.ISPEC_DISK_COUNT, 0)
291 disk_sizes = instance_spec.get(constants.ISPEC_DISK_SIZE, [])
292 nic_count = instance_spec.get(constants.ISPEC_NIC_COUNT, 0)
293 spindle_use = instance_spec.get(constants.ISPEC_SPINDLE_USE, None)
295 return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
296 disk_sizes, spindle_use, disk_template)
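# Sketch of the instance_spec dict consumed above (values are illustrative,
# keys reuse the constants already referenced in this function):
#
#   {constants.ISPEC_MEM_SIZE: 2048, constants.ISPEC_CPU_COUNT: 2,
#    constants.ISPEC_DISK_COUNT: 1, constants.ISPEC_DISK_SIZE: [10240],
#    constants.ISPEC_NIC_COUNT: 1, constants.ISPEC_SPINDLE_USE: 1}
#
# An empty result means the spec fits the policy; otherwise the returned
# list names the violated limits.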
299 def _CheckOSVariant(os_obj, name):
300 """Check whether an OS name conforms to the os variants specification.
302 @type os_obj: L{objects.OS}
303 @param os_obj: OS object to check
305 @param name: OS name passed by the user, to check for validity
308 variant = objects.OS.GetVariant(name)
309 if not os_obj.supported_variants:
311 raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
312 " passed)" % (os_obj.name, variant),
316 raise errors.OpPrereqError("OS name must include a variant",
319 if variant not in os_obj.supported_variants:
320 raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
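# Illustrative naming only (the OS name is made up): variants are written as
# "<os>+<variant>", so "debootstrap+default" selects the "default" variant,
# a bare "debootstrap" fails the "must include a variant" check when the OS
# declares variants, and an unknown variant is rejected as unsupported.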
323 def _CheckNodeHasOS(lu, node, os_name, force_variant):
324 """Ensure that a node supports a given OS.
326 @param lu: the LU on behalf of which we make the check
327 @param node: the node to check
328 @param os_name: the OS to query about
329 @param force_variant: whether to ignore variant errors
330 @raise errors.OpPrereqError: if the node does not support the OS
333 result = lu.rpc.call_os_get(node, os_name)
334 result.Raise("OS '%s' not in supported OS list for node %s" %
336 prereq=True, ecode=errors.ECODE_INVAL)
337 if not force_variant:
338 _CheckOSVariant(result.payload, os_name)
341 def _CheckNicsBridgesExist(lu, target_nics, target_node):
342 """Check that the bridges needed by a list of nics exist.
345 cluster = lu.cfg.GetClusterInfo()
346 paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
347 brlist = [params[constants.NIC_LINK] for params in paramslist
348 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
350 result = lu.rpc.call_bridges_exist(target_node, brlist)
351 result.Raise("Error checking bridges on destination node '%s'" %
352 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
355 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
356 """Checks if a node has enough free memory.
358 This function checks if a given node has the needed amount of free
359 memory. In case the node has less memory or we cannot get the
360 information from the node, this function raises an OpPrereqError
363 @type lu: C{LogicalUnit}
364 @param lu: a logical unit from which we get configuration data
366 @param node: the node to check
368 @param reason: string to use in the error message
369 @type requested: C{int}
370 @param requested: the amount of memory in MiB to check for
371 @type hypervisor_name: C{str}
372 @param hypervisor_name: the hypervisor to ask for memory stats
374 @return: the node's current free memory
375 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
376 we cannot check the node
379 nodeinfo = lu.rpc.call_node_info([node], None, [hypervisor_name], False)
380 nodeinfo[node].Raise("Can't get data from node %s" % node,
381 prereq=True, ecode=errors.ECODE_ENVIRON)
382 (_, _, (hv_info, )) = nodeinfo[node].payload
384 free_mem = hv_info.get("memory_free", None)
385 if not isinstance(free_mem, int):
386 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
387 " was '%s'" % (node, free_mem),
388 errors.ECODE_ENVIRON)
389 if requested > free_mem:
390 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
391 " needed %s MiB, available %s MiB" %
392 (node, reason, requested, free_mem),
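# Hedged usage sketch, mirroring the call made from LUInstanceCreate further
# below (the 1024 MiB and "kvm" values are illustrative):
#
#   free = _CheckNodeFreeMemory(self, pnode_name,
#                               "creating instance %s" % instance_name,
#                               1024, "kvm")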
397 class LUInstanceCreate(LogicalUnit):
398 """Create an instance.
401 HPATH = "instance-add"
402 HTYPE = constants.HTYPE_INSTANCE
405 def CheckArguments(self):
409 # do not require name_check to ease forward/backward compatibility
411 if self.op.no_install and self.op.start:
412 self.LogInfo("No-installation mode selected, disabling startup")
413 self.op.start = False
414 # validate/normalize the instance name
415 self.op.instance_name = \
416 netutils.Hostname.GetNormalizedName(self.op.instance_name)
418 if self.op.ip_check and not self.op.name_check:
419 # TODO: make the ip check more flexible and not depend on the name check
420 raise errors.OpPrereqError("Cannot do IP address check without a name"
421 " check", errors.ECODE_INVAL)
423 # check nics' parameter names
424 for nic in self.op.nics:
425 utils.ForceDictType(nic, constants.INIC_PARAMS_TYPES)
426 # check that NIC's parameters names are unique and valid
427 utils.ValidateDeviceNames("NIC", self.op.nics)
429 # check that disk's names are unique and valid
430 utils.ValidateDeviceNames("disk", self.op.disks)
432 cluster = self.cfg.GetClusterInfo()
433 if not self.op.disk_template in cluster.enabled_disk_templates:
434 raise errors.OpPrereqError("Cannot create an instance with disk template"
435 " '%s', because it is not enabled in the"
436 " cluster. Enabled disk templates are: %s." %
437 (self.op.disk_template,
438 ",".join(cluster.enabled_disk_templates)))
440 # check disks. parameter names and consistent adopt/no-adopt strategy
441 has_adopt = has_no_adopt = False
442 for disk in self.op.disks:
443 if self.op.disk_template != constants.DT_EXT:
444 utils.ForceDictType(disk, constants.IDISK_PARAMS_TYPES)
445 if constants.IDISK_ADOPT in disk:
449 if has_adopt and has_no_adopt:
450 raise errors.OpPrereqError("Either all disks are adopted or none is",
453 if self.op.disk_template not in constants.DTS_MAY_ADOPT:
454 raise errors.OpPrereqError("Disk adoption is not supported for the"
455 " '%s' disk template" %
456 self.op.disk_template,
458 if self.op.iallocator is not None:
459 raise errors.OpPrereqError("Disk adoption not allowed with an"
460 " iallocator script", errors.ECODE_INVAL)
461 if self.op.mode == constants.INSTANCE_IMPORT:
462 raise errors.OpPrereqError("Disk adoption not allowed for"
463 " instance import", errors.ECODE_INVAL)
465 if self.op.disk_template in constants.DTS_MUST_ADOPT:
466 raise errors.OpPrereqError("Disk template %s requires disk adoption,"
467 " but no 'adopt' parameter given" %
468 self.op.disk_template,
471 self.adopt_disks = has_adopt
473 # instance name verification
474 if self.op.name_check:
475 self.hostname1 = _CheckHostnameSane(self, self.op.instance_name)
476 self.op.instance_name = self.hostname1.name
477 # used in CheckPrereq for ip ping check
478 self.check_ip = self.hostname1.ip
482 # file storage checks
483 if (self.op.file_driver and
484 not self.op.file_driver in constants.FILE_DRIVER):
485 raise errors.OpPrereqError("Invalid file driver name '%s'" %
486 self.op.file_driver, errors.ECODE_INVAL)
488 if self.op.disk_template == constants.DT_FILE:
489 opcodes.RequireFileStorage()
490 elif self.op.disk_template == constants.DT_SHARED_FILE:
491 opcodes.RequireSharedFileStorage()
493 ### Node/iallocator related checks
494 _CheckIAllocatorOrNode(self, "iallocator", "pnode")
496 if self.op.pnode is not None:
497 if self.op.disk_template in constants.DTS_INT_MIRROR:
498 if self.op.snode is None:
499 raise errors.OpPrereqError("The networked disk templates need"
500 " a mirror node", errors.ECODE_INVAL)
502 self.LogWarning("Secondary node will be ignored on non-mirrored disk"
506 _CheckOpportunisticLocking(self.op)
508 self._cds = _GetClusterDomainSecret()
510 if self.op.mode == constants.INSTANCE_IMPORT:
511 # On import force_variant must be True, because if we forced it at
512 # initial install, our only chance when importing it back is that it
514 self.op.force_variant = True
516 if self.op.no_install:
517 self.LogInfo("No-installation mode has no effect during import")
519 elif self.op.mode == constants.INSTANCE_CREATE:
520 if self.op.os_type is None:
521 raise errors.OpPrereqError("No guest OS specified",
523 if self.op.os_type in self.cfg.GetClusterInfo().blacklisted_os:
524 raise errors.OpPrereqError("Guest OS '%s' is not allowed for"
525 " installation" % self.op.os_type,
527 if self.op.disk_template is None:
528 raise errors.OpPrereqError("No disk template specified",
531 elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
532 # Check handshake to ensure both clusters have the same domain secret
533 src_handshake = self.op.source_handshake
534 if not src_handshake:
535 raise errors.OpPrereqError("Missing source handshake",
538 errmsg = masterd.instance.CheckRemoteExportHandshake(self._cds,
541 raise errors.OpPrereqError("Invalid handshake: %s" % errmsg,
544 # Load and check source CA
545 self.source_x509_ca_pem = self.op.source_x509_ca
546 if not self.source_x509_ca_pem:
547 raise errors.OpPrereqError("Missing source X509 CA",
551 (cert, _) = utils.LoadSignedX509Certificate(self.source_x509_ca_pem,
553 except OpenSSL.crypto.Error, err:
554 raise errors.OpPrereqError("Unable to load source X509 CA (%s)" %
555 (err, ), errors.ECODE_INVAL)
557 (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
558 if errcode is not None:
559 raise errors.OpPrereqError("Invalid source X509 CA (%s)" % (msg, ),
562 self.source_x509_ca = cert
564 src_instance_name = self.op.source_instance_name
565 if not src_instance_name:
566 raise errors.OpPrereqError("Missing source instance name",
569 self.source_instance_name = \
570 netutils.GetHostname(name=src_instance_name).name
573 raise errors.OpPrereqError("Invalid instance creation mode %r" %
574 self.op.mode, errors.ECODE_INVAL)
576 def ExpandNames(self):
577 """ExpandNames for CreateInstance.
579 Figure out the right locks for instance creation.
582 self.needed_locks = {}
584 instance_name = self.op.instance_name
585 # this is just a preventive check, but someone might still add this
586 # instance in the meantime, and creation will fail at lock-add time
587 if instance_name in self.cfg.GetInstanceList():
588 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
589 instance_name, errors.ECODE_EXISTS)
591 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
593 if self.op.iallocator:
594 # TODO: Find a solution to not lock all nodes in the cluster, e.g. by
595 # specifying a group on instance creation and then selecting nodes from
597 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
598 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
600 if self.op.opportunistic_locking:
601 self.opportunistic_locks[locking.LEVEL_NODE] = True
602 self.opportunistic_locks[locking.LEVEL_NODE_RES] = True
604 self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode)
605 nodelist = [self.op.pnode]
606 if self.op.snode is not None:
607 self.op.snode = _ExpandNodeName(self.cfg, self.op.snode)
608 nodelist.append(self.op.snode)
609 self.needed_locks[locking.LEVEL_NODE] = nodelist
611 # in case of import lock the source node too
612 if self.op.mode == constants.INSTANCE_IMPORT:
613 src_node = self.op.src_node
614 src_path = self.op.src_path
617 self.op.src_path = src_path = self.op.instance_name
620 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
621 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
622 self.op.src_node = None
623 if os.path.isabs(src_path):
624 raise errors.OpPrereqError("Importing an instance from a path"
625 " requires a source node option",
628 self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node)
629 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
630 self.needed_locks[locking.LEVEL_NODE].append(src_node)
631 if not os.path.isabs(src_path):
632 self.op.src_path = src_path = \
633 utils.PathJoin(pathutils.EXPORT_DIR, src_path)
635 self.needed_locks[locking.LEVEL_NODE_RES] = \
636 _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
638 def _RunAllocator(self):
639 """Run the allocator based on input opcode.
642 if self.op.opportunistic_locking:
643 # Only consider nodes for which a lock is held
644 node_whitelist = list(self.owned_locks(locking.LEVEL_NODE))
646 node_whitelist = None
648 #TODO Export network to iallocator so that it chooses a pnode
649 # in a nodegroup that has the desired network connected to
650 req = _CreateInstanceAllocRequest(self.op, self.disks,
651 self.nics, self.be_full,
653 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
655 ial.Run(self.op.iallocator)
658 # When opportunistic locks are used only a temporary failure is generated
659 if self.op.opportunistic_locking:
660 ecode = errors.ECODE_TEMP_NORES
662 ecode = errors.ECODE_NORES
664 raise errors.OpPrereqError("Can't compute nodes using"
665 " iallocator '%s': %s" %
666 (self.op.iallocator, ial.info),
669 self.op.pnode = ial.result[0]
670 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
671 self.op.instance_name, self.op.iallocator,
672 utils.CommaJoin(ial.result))
674 assert req.RequiredNodes() in (1, 2), "Wrong node count from iallocator"
676 if req.RequiredNodes() == 2:
677 self.op.snode = ial.result[1]
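# Hedged reading of the allocator result handled above: for a mirrored
# (DRBD-style) template the iallocator is expected to return two node names,
# e.g. ["node1.example.com", "node2.example.com"], which become pnode and
# snode; non-mirrored templates expect a single-element result.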
679 def BuildHooksEnv(self):
682 This runs on master, primary and secondary nodes of the instance.
686 "ADD_MODE": self.op.mode,
688 if self.op.mode == constants.INSTANCE_IMPORT:
689 env["SRC_NODE"] = self.op.src_node
690 env["SRC_PATH"] = self.op.src_path
691 env["SRC_IMAGES"] = self.src_images
693 env.update(_BuildInstanceHookEnv(
694 name=self.op.instance_name,
695 primary_node=self.op.pnode,
696 secondary_nodes=self.secondaries,
697 status=self.op.start,
698 os_type=self.op.os_type,
699 minmem=self.be_full[constants.BE_MINMEM],
700 maxmem=self.be_full[constants.BE_MAXMEM],
701 vcpus=self.be_full[constants.BE_VCPUS],
702 nics=_NICListToTuple(self, self.nics),
703 disk_template=self.op.disk_template,
704 disks=[(d[constants.IDISK_NAME], d[constants.IDISK_SIZE],
705 d[constants.IDISK_MODE]) for d in self.disks],
708 hypervisor_name=self.op.hypervisor,
714 def BuildHooksNodes(self):
715 """Build hooks nodes.
718 nl = [self.cfg.GetMasterNode(), self.op.pnode] + self.secondaries
721 def _ReadExportInfo(self):
722 """Reads the export information from disk.
724 It will override the opcode source node and path with the actual
725 information, if these two were not specified before.
727 @return: the export information
730 assert self.op.mode == constants.INSTANCE_IMPORT
732 src_node = self.op.src_node
733 src_path = self.op.src_path
736 locked_nodes = self.owned_locks(locking.LEVEL_NODE)
737 exp_list = self.rpc.call_export_list(locked_nodes)
739 for node in exp_list:
740 if exp_list[node].fail_msg:
742 if src_path in exp_list[node].payload:
744 self.op.src_node = src_node = node
745 self.op.src_path = src_path = utils.PathJoin(pathutils.EXPORT_DIR,
749 raise errors.OpPrereqError("No export found for relative path %s" %
750 src_path, errors.ECODE_INVAL)
752 _CheckNodeOnline(self, src_node)
753 result = self.rpc.call_export_info(src_node, src_path)
754 result.Raise("No export or invalid export found in dir %s" % src_path)
756 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
757 if not export_info.has_section(constants.INISECT_EXP):
758 raise errors.ProgrammerError("Corrupted export config",
759 errors.ECODE_ENVIRON)
761 ei_version = export_info.get(constants.INISECT_EXP, "version")
762 if (int(ei_version) != constants.EXPORT_VERSION):
763 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
764 (ei_version, constants.EXPORT_VERSION),
765 errors.ECODE_ENVIRON)
768 def _ReadExportParams(self, einfo):
769 """Use export parameters as defaults.
771 In case the opcode doesn't specify (as in override) some instance
772 parameters, then try to use them from the export information, if
776 self.op.os_type = einfo.get(constants.INISECT_EXP, "os")
778 if self.op.disk_template is None:
779 if einfo.has_option(constants.INISECT_INS, "disk_template"):
780 self.op.disk_template = einfo.get(constants.INISECT_INS,
782 if self.op.disk_template not in constants.DISK_TEMPLATES:
783 raise errors.OpPrereqError("Disk template specified in configuration"
784 " file is not one of the allowed values:"
786 " ".join(constants.DISK_TEMPLATES),
789 raise errors.OpPrereqError("No disk template specified and the export"
790 " is missing the disk_template information",
793 if not self.op.disks:
795 # TODO: import the disk iv_name too
796 for idx in range(constants.MAX_DISKS):
797 if einfo.has_option(constants.INISECT_INS, "disk%d_size" % idx):
798 disk_sz = einfo.getint(constants.INISECT_INS, "disk%d_size" % idx)
799 disks.append({constants.IDISK_SIZE: disk_sz})
800 self.op.disks = disks
801 if not disks and self.op.disk_template != constants.DT_DISKLESS:
802 raise errors.OpPrereqError("No disk info specified and the export"
803 " is missing the disk information",
808 for idx in range(constants.MAX_NICS):
809 if einfo.has_option(constants.INISECT_INS, "nic%d_mac" % idx):
811 for name in list(constants.NICS_PARAMETERS) + ["ip", "mac"]:
812 v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name))
819 if not self.op.tags and einfo.has_option(constants.INISECT_INS, "tags"):
820 self.op.tags = einfo.get(constants.INISECT_INS, "tags").split()
822 if (self.op.hypervisor is None and
823 einfo.has_option(constants.INISECT_INS, "hypervisor")):
824 self.op.hypervisor = einfo.get(constants.INISECT_INS, "hypervisor")
826 if einfo.has_section(constants.INISECT_HYP):
827 # use the export parameters but do not override the ones
828 # specified by the user
829 for name, value in einfo.items(constants.INISECT_HYP):
830 if name not in self.op.hvparams:
831 self.op.hvparams[name] = value
833 if einfo.has_section(constants.INISECT_BEP):
834 # use the parameters, without overriding
835 for name, value in einfo.items(constants.INISECT_BEP):
836 if name not in self.op.beparams:
837 self.op.beparams[name] = value
838 # Compatibility for the old "memory" be param
839 if name == constants.BE_MEMORY:
840 if constants.BE_MAXMEM not in self.op.beparams:
841 self.op.beparams[constants.BE_MAXMEM] = value
842 if constants.BE_MINMEM not in self.op.beparams:
843 self.op.beparams[constants.BE_MINMEM] = value
845 # try to read the parameters old style, from the main section
846 for name in constants.BES_PARAMETERS:
847 if (name not in self.op.beparams and
848 einfo.has_option(constants.INISECT_INS, name)):
849 self.op.beparams[name] = einfo.get(constants.INISECT_INS, name)
851 if einfo.has_section(constants.INISECT_OSP):
852 # use the parameters, without overriding
853 for name, value in einfo.items(constants.INISECT_OSP):
854 if name not in self.op.osparams:
855 self.op.osparams[name] = value
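# Illustrative fragment of the export file read above (section headers are
# shown symbolically via their INISECT_* constants, values are made up):
#
#   [INISECT_INS]
#   disk_template = plain
#   disk0_size = 10240
#   nic0_mac = aa:00:00:12:34:56
#   hypervisor = kvm
#
#   [INISECT_BEP]
#   maxmem = 1024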
857 def _RevertToDefaults(self, cluster):
858 """Revert the instance parameters to the default values.
862 hv_defs = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type, {})
863 for name in self.op.hvparams.keys():
864 if name in hv_defs and hv_defs[name] == self.op.hvparams[name]:
865 del self.op.hvparams[name]
867 be_defs = cluster.SimpleFillBE({})
868 for name in self.op.beparams.keys():
869 if name in be_defs and be_defs[name] == self.op.beparams[name]:
870 del self.op.beparams[name]
872 nic_defs = cluster.SimpleFillNIC({})
873 for nic in self.op.nics:
874 for name in constants.NICS_PARAMETERS:
875 if name in nic and name in nic_defs and nic[name] == nic_defs[name]:
878 os_defs = cluster.SimpleFillOS(self.op.os_type, {})
879 for name in self.op.osparams.keys():
880 if name in os_defs and os_defs[name] == self.op.osparams[name]:
881 del self.op.osparams[name]
883 def _CalculateFileStorageDir(self):
884 """Calculate final instance file storage dir.
887 # file storage dir calculation/check
888 self.instance_file_storage_dir = None
889 if self.op.disk_template in constants.DTS_FILEBASED:
890 # build the full file storage dir path
893 if self.op.disk_template == constants.DT_SHARED_FILE:
894 get_fsd_fn = self.cfg.GetSharedFileStorageDir
896 get_fsd_fn = self.cfg.GetFileStorageDir
898 cfg_storagedir = get_fsd_fn()
899 if not cfg_storagedir:
900 raise errors.OpPrereqError("Cluster file storage dir not defined",
902 joinargs.append(cfg_storagedir)
904 if self.op.file_storage_dir is not None:
905 joinargs.append(self.op.file_storage_dir)
907 joinargs.append(self.op.instance_name)
909 # pylint: disable=W0142
910 self.instance_file_storage_dir = utils.PathJoin(*joinargs)
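# Illustrative result of the path computation above (directories are made
# up): with a cluster file storage dir of "/srv/ganeti/file-storage", an
# opcode file_storage_dir of "webfarm" and instance "web1.example.com", the
# final instance_file_storage_dir becomes
# "/srv/ganeti/file-storage/webfarm/web1.example.com".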
912 def CheckPrereq(self): # pylint: disable=R0914
913 """Check prerequisites.
916 self._CalculateFileStorageDir()
918 if self.op.mode == constants.INSTANCE_IMPORT:
919 export_info = self._ReadExportInfo()
920 self._ReadExportParams(export_info)
921 self._old_instance_name = export_info.get(constants.INISECT_INS, "name")
923 self._old_instance_name = None
925 if (not self.cfg.GetVGName() and
926 self.op.disk_template not in constants.DTS_NOT_LVM):
927 raise errors.OpPrereqError("Cluster does not support lvm-based"
928 " instances", errors.ECODE_STATE)
930 if (self.op.hypervisor is None or
931 self.op.hypervisor == constants.VALUE_AUTO):
932 self.op.hypervisor = self.cfg.GetHypervisorType()
934 cluster = self.cfg.GetClusterInfo()
935 enabled_hvs = cluster.enabled_hypervisors
936 if self.op.hypervisor not in enabled_hvs:
937 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
939 (self.op.hypervisor, ",".join(enabled_hvs)),
943 for tag in self.op.tags:
944 objects.TaggableObject.ValidateTag(tag)
946 # check hypervisor parameter syntax (locally)
947 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
948 filled_hvp = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type,
950 hv_type = hypervisor.GetHypervisorClass(self.op.hypervisor)
951 hv_type.CheckParameterSyntax(filled_hvp)
952 self.hv_full = filled_hvp
953 # check that we don't specify global parameters on an instance
954 _CheckParamsNotGlobal(self.op.hvparams, constants.HVC_GLOBALS, "hypervisor",
955 "instance", "cluster")
957 # fill and remember the beparams dict
958 self.be_full = _ComputeFullBeParams(self.op, cluster)
960 # build os parameters
961 self.os_full = cluster.SimpleFillOS(self.op.os_type, self.op.osparams)
963 # now that hvp/bep are in final format, let's reset to defaults,
965 if self.op.identify_defaults:
966 self._RevertToDefaults(cluster)
969 self.nics = _ComputeNics(self.op, cluster, self.check_ip, self.cfg,
972 # disk checks/pre-build
973 default_vg = self.cfg.GetVGName()
974 self.disks = _ComputeDisks(self.op, default_vg)
976 if self.op.mode == constants.INSTANCE_IMPORT:
978 for idx in range(len(self.disks)):
979 option = "disk%d_dump" % idx
980 if export_info.has_option(constants.INISECT_INS, option):
981 # FIXME: are the old OSes, disk sizes, etc. useful?
982 export_name = export_info.get(constants.INISECT_INS, option)
983 image = utils.PathJoin(self.op.src_path, export_name)
984 disk_images.append(image)
986 disk_images.append(False)
988 self.src_images = disk_images
990 if self.op.instance_name == self._old_instance_name:
991 for idx, nic in enumerate(self.nics):
992 if nic.mac == constants.VALUE_AUTO:
993 nic_mac_ini = "nic%d_mac" % idx
994 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
996 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
998 # ip ping checks (we use the same ip that was resolved in ExpandNames)
1000 if netutils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
1001 raise errors.OpPrereqError("IP %s of instance %s already in use" %
1002 (self.check_ip, self.op.instance_name),
1003 errors.ECODE_NOTUNIQUE)
1005 #### mac address generation
1006 # By generating the MAC address here, both the allocator and the hooks get
1007 # the real final mac address rather than the 'auto' or 'generate' value.
1008 # There is a race condition between the generation and the instance object
1009 # creation, which means that we know the mac is valid now, but we're not
1010 # sure it will be when we actually add the instance. If things go bad
1011 # adding the instance will abort because of a duplicate mac, and the
1012 # creation job will fail.
1013 for nic in self.nics:
1014 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
1015 nic.mac = self.cfg.GenerateMAC(nic.network, self.proc.GetECId())
1019 if self.op.iallocator is not None:
1020 self._RunAllocator()
1022 # Release all unneeded node locks
1023 keep_locks = filter(None, [self.op.pnode, self.op.snode, self.op.src_node])
1024 _ReleaseLocks(self, locking.LEVEL_NODE, keep=keep_locks)
1025 _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=keep_locks)
1026 _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
1028 assert (self.owned_locks(locking.LEVEL_NODE) ==
1029 self.owned_locks(locking.LEVEL_NODE_RES)), \
1030 "Node locks differ from node resource locks"
1032 #### node related checks
1034 # check primary node
1035 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
1036 assert self.pnode is not None, \
1037 "Cannot retrieve locked node %s" % self.op.pnode
1039 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
1040 pnode.name, errors.ECODE_STATE)
1042 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
1043 pnode.name, errors.ECODE_STATE)
1044 if not pnode.vm_capable:
1045 raise errors.OpPrereqError("Cannot use non-vm_capable primary node"
1046 " '%s'" % pnode.name, errors.ECODE_STATE)
1048 self.secondaries = []
1050 # Fill in any IPs from IP pools. This must happen here, because we need to
1051 # know the nic's primary node, as specified by the iallocator
1052 for idx, nic in enumerate(self.nics):
1053 net_uuid = nic.network
1054 if net_uuid is not None:
1055 nobj = self.cfg.GetNetwork(net_uuid)
1056 netparams = self.cfg.GetGroupNetParams(net_uuid, self.pnode.name)
1057 if netparams is None:
1058 raise errors.OpPrereqError("No netparams found for network"
1059 " %s. Probably not connected to"
1060 " node %s's nodegroup" %
1061 (nobj.name, self.pnode.name),
1063 self.LogInfo("NIC/%d inherits netparams %s" %
1064 (idx, netparams.values()))
1065 nic.nicparams = dict(netparams)
1066 if nic.ip is not None:
1067 if nic.ip.lower() == constants.NIC_IP_POOL:
1069 nic.ip = self.cfg.GenerateIp(net_uuid, self.proc.GetECId())
1070 except errors.ReservationError:
1071 raise errors.OpPrereqError("Unable to get a free IP for NIC %d"
1072 " from the address pool" % idx,
1074 self.LogInfo("Chose IP %s from network %s", nic.ip, nobj.name)
1077 self.cfg.ReserveIp(net_uuid, nic.ip, self.proc.GetECId())
1078 except errors.ReservationError:
1079 raise errors.OpPrereqError("IP address %s already in use"
1080 " or does not belong to network %s" %
1081 (nic.ip, nobj.name),
1082 errors.ECODE_NOTUNIQUE)
1084 # net is None, ip None or given
1085 elif self.op.conflicts_check:
1086 _CheckForConflictingIp(self, nic.ip, self.pnode.name)
1088 # mirror node verification
1089 if self.op.disk_template in constants.DTS_INT_MIRROR:
1090 if self.op.snode == pnode.name:
1091 raise errors.OpPrereqError("The secondary node cannot be the"
1092 " primary node", errors.ECODE_INVAL)
1093 _CheckNodeOnline(self, self.op.snode)
1094 _CheckNodeNotDrained(self, self.op.snode)
1095 _CheckNodeVmCapable(self, self.op.snode)
1096 self.secondaries.append(self.op.snode)
1098 snode = self.cfg.GetNodeInfo(self.op.snode)
1099 if pnode.group != snode.group:
1100 self.LogWarning("The primary and secondary nodes are in two"
1101 " different node groups; the disk parameters"
1102 " from the first disk's node group will be"
1105 if not self.op.disk_template in constants.DTS_EXCL_STORAGE:
1107 if self.op.disk_template in constants.DTS_INT_MIRROR:
1109 has_es = lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n)
1110 if compat.any(map(has_es, nodes)):
1111 raise errors.OpPrereqError("Disk template %s not supported with"
1112 " exclusive storage" % self.op.disk_template,
1115 nodenames = [pnode.name] + self.secondaries
1117 if not self.adopt_disks:
1118 if self.op.disk_template == constants.DT_RBD:
1119 # _CheckRADOSFreeSpace() is just a placeholder.
1120 # Any function that checks prerequisites can be placed here.
1121 # Check if there is enough space on the RADOS cluster.
1122 _CheckRADOSFreeSpace()
1123 elif self.op.disk_template == constants.DT_EXT:
1124 # FIXME: Function that checks prereqs if needed
1127 # Check lv size requirements, if not adopting
1128 req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks)
1129 _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes)
1131 elif self.op.disk_template == constants.DT_PLAIN: # Check the adoption data
1132 all_lvs = set(["%s/%s" % (disk[constants.IDISK_VG],
1133 disk[constants.IDISK_ADOPT])
1134 for disk in self.disks])
1135 if len(all_lvs) != len(self.disks):
1136 raise errors.OpPrereqError("Duplicate volume names given for adoption",
1138 for lv_name in all_lvs:
1140 # FIXME: lv_name here is "vg/lv"; need to ensure that other calls
1141 # to ReserveLV use the same syntax
1142 self.cfg.ReserveLV(lv_name, self.proc.GetECId())
1143 except errors.ReservationError:
1144 raise errors.OpPrereqError("LV named %s used by another instance" %
1145 lv_name, errors.ECODE_NOTUNIQUE)
1147 vg_names = self.rpc.call_vg_list([pnode.name])[pnode.name]
1148 vg_names.Raise("Cannot get VG information from node %s" % pnode.name)
1150 node_lvs = self.rpc.call_lv_list([pnode.name],
1151 vg_names.payload.keys())[pnode.name]
1152 node_lvs.Raise("Cannot get LV information from node %s" % pnode.name)
1153 node_lvs = node_lvs.payload
1155 delta = all_lvs.difference(node_lvs.keys())
1157 raise errors.OpPrereqError("Missing logical volume(s): %s" %
1158 utils.CommaJoin(delta),
1160 online_lvs = [lv for lv in all_lvs if node_lvs[lv][2]]
1162 raise errors.OpPrereqError("Online logical volumes found, cannot"
1163 " adopt: %s" % utils.CommaJoin(online_lvs),
1165 # update the size of disk based on what is found
1166 for dsk in self.disks:
1167 dsk[constants.IDISK_SIZE] = \
1168 int(float(node_lvs["%s/%s" % (dsk[constants.IDISK_VG],
1169 dsk[constants.IDISK_ADOPT])][0]))
1171 elif self.op.disk_template == constants.DT_BLOCK:
1172 # Normalize and de-duplicate device paths
1173 all_disks = set([os.path.abspath(disk[constants.IDISK_ADOPT])
1174 for disk in self.disks])
1175 if len(all_disks) != len(self.disks):
1176 raise errors.OpPrereqError("Duplicate disk names given for adoption",
1178 baddisks = [d for d in all_disks
1179 if not d.startswith(constants.ADOPTABLE_BLOCKDEV_ROOT)]
1181 raise errors.OpPrereqError("Device node(s) %s lie outside %s and"
1182 " cannot be adopted" %
1183 (utils.CommaJoin(baddisks),
1184 constants.ADOPTABLE_BLOCKDEV_ROOT),
1187 node_disks = self.rpc.call_bdev_sizes([pnode.name],
1188 list(all_disks))[pnode.name]
1189 node_disks.Raise("Cannot get block device information from node %s" %
1191 node_disks = node_disks.payload
1192 delta = all_disks.difference(node_disks.keys())
1194 raise errors.OpPrereqError("Missing block device(s): %s" %
1195 utils.CommaJoin(delta),
1197 for dsk in self.disks:
1198 dsk[constants.IDISK_SIZE] = \
1199 int(float(node_disks[dsk[constants.IDISK_ADOPT]]))
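# Illustrative adoption input for the block-device branch above (the path is
# made up): a disk passed as {"adopt": "/dev/disk/by-id/scsi-0QEMU_drive1"}
# must normalize to an absolute path under ADOPTABLE_BLOCKDEV_ROOT, and the
# size reported by the node overwrites IDISK_SIZE.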
1201 # Verify instance specs
1202 spindle_use = self.be_full.get(constants.BE_SPINDLE_USE, None)
1204 constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None),
1205 constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None),
1206 constants.ISPEC_DISK_COUNT: len(self.disks),
1207 constants.ISPEC_DISK_SIZE: [disk[constants.IDISK_SIZE]
1208 for disk in self.disks],
1209 constants.ISPEC_NIC_COUNT: len(self.nics),
1210 constants.ISPEC_SPINDLE_USE: spindle_use,
1213 group_info = self.cfg.GetNodeGroup(pnode.group)
1214 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info)
1215 res = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec,
1216 self.op.disk_template)
1217 if not self.op.ignore_ipolicy and res:
1218 msg = ("Instance allocation to group %s (%s) violates policy: %s" %
1219 (pnode.group, group_info.name, utils.CommaJoin(res)))
1220 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
1222 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
1224 _CheckNodeHasOS(self, pnode.name, self.op.os_type, self.op.force_variant)
1225 # check OS parameters (remotely)
1226 _CheckOSParams(self, True, nodenames, self.op.os_type, self.os_full)
1228 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
1230 #TODO: _CheckExtParams (remotely)
1231 # Check parameters for extstorage
1233 # memory check on primary node
1234 #TODO(dynmem): use MINMEM for checking
1236 _CheckNodeFreeMemory(self, self.pnode.name,
1237 "creating instance %s" % self.op.instance_name,
1238 self.be_full[constants.BE_MAXMEM],
1241 self.dry_run_result = list(nodenames)
1243 def Exec(self, feedback_fn):
1244 """Create and add the instance to the cluster.
1247 instance = self.op.instance_name
1248 pnode_name = self.pnode.name
1250 assert not (self.owned_locks(locking.LEVEL_NODE_RES) -
1251 self.owned_locks(locking.LEVEL_NODE)), \
1252 "Node locks differ from node resource locks"
1253 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1255 ht_kind = self.op.hypervisor
1256 if ht_kind in constants.HTS_REQ_PORT:
1257 network_port = self.cfg.AllocatePort()
1261 # This is ugly, but we have a chicken-and-egg problem here:
1262 # We can only take the group disk parameters, as the instance
1263 # has no disks yet (we are generating them right here).
1264 node = self.cfg.GetNodeInfo(pnode_name)
1265 nodegroup = self.cfg.GetNodeGroup(node.group)
1266 disks = _GenerateDiskTemplate(self,
1267 self.op.disk_template,
1268 instance, pnode_name,
1271 self.instance_file_storage_dir,
1272 self.op.file_driver,
1275 self.cfg.GetGroupDiskParams(nodegroup))
1277 iobj = objects.Instance(name=instance, os=self.op.os_type,
1278 primary_node=pnode_name,
1279 nics=self.nics, disks=disks,
1280 disk_template=self.op.disk_template,
1281 admin_state=constants.ADMINST_DOWN,
1282 network_port=network_port,
1283 beparams=self.op.beparams,
1284 hvparams=self.op.hvparams,
1285 hypervisor=self.op.hypervisor,
1286 osparams=self.op.osparams,
1290 for tag in self.op.tags:
1293 if self.adopt_disks:
1294 if self.op.disk_template == constants.DT_PLAIN:
1295 # rename LVs to the newly-generated names; we need to construct
1296 # 'fake' LV disks with the old data, plus the new unique_id
1297 tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks]
1299 for t_dsk, a_dsk in zip(tmp_disks, self.disks):
1300 rename_to.append(t_dsk.logical_id)
1301 t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk[constants.IDISK_ADOPT])
1302 self.cfg.SetDiskID(t_dsk, pnode_name)
1303 result = self.rpc.call_blockdev_rename(pnode_name,
1304 zip(tmp_disks, rename_to))
1305 result.Raise("Failed to rename adopted LVs")
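# Hedged illustration of the adoption rename above: for a disk requested as
# {"vg": "xenvg", "adopt": "data1"} (values made up), the temporary disk
# object is pointed at the existing LV xenvg/data1 and the RPC renames that
# LV to the freshly generated unique name recorded in rename_to.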
1307 feedback_fn("* creating instance disks...")
1309 _CreateDisks(self, iobj)
1310 except errors.OpExecError:
1311 self.LogWarning("Device creation failed")
1312 self.cfg.ReleaseDRBDMinors(instance)
1315 feedback_fn("adding instance %s to cluster config" % instance)
1317 self.cfg.AddInstance(iobj, self.proc.GetECId())
1319 # Declare that we don't want to remove the instance lock anymore, as we've
1320 # added the instance to the config
1321 del self.remove_locks[locking.LEVEL_INSTANCE]
1323 if self.op.mode == constants.INSTANCE_IMPORT:
1324 # Release unused nodes
1325 _ReleaseLocks(self, locking.LEVEL_NODE, keep=[self.op.src_node])
1328 _ReleaseLocks(self, locking.LEVEL_NODE)
1331 if not self.adopt_disks and self.cfg.GetClusterInfo().prealloc_wipe_disks:
1332 feedback_fn("* wiping instance disks...")
1334 _WipeDisks(self, iobj)
1335 except errors.OpExecError, err:
1336 logging.exception("Wiping disks failed")
1337 self.LogWarning("Wiping instance disks failed (%s)", err)
1341 # Something is already wrong with the disks, don't do anything else
1343 elif self.op.wait_for_sync:
1344 disk_abort = not _WaitForSync(self, iobj)
1345 elif iobj.disk_template in constants.DTS_INT_MIRROR:
1346 # make sure the disks are not degraded (still sync-ing is ok)
1347 feedback_fn("* checking mirrors status")
1348 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
1353 _RemoveDisks(self, iobj)
1354 self.cfg.RemoveInstance(iobj.name)
1355 # Make sure the instance lock gets removed
1356 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
1357 raise errors.OpExecError("There are some degraded disks for"
1360 # Release all node resource locks
1361 _ReleaseLocks(self, locking.LEVEL_NODE_RES)
1363 if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks:
1364 # we need to set the disk IDs for the primary node, since the
1365 # preceding code might or might not have done it, depending on
1366 # disk template and other options
1367 for disk in iobj.disks:
1368 self.cfg.SetDiskID(disk, pnode_name)
1369 if self.op.mode == constants.INSTANCE_CREATE:
1370 if not self.op.no_install:
1371 pause_sync = (iobj.disk_template in constants.DTS_INT_MIRROR and
1372 not self.op.wait_for_sync)
1374 feedback_fn("* pausing disk sync to install instance OS")
1375 result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
1378 for idx, success in enumerate(result.payload):
1380 logging.warn("pause-sync of instance %s for disk %d failed",
1383 feedback_fn("* running the instance OS create scripts...")
1384 # FIXME: pass debug option from opcode to backend
1386 self.rpc.call_instance_os_add(pnode_name, (iobj, None), False,
1387 self.op.debug_level)
1389 feedback_fn("* resuming disk sync")
1390 result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
1393 for idx, success in enumerate(result.payload):
1395 logging.warn("resume-sync of instance %s for disk %d failed",
1398 os_add_result.Raise("Could not add os for instance %s"
1399 " on node %s" % (instance, pnode_name))
1402 if self.op.mode == constants.INSTANCE_IMPORT:
1403 feedback_fn("* running the instance OS import scripts...")
1407 for idx, image in enumerate(self.src_images):
1411 # FIXME: pass debug option from opcode to backend
1412 dt = masterd.instance.DiskTransfer("disk/%s" % idx,
1413 constants.IEIO_FILE, (image, ),
1414 constants.IEIO_SCRIPT,
1415 (iobj.disks[idx], idx),
1417 transfers.append(dt)
1420 masterd.instance.TransferInstanceData(self, feedback_fn,
1421 self.op.src_node, pnode_name,
1422 self.pnode.secondary_ip,
1424 if not compat.all(import_result):
1425 self.LogWarning("Some disks for instance %s on node %s were not"
1426 " imported successfully" % (instance, pnode_name))
1428 rename_from = self._old_instance_name
1430 elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
1431 feedback_fn("* preparing remote import...")
1432 # The source cluster will stop the instance before attempting to make
1433 # a connection. In some cases stopping an instance can take a long
1434 # time, hence the shutdown timeout is added to the connection
1436 connect_timeout = (constants.RIE_CONNECT_TIMEOUT +
1437 self.op.source_shutdown_timeout)
1438 timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
1440 assert iobj.primary_node == self.pnode.name
1442 masterd.instance.RemoteImport(self, feedback_fn, iobj, self.pnode,
1443 self.source_x509_ca,
1444 self._cds, timeouts)
1445 if not compat.all(disk_results):
1446 # TODO: Should the instance still be started, even if some disks
1447 # failed to import (valid for local imports, too)?
1448 self.LogWarning("Some disks for instance %s on node %s were not"
1449 " imported successfully" % (instance, pnode_name))
1451 rename_from = self.source_instance_name
1454 # also checked in the prereq part
1455 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
1458 # Run rename script on newly imported instance
1459 assert iobj.name == instance
1460 feedback_fn("Running rename script for %s" % instance)
1461 result = self.rpc.call_instance_run_rename(pnode_name, iobj,
1463 self.op.debug_level)
1465 self.LogWarning("Failed to run rename script for %s on node"
1466 " %s: %s" % (instance, pnode_name, result.fail_msg))
1468 assert not self.owned_locks(locking.LEVEL_NODE_RES)
1471 iobj.admin_state = constants.ADMINST_UP
1472 self.cfg.Update(iobj, feedback_fn)
1473 logging.info("Starting instance %s on node %s", instance, pnode_name)
1474 feedback_fn("* starting instance...")
1475 result = self.rpc.call_instance_start(pnode_name, (iobj, None, None),
1476 False, self.op.reason)
1477 result.Raise("Could not start instance")
1479 return list(iobj.all_nodes)
1482 class LUInstanceRename(LogicalUnit):
1483 """Rename an instance.
1486 HPATH = "instance-rename"
1487 HTYPE = constants.HTYPE_INSTANCE
1489 def CheckArguments(self):
1493 if self.op.ip_check and not self.op.name_check:
1494 # TODO: make the ip check more flexible and not depend on the name check
1495 raise errors.OpPrereqError("IP address check requires a name check",
1498 def BuildHooksEnv(self):
1501 This runs on master, primary and secondary nodes of the instance.
1504 env = _BuildInstanceHookEnvByObject(self, self.instance)
1505 env["INSTANCE_NEW_NAME"] = self.op.new_name
1508 def BuildHooksNodes(self):
1509 """Build hooks nodes.
1512 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1515 def CheckPrereq(self):
1516 """Check prerequisites.
1518 This checks that the instance is in the cluster and is not running.
1521 self.op.instance_name = _ExpandInstanceName(self.cfg,
1522 self.op.instance_name)
1523 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1524 assert instance is not None
1525 _CheckNodeOnline(self, instance.primary_node)
1526 _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
1527 msg="cannot rename")
1528 self.instance = instance
1530 new_name = self.op.new_name
1531 if self.op.name_check:
1532 hostname = _CheckHostnameSane(self, new_name)
1533 new_name = self.op.new_name = hostname.name
1534 if (self.op.ip_check and
1535 netutils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT)):
1536 raise errors.OpPrereqError("IP %s of instance %s already in use" %
1537 (hostname.ip, new_name),
1538 errors.ECODE_NOTUNIQUE)
1540 instance_list = self.cfg.GetInstanceList()
1541 if new_name in instance_list and new_name != instance.name:
1542 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
1543 new_name, errors.ECODE_EXISTS)
1545 def Exec(self, feedback_fn):
1546 """Rename the instance.
1549 inst = self.instance
1550 old_name = inst.name
1552 rename_file_storage = False
1553 if (inst.disk_template in constants.DTS_FILEBASED and
1554 self.op.new_name != inst.name):
1555 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
1556 rename_file_storage = True
1558 self.cfg.RenameInstance(inst.name, self.op.new_name)
1559 # Change the instance lock. This is definitely safe while we hold the BGL.
1560 # Otherwise the new lock would have to be added in acquired mode.
1562 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
1563 self.glm.remove(locking.LEVEL_INSTANCE, old_name)
1564 self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
1566 # re-read the instance from the configuration after rename
1567 inst = self.cfg.GetInstanceInfo(self.op.new_name)
1569 if rename_file_storage:
1570 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
1571 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
1572 old_file_storage_dir,
1573 new_file_storage_dir)
1574 result.Raise("Could not rename on node %s directory '%s' to '%s'"
1575 " (but the instance has been renamed in Ganeti)" %
1576 (inst.primary_node, old_file_storage_dir,
1577 new_file_storage_dir))
1579 _StartInstanceDisks(self, inst, None)
1580 # update info on disks
1581 info = _GetInstanceInfoText(inst)
1582 for (idx, disk) in enumerate(inst.disks):
1583 for node in inst.all_nodes:
1584 self.cfg.SetDiskID(disk, node)
1585 result = self.rpc.call_blockdev_setinfo(node, disk, info)
1587 self.LogWarning("Error setting info on node %s for disk %s: %s",
1588 node, idx, result.fail_msg)
1590 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
1591 old_name, self.op.debug_level)
1592 msg = result.fail_msg
1594 msg = ("Could not run OS rename script for instance %s on node %s"
1595 " (but the instance has been renamed in Ganeti): %s" %
1596 (inst.name, inst.primary_node, msg))
1597 self.LogWarning(msg)
1599 _ShutdownInstanceDisks(self, inst)
1604 class LUInstanceRemove(LogicalUnit):
1605 """Remove an instance.
1608 HPATH = "instance-remove"
1609 HTYPE = constants.HTYPE_INSTANCE
1612 def ExpandNames(self):
1613 self._ExpandAndLockInstance()
1614 self.needed_locks[locking.LEVEL_NODE] = []
1615 self.needed_locks[locking.LEVEL_NODE_RES] = []
1616 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1618 def DeclareLocks(self, level):
1619 if level == locking.LEVEL_NODE:
1620 self._LockInstancesNodes()
1621 elif level == locking.LEVEL_NODE_RES:
1623 self.needed_locks[locking.LEVEL_NODE_RES] = \
1624 _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1626 def BuildHooksEnv(self):
1629 This runs on master, primary and secondary nodes of the instance.
1632 env = _BuildInstanceHookEnvByObject(self, self.instance)
1633 env["SHUTDOWN_TIMEOUT"] = self.op.shutdown_timeout
1636 def BuildHooksNodes(self):
1637 """Build hooks nodes.
1640 nl = [self.cfg.GetMasterNode()]
1641 nl_post = list(self.instance.all_nodes) + nl
1642 return (nl, nl_post)
1644 def CheckPrereq(self):
1645 """Check prerequisites.
1647 This checks that the instance is in the cluster.
1650 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1651 assert self.instance is not None, \
1652 "Cannot retrieve locked instance %s" % self.op.instance_name
1654 def Exec(self, feedback_fn):
1655 """Remove the instance.
1658 instance = self.instance
1659 logging.info("Shutting down instance %s on node %s",
1660 instance.name, instance.primary_node)
1662 result = self.rpc.call_instance_shutdown(instance.primary_node, instance,
1663 self.op.shutdown_timeout,
1665 msg = result.fail_msg
1667 if self.op.ignore_failures:
1668 feedback_fn("Warning: can't shutdown instance: %s" % msg)
1670 raise errors.OpExecError("Could not shutdown instance %s on"
1672 (instance.name, instance.primary_node, msg))
1674 assert (self.owned_locks(locking.LEVEL_NODE) ==
1675 self.owned_locks(locking.LEVEL_NODE_RES))
1676 assert not (set(instance.all_nodes) -
1677 self.owned_locks(locking.LEVEL_NODE)), \
1678 "Not owning correct locks"
1680 _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
1683 def _CheckInstanceBridgesExist(lu, instance, node=None):
1684 """Check that the bridges needed by an instance exist.
1688 node = instance.primary_node
1689 _CheckNicsBridgesExist(lu, instance.nics, node)
1692 class LUInstanceMove(LogicalUnit):
1693 """Move an instance by data-copying.
1696 HPATH = "instance-move"
1697 HTYPE = constants.HTYPE_INSTANCE
1700 def ExpandNames(self):
1701 self._ExpandAndLockInstance()
1702 target_node = _ExpandNodeName(self.cfg, self.op.target_node)
1703 self.op.target_node = target_node
1704 self.needed_locks[locking.LEVEL_NODE] = [target_node]
1705 self.needed_locks[locking.LEVEL_NODE_RES] = []
1706 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1708 def DeclareLocks(self, level):
1709 if level == locking.LEVEL_NODE:
1710 self._LockInstancesNodes(primary_only=True)
1711 elif level == locking.LEVEL_NODE_RES:
1713 self.needed_locks[locking.LEVEL_NODE_RES] = \
1714 _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1716 def BuildHooksEnv(self):
1719 This runs on master, primary and secondary nodes of the instance.
1723 "TARGET_NODE": self.op.target_node,
1724 "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
1726 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
1729 def BuildHooksNodes(self):
1730 """Build hooks nodes.
1734 self.cfg.GetMasterNode(),
1735 self.instance.primary_node,
1736 self.op.target_node,
1740 def CheckPrereq(self):
1741 """Check prerequisites.
1743 This checks that the instance is in the cluster.
1746 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1747 assert self.instance is not None, \
1748 "Cannot retrieve locked instance %s" % self.op.instance_name
1750 if instance.disk_template not in constants.DTS_COPYABLE:
1751 raise errors.OpPrereqError("Disk template %s not suitable for copying" %
1752 instance.disk_template, errors.ECODE_STATE)
1754 node = self.cfg.GetNodeInfo(self.op.target_node)
1755 assert node is not None, \
1756 "Cannot retrieve locked node %s" % self.op.target_node
1758 self.target_node = target_node = node.name
1760 if target_node == instance.primary_node:
1761 raise errors.OpPrereqError("Instance %s is already on the node %s" %
1762 (instance.name, target_node),
1765 bep = self.cfg.GetClusterInfo().FillBE(instance)
1767 for idx, dsk in enumerate(instance.disks):
1768 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
1769 raise errors.OpPrereqError("Instance disk %d has a complex layout,"
1770 " cannot copy" % idx, errors.ECODE_STATE)
1772 _CheckNodeOnline(self, target_node)
1773 _CheckNodeNotDrained(self, target_node)
1774 _CheckNodeVmCapable(self, target_node)
1775 cluster = self.cfg.GetClusterInfo()
1776 group_info = self.cfg.GetNodeGroup(node.group)
1777 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info)
1778 _CheckTargetNodeIPolicy(self, ipolicy, instance, node, self.cfg,
1779 ignore=self.op.ignore_ipolicy)
1781 if instance.admin_state == constants.ADMINST_UP:
1782 # check memory requirements on the target node
1783 _CheckNodeFreeMemory(self, target_node,
1784 "failing over instance %s" %
1785 instance.name, bep[constants.BE_MAXMEM],
1786 instance.hypervisor)
1788 self.LogInfo("Not checking memory on the secondary node as"
1789 " instance will not be started")
1791 # check bridge existence
1792 _CheckInstanceBridgesExist(self, instance, node=target_node)
1794 def Exec(self, feedback_fn):
1795 """Move an instance.
1797 The move is done by shutting it down on its present node, copying
1798 the data over (slow) and starting it on the new node.
1801 instance = self.instance
1803 source_node = instance.primary_node
1804 target_node = self.target_node
1806 self.LogInfo("Shutting down instance %s on source node %s",
1807 instance.name, source_node)
1809 assert (self.owned_locks(locking.LEVEL_NODE) ==
1810 self.owned_locks(locking.LEVEL_NODE_RES))
1812 result = self.rpc.call_instance_shutdown(source_node, instance,
1813 self.op.shutdown_timeout,
1815 msg = result.fail_msg
1817 if self.op.ignore_consistency:
1818 self.LogWarning("Could not shutdown instance %s on node %s."
1819 " Proceeding anyway. Please make sure node"
1820 " %s is down. Error details: %s",
1821 instance.name, source_node, source_node, msg)
1823 raise errors.OpExecError("Could not shutdown instance %s on"
1825 (instance.name, source_node, msg))
1827 # create the target disks
1829 _CreateDisks(self, instance, target_node=target_node)
1830 except errors.OpExecError:
1831 self.LogWarning("Device creation failed")
1832 self.cfg.ReleaseDRBDMinors(instance.name)
1835 cluster_name = self.cfg.GetClusterInfo().cluster_name
1838 # activate, get path, copy the data over
1839 for idx, disk in enumerate(instance.disks):
1840 self.LogInfo("Copying data for disk %d", idx)
1841 result = self.rpc.call_blockdev_assemble(target_node, (disk, instance),
1842 instance.name, True, idx)
1844 self.LogWarning("Can't assemble newly created disk %d: %s",
1845 idx, result.fail_msg)
1846 errs.append(result.fail_msg)
1848 dev_path = result.payload
1849 result = self.rpc.call_blockdev_export(source_node, (disk, instance),
1850 target_node, dev_path,
1853 self.LogWarning("Can't copy data over for disk %d: %s",
1854 idx, result.fail_msg)
1855 errs.append(result.fail_msg)
1859 self.LogWarning("Some disks failed to copy, aborting")
1861 _RemoveDisks(self, instance, target_node=target_node)
1863 self.cfg.ReleaseDRBDMinors(instance.name)
1864 raise errors.OpExecError("Errors during disk copy: %s" %
1867 instance.primary_node = target_node
1868 self.cfg.Update(instance, feedback_fn)
1870 self.LogInfo("Removing the disks on the original node")
1871 _RemoveDisks(self, instance, target_node=source_node)
1873 # Only start the instance if it's marked as up
1874 if instance.admin_state == constants.ADMINST_UP:
1875 self.LogInfo("Starting instance %s on node %s",
1876 instance.name, target_node)
1878 disks_ok, _ = _AssembleInstanceDisks(self, instance,
1879 ignore_secondaries=True)
1881 _ShutdownInstanceDisks(self, instance)
1882 raise errors.OpExecError("Can't activate the instance's disks")
1884 result = self.rpc.call_instance_start(target_node,
1885 (instance, None, None), False,
1887 msg = result.fail_msg
1889 _ShutdownInstanceDisks(self, instance)
1890 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
1891 (instance.name, target_node, msg))
1894 def _GetInstanceConsole(cluster, instance):
1895 """Returns console information for an instance.
1897 @type cluster: L{objects.Cluster}
1898 @type instance: L{objects.Instance}
1902 hyper = hypervisor.GetHypervisorClass(instance.hypervisor)
1903 # beparams and hvparams are passed separately, to avoid editing the
1904 # instance and then saving the defaults in the instance itself.
1905 hvparams = cluster.FillHV(instance)
1906 beparams = cluster.FillBE(instance)
1907 console = hyper.GetInstanceConsole(instance, hvparams, beparams)
1909 assert console.instance == instance.name
1910 assert console.Validate()
1912 return console.ToDict()
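# Usage sketch (illustrative only): this mirrors what LUInstanceConsole.Exec
# does further below for an already-locked instance object:
#
#   cluster = self.cfg.GetClusterInfo()
#   console_params = _GetInstanceConsole(cluster, instance)
#   # console_params is the dict form of the console object returned by the
#   # hypervisor, which clients turn into the actual console command line.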
1915 class _InstanceQuery(_QueryBase):
1916 FIELDS = query.INSTANCE_FIELDS
1918 def ExpandNames(self, lu):
1919 lu.needed_locks = {}
1920 lu.share_locks = _ShareAll()
1923 self.wanted = _GetWantedInstances(lu, self.names)
1925 self.wanted = locking.ALL_SET
1927 self.do_locking = (self.use_locking and
1928 query.IQ_LIVE in self.requested_data)
1930 lu.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
1931 lu.needed_locks[locking.LEVEL_NODEGROUP] = []
1932 lu.needed_locks[locking.LEVEL_NODE] = []
1933 lu.needed_locks[locking.LEVEL_NETWORK] = []
1934 lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1936 self.do_grouplocks = (self.do_locking and
1937 query.IQ_NODES in self.requested_data)
1939 def DeclareLocks(self, lu, level):
1941 if level == locking.LEVEL_NODEGROUP and self.do_grouplocks:
1942 assert not lu.needed_locks[locking.LEVEL_NODEGROUP]
1944 # Lock all groups used by instances optimistically; this requires going
1945 # via the node before it's locked, requiring verification later on
1946 lu.needed_locks[locking.LEVEL_NODEGROUP] = \
1948 for instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
1949 for group_uuid in lu.cfg.GetInstanceNodeGroups(instance_name))
1950 elif level == locking.LEVEL_NODE:
1951 lu._LockInstancesNodes() # pylint: disable=W0212
1953 elif level == locking.LEVEL_NETWORK:
1954 lu.needed_locks[locking.LEVEL_NETWORK] = \
1956 for instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
1957 for net_uuid in lu.cfg.GetInstanceNetworks(instance_name))
1960 def _CheckGroupLocks(lu):
1961 owned_instances = frozenset(lu.owned_locks(locking.LEVEL_INSTANCE))
1962 owned_groups = frozenset(lu.owned_locks(locking.LEVEL_NODEGROUP))
1964 # Check if node groups for locked instances are still correct
1965 for instance_name in owned_instances:
1966 _CheckInstanceNodeGroups(lu.cfg, instance_name, owned_groups)
1968 def _GetQueryData(self, lu):
1969 """Computes the list of instances and their attributes.
1972 if self.do_grouplocks:
1973 self._CheckGroupLocks(lu)
1975 cluster = lu.cfg.GetClusterInfo()
1976 all_info = lu.cfg.GetAllInstancesInfo()
1978 instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE)
1980 instance_list = [all_info[name] for name in instance_names]
1981 nodes = frozenset(itertools.chain(*(inst.all_nodes
1982 for inst in instance_list)))
1983 hv_list = list(set([inst.hypervisor for inst in instance_list]))
1986 wrongnode_inst = set()
1988 # Gather data as requested
1989 if self.requested_data & set([query.IQ_LIVE, query.IQ_CONSOLE]):
1991 node_data = lu.rpc.call_all_instances_info(nodes, hv_list)
1993 result = node_data[name]
1995 # offline nodes will be in both lists
1996 assert result.fail_msg
1997 offline_nodes.append(name)
1999 bad_nodes.append(name)
2000 elif result.payload:
2001 for inst in result.payload:
2002 if inst in all_info:
2003 if all_info[inst].primary_node == name:
2004 live_data.update(result.payload)
2006 wrongnode_inst.add(inst)
2008 # orphan instance; we don't list it here as we don't
2009 # handle this case yet in the output of instance listing
2010 logging.warning("Orphan instance '%s' found on node %s",
2012 # else no instance is alive
2016 if query.IQ_DISKUSAGE in self.requested_data:
2017 gmi = ganeti.masterd.instance
2018 disk_usage = dict((inst.name,
2019 gmi.ComputeDiskSize(inst.disk_template,
2020 [{constants.IDISK_SIZE: disk.size}
2021 for disk in inst.disks]))
2022 for inst in instance_list)
2026 if query.IQ_CONSOLE in self.requested_data:
2028 for inst in instance_list:
2029 if inst.name in live_data:
2030 # Instance is running
2031 consinfo[inst.name] = _GetInstanceConsole(cluster, inst)
2033 consinfo[inst.name] = None
2034 assert set(consinfo.keys()) == set(instance_names)
2038 if query.IQ_NODES in self.requested_data:
2039 node_names = set(itertools.chain(*map(operator.attrgetter("all_nodes"),
2041 nodes = dict(lu.cfg.GetMultiNodeInfo(node_names))
2042 groups = dict((uuid, lu.cfg.GetNodeGroup(uuid))
2043 for uuid in set(map(operator.attrgetter("group"),
2049 if query.IQ_NETWORKS in self.requested_data:
2050 net_uuids = itertools.chain(*(lu.cfg.GetInstanceNetworks(i.name)
2051 for i in instance_list))
2052 networks = dict((uuid, lu.cfg.GetNetwork(uuid)) for uuid in net_uuids)
2056 return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(),
2057 disk_usage, offline_nodes, bad_nodes,
2058 live_data, wrongnode_inst, consinfo,
2059 nodes, groups, networks)
2062 class LUInstanceQuery(NoHooksLU):
2063 """Logical unit for querying instances.
2066 # pylint: disable=W0142
2069 def CheckArguments(self):
2070 self.iq = _InstanceQuery(qlang.MakeSimpleFilter("name", self.op.names),
2071 self.op.output_fields, self.op.use_locking)
2073 def ExpandNames(self):
2074 self.iq.ExpandNames(self)
2076 def DeclareLocks(self, level):
2077 self.iq.DeclareLocks(self, level)
2079 def Exec(self, feedback_fn):
2080 return self.iq.OldStyleQuery(self)
2083 class LUInstanceQueryData(NoHooksLU):
2084 """Query runtime instance data.
2089 def ExpandNames(self):
2090 self.needed_locks = {}
2092 # Use locking if requested or when non-static information is wanted
2093 if not (self.op.static or self.op.use_locking):
2094 self.LogWarning("Non-static data requested, locks need to be acquired")
2095 self.op.use_locking = True
2097 if self.op.instances or not self.op.use_locking:
2098 # Expand instance names right here
2099 self.wanted_names = _GetWantedInstances(self, self.op.instances)
2101 # Will use acquired locks
2102 self.wanted_names = None
2104 if self.op.use_locking:
2105 self.share_locks = _ShareAll()
2107 if self.wanted_names is None:
2108 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
2110 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
2112 self.needed_locks[locking.LEVEL_NODEGROUP] = []
2113 self.needed_locks[locking.LEVEL_NODE] = []
2114 self.needed_locks[locking.LEVEL_NETWORK] = []
2115 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2117 def DeclareLocks(self, level):
2118 if self.op.use_locking:
2119 owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
2120 if level == locking.LEVEL_NODEGROUP:
2122 # Lock all groups used by instances optimistically; this requires going
2123 # via the node before it's locked, requiring verification later on
2124 self.needed_locks[locking.LEVEL_NODEGROUP] = \
2125 frozenset(group_uuid
2126 for instance_name in owned_instances
2128 self.cfg.GetInstanceNodeGroups(instance_name))
2130 elif level == locking.LEVEL_NODE:
2131 self._LockInstancesNodes()
2133 elif level == locking.LEVEL_NETWORK:
2134 self.needed_locks[locking.LEVEL_NETWORK] = \
2136 for instance_name in owned_instances
2138 self.cfg.GetInstanceNetworks(instance_name))
2140 def CheckPrereq(self):
2141 """Check prerequisites.
2143 This only checks the optional instance list against the existing names.
2146 owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
2147 owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
2148 owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
2149 owned_networks = frozenset(self.owned_locks(locking.LEVEL_NETWORK))
2151 if self.wanted_names is None:
2152 assert self.op.use_locking, "Locking was not used"
2153 self.wanted_names = owned_instances
2155 instances = dict(self.cfg.GetMultiInstanceInfo(self.wanted_names))
2157 if self.op.use_locking:
2158 _CheckInstancesNodeGroups(self.cfg, instances, owned_groups, owned_nodes,
2161 assert not (owned_instances or owned_groups or
2162 owned_nodes or owned_networks)
2164 self.wanted_instances = instances.values()
2166 def _ComputeBlockdevStatus(self, node, instance, dev):
2167 """Returns the status of a block device
2170 if self.op.static or not node:
2173 self.cfg.SetDiskID(dev, node)
2175 result = self.rpc.call_blockdev_find(node, dev)
2179 result.Raise("Can't compute disk status for %s" % instance.name)
2181 status = result.payload
2185 return (status.dev_path, status.major, status.minor,
2186 status.sync_percent, status.estimated_time,
2187 status.is_degraded, status.ldisk_status)
2189 def _ComputeDiskStatus(self, instance, snode, dev):
2190 """Compute block device status.
2193 (anno_dev,) = _AnnotateDiskParams(instance, [dev], self.cfg)
2195 return self._ComputeDiskStatusInner(instance, snode, anno_dev)
2197 def _ComputeDiskStatusInner(self, instance, snode, dev):
2198 """Compute block device status.
2200 @attention: The device has to be annotated already.
2203 if dev.dev_type in constants.LDS_DRBD:
2204 # we change the snode then (otherwise we use the one passed in)
2205 if dev.logical_id[0] == instance.primary_node:
2206 snode = dev.logical_id[1]
2208 snode = dev.logical_id[0]
2210 dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
2212 dev_sstatus = self._ComputeBlockdevStatus(snode, instance, dev)
2215 dev_children = map(compat.partial(self._ComputeDiskStatusInner,
2222 "iv_name": dev.iv_name,
2223 "dev_type": dev.dev_type,
2224 "logical_id": dev.logical_id,
2225 "physical_id": dev.physical_id,
2226 "pstatus": dev_pstatus,
2227 "sstatus": dev_sstatus,
2228 "children": dev_children,
2235 def Exec(self, feedback_fn):
2236 """Gather and return data"""
2239 cluster = self.cfg.GetClusterInfo()
2241 node_names = itertools.chain(*(i.all_nodes for i in self.wanted_instances))
2242 nodes = dict(self.cfg.GetMultiNodeInfo(node_names))
2244 groups = dict(self.cfg.GetMultiNodeGroupInfo(node.group
2245 for node in nodes.values()))
2247 group2name_fn = lambda uuid: groups[uuid].name
2248 for instance in self.wanted_instances:
2249 pnode = nodes[instance.primary_node]
2251 if self.op.static or pnode.offline:
2254 self.LogWarning("Primary node %s is marked offline, returning static"
2255 " information only for instance %s" %
2256 (pnode.name, instance.name))
2258 remote_info = self.rpc.call_instance_info(instance.primary_node,
2260 instance.hypervisor)
2261 remote_info.Raise("Error checking node %s" % instance.primary_node)
2262 remote_info = remote_info.payload
2263 if remote_info and "state" in remote_info:
2266 if instance.admin_state == constants.ADMINST_UP:
2267 remote_state = "down"
2269 remote_state = instance.admin_state
2271 disks = map(compat.partial(self._ComputeDiskStatus, instance, None),
2274 snodes_group_uuids = [nodes[snode_name].group
2275 for snode_name in instance.secondary_nodes]
2277 result[instance.name] = {
2278 "name": instance.name,
2279 "config_state": instance.admin_state,
2280 "run_state": remote_state,
2281 "pnode": instance.primary_node,
2282 "pnode_group_uuid": pnode.group,
2283 "pnode_group_name": group2name_fn(pnode.group),
2284 "snodes": instance.secondary_nodes,
2285 "snodes_group_uuids": snodes_group_uuids,
2286 "snodes_group_names": map(group2name_fn, snodes_group_uuids),
2288 # this happens to be the same format used for hooks
2289 "nics": _NICListToTuple(self, instance.nics),
2290 "disk_template": instance.disk_template,
2292 "hypervisor": instance.hypervisor,
2293 "network_port": instance.network_port,
2294 "hv_instance": instance.hvparams,
2295 "hv_actual": cluster.FillHV(instance, skip_globals=True),
2296 "be_instance": instance.beparams,
2297 "be_actual": cluster.FillBE(instance),
2298 "os_instance": instance.osparams,
2299 "os_actual": cluster.SimpleFillOS(instance.os, instance.osparams),
2300 "serial_no": instance.serial_no,
2301 "mtime": instance.mtime,
2302 "ctime": instance.ctime,
2303 "uuid": instance.uuid,
2309 class LUInstanceStartup(LogicalUnit):
2310 """Starts an instance.
2313 HPATH = "instance-start"
2314 HTYPE = constants.HTYPE_INSTANCE
2317 def CheckArguments(self):
2319 if self.op.beparams:
2320 # fill the beparams dict
2321 objects.UpgradeBeParams(self.op.beparams)
2322 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2324 def ExpandNames(self):
2325 self._ExpandAndLockInstance()
2326 self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
2328 def DeclareLocks(self, level):
2329 if level == locking.LEVEL_NODE_RES:
2330 self._LockInstancesNodes(primary_only=True, level=locking.LEVEL_NODE_RES)
2332 def BuildHooksEnv(self):
2335 This runs on master, primary and secondary nodes of the instance.
2339 "FORCE": self.op.force,
2342 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2346 def BuildHooksNodes(self):
2347 """Build hooks nodes.
2350 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2353 def CheckPrereq(self):
2354 """Check prerequisites.
2356 This checks that the instance is in the cluster.
2359 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2360 assert self.instance is not None, \
2361 "Cannot retrieve locked instance %s" % self.op.instance_name
2364 if self.op.hvparams:
2365 # check hypervisor parameter syntax (locally)
2366 cluster = self.cfg.GetClusterInfo()
2367 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
2368 filled_hvp = cluster.FillHV(instance)
2369 filled_hvp.update(self.op.hvparams)
2370 hv_type = hypervisor.GetHypervisorClass(instance.hypervisor)
2371 hv_type.CheckParameterSyntax(filled_hvp)
2372 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2374 _CheckInstanceState(self, instance, INSTANCE_ONLINE)
2376 self.primary_offline = self.cfg.GetNodeInfo(instance.primary_node).offline
2378 if self.primary_offline and self.op.ignore_offline_nodes:
2379 self.LogWarning("Ignoring offline primary node")
2381 if self.op.hvparams or self.op.beparams:
2382 self.LogWarning("Overridden parameters are ignored")
2384 _CheckNodeOnline(self, instance.primary_node)
2386 bep = self.cfg.GetClusterInfo().FillBE(instance)
2387 bep.update(self.op.beparams)
2389 # check bridges existence
2390 _CheckInstanceBridgesExist(self, instance)
2392 remote_info = self.rpc.call_instance_info(instance.primary_node,
2394 instance.hypervisor)
2395 remote_info.Raise("Error checking node %s" % instance.primary_node,
2396 prereq=True, ecode=errors.ECODE_ENVIRON)
2397 if not remote_info.payload: # not running already
2398 _CheckNodeFreeMemory(self, instance.primary_node,
2399 "starting instance %s" % instance.name,
2400 bep[constants.BE_MINMEM], instance.hypervisor)
2402 def Exec(self, feedback_fn):
2403 """Start the instance.
2406 instance = self.instance
2407 force = self.op.force
2408 reason = self.op.reason
2410 if not self.op.no_remember:
2411 self.cfg.MarkInstanceUp(instance.name)
2413 if self.primary_offline:
2414 assert self.op.ignore_offline_nodes
2415 self.LogInfo("Primary node offline, marked instance as started")
2417 node_current = instance.primary_node
2419 _StartInstanceDisks(self, instance, force)
2422 self.rpc.call_instance_start(node_current,
2423 (instance, self.op.hvparams,
2425 self.op.startup_paused, reason)
2426 msg = result.fail_msg
2428 _ShutdownInstanceDisks(self, instance)
2429 raise errors.OpExecError("Could not start instance: %s" % msg)
2432 class LUInstanceShutdown(LogicalUnit):
2433 """Shutdown an instance.
2436 HPATH = "instance-stop"
2437 HTYPE = constants.HTYPE_INSTANCE
2440 def ExpandNames(self):
2441 self._ExpandAndLockInstance()
2443 def BuildHooksEnv(self):
2446 This runs on master, primary and secondary nodes of the instance.
2449 env = _BuildInstanceHookEnvByObject(self, self.instance)
2450 env["TIMEOUT"] = self.op.timeout
2453 def BuildHooksNodes(self):
2454 """Build hooks nodes.
2457 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2460 def CheckPrereq(self):
2461 """Check prerequisites.
2463 This checks that the instance is in the cluster.
2466 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2467 assert self.instance is not None, \
2468 "Cannot retrieve locked instance %s" % self.op.instance_name
2470 if not self.op.force:
2471 _CheckInstanceState(self, self.instance, INSTANCE_ONLINE)
2473 self.LogWarning("Ignoring offline instance check")
2475 self.primary_offline = \
2476 self.cfg.GetNodeInfo(self.instance.primary_node).offline
2478 if self.primary_offline and self.op.ignore_offline_nodes:
2479 self.LogWarning("Ignoring offline primary node")
2481 _CheckNodeOnline(self, self.instance.primary_node)
2483 def Exec(self, feedback_fn):
2484 """Shutdown the instance.
2487 instance = self.instance
2488 node_current = instance.primary_node
2489 timeout = self.op.timeout
2490 reason = self.op.reason
2492 # If the instance is offline we shouldn't mark it as down, as that
2493 # resets the offline flag.
2494 if not self.op.no_remember and instance.admin_state in INSTANCE_ONLINE:
2495 self.cfg.MarkInstanceDown(instance.name)
2497 if self.primary_offline:
2498 assert self.op.ignore_offline_nodes
2499 self.LogInfo("Primary node offline, marked instance as stopped")
2501 result = self.rpc.call_instance_shutdown(node_current, instance, timeout,
2503 msg = result.fail_msg
2505 self.LogWarning("Could not shutdown instance: %s", msg)
2507 _ShutdownInstanceDisks(self, instance)
2510 class LUInstanceReinstall(LogicalUnit):
2511 """Reinstall an instance.
2514 HPATH = "instance-reinstall"
2515 HTYPE = constants.HTYPE_INSTANCE
2518 def ExpandNames(self):
2519 self._ExpandAndLockInstance()
2521 def BuildHooksEnv(self):
2524 This runs on master, primary and secondary nodes of the instance.
2527 return _BuildInstanceHookEnvByObject(self, self.instance)
2529 def BuildHooksNodes(self):
2530 """Build hooks nodes.
2533 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2536 def CheckPrereq(self):
2537 """Check prerequisites.
2539 This checks that the instance is in the cluster and is not running.
2542 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2543 assert instance is not None, \
2544 "Cannot retrieve locked instance %s" % self.op.instance_name
2545 _CheckNodeOnline(self, instance.primary_node, "Instance primary node"
2546 " offline, cannot reinstall")
2548 if instance.disk_template == constants.DT_DISKLESS:
2549 raise errors.OpPrereqError("Instance '%s' has no disks" %
2550 self.op.instance_name,
2552 _CheckInstanceState(self, instance, INSTANCE_DOWN, msg="cannot reinstall")
2554 if self.op.os_type is not None:
2556 pnode = _ExpandNodeName(self.cfg, instance.primary_node)
2557 _CheckNodeHasOS(self, pnode, self.op.os_type, self.op.force_variant)
2558 instance_os = self.op.os_type
2560 instance_os = instance.os
2562 nodelist = list(instance.all_nodes)
2564 if self.op.osparams:
2565 i_osdict = _GetUpdatedParams(instance.osparams, self.op.osparams)
2566 _CheckOSParams(self, True, nodelist, instance_os, i_osdict)
2567 self.os_inst = i_osdict # the new dict (without defaults)
2571 self.instance = instance
2573 def Exec(self, feedback_fn):
2574 """Reinstall the instance.
2577 inst = self.instance
2579 if self.op.os_type is not None:
2580 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2581 inst.os = self.op.os_type
2582 # Write to configuration
2583 self.cfg.Update(inst, feedback_fn)
2585 _StartInstanceDisks(self, inst, None)
2587 feedback_fn("Running the instance OS create scripts...")
2588 # FIXME: pass debug option from opcode to backend
2589 result = self.rpc.call_instance_os_add(inst.primary_node,
2590 (inst, self.os_inst), True,
2591 self.op.debug_level)
2592 result.Raise("Could not install OS for instance %s on node %s" %
2593 (inst.name, inst.primary_node))
2595 _ShutdownInstanceDisks(self, inst)
2598 class LUInstanceReboot(LogicalUnit):
2599 """Reboot an instance.
2602 HPATH = "instance-reboot"
2603 HTYPE = constants.HTYPE_INSTANCE
2606 def ExpandNames(self):
2607 self._ExpandAndLockInstance()
2609 def BuildHooksEnv(self):
2612 This runs on master, primary and secondary nodes of the instance.
2616 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2617 "REBOOT_TYPE": self.op.reboot_type,
2618 "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
2621 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2625 def BuildHooksNodes(self):
2626 """Build hooks nodes.
2629 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2632 def CheckPrereq(self):
2633 """Check prerequisites.
2635 This checks that the instance is in the cluster.
2638 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2639 assert self.instance is not None, \
2640 "Cannot retrieve locked instance %s" % self.op.instance_name
2641 _CheckInstanceState(self, instance, INSTANCE_ONLINE)
2642 _CheckNodeOnline(self, instance.primary_node)
2644 # check bridges existence
2645 _CheckInstanceBridgesExist(self, instance)
2647 def Exec(self, feedback_fn):
2648 """Reboot the instance.
2651 instance = self.instance
2652 ignore_secondaries = self.op.ignore_secondaries
2653 reboot_type = self.op.reboot_type
2654 reason = self.op.reason
2656 remote_info = self.rpc.call_instance_info(instance.primary_node,
2658 instance.hypervisor)
2659 remote_info.Raise("Error checking node %s" % instance.primary_node)
2660 instance_running = bool(remote_info.payload)
2662 node_current = instance.primary_node
2664 if instance_running and reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2665 constants.INSTANCE_REBOOT_HARD]:
2666 for disk in instance.disks:
2667 self.cfg.SetDiskID(disk, node_current)
2668 result = self.rpc.call_instance_reboot(node_current, instance,
2670 self.op.shutdown_timeout, reason)
2671 result.Raise("Could not reboot instance")
2673 if instance_running:
2674 result = self.rpc.call_instance_shutdown(node_current, instance,
2675 self.op.shutdown_timeout,
2677 result.Raise("Could not shutdown instance for full reboot")
2678 _ShutdownInstanceDisks(self, instance)
2680 self.LogInfo("Instance %s was already stopped, starting now",
2682 _StartInstanceDisks(self, instance, ignore_secondaries)
2683 result = self.rpc.call_instance_start(node_current,
2684 (instance, None, None), False,
2686 msg = result.fail_msg
2688 _ShutdownInstanceDisks(self, instance)
2689 raise errors.OpExecError("Could not start instance for"
2690 " full reboot: %s" % msg)
2692 self.cfg.MarkInstanceUp(instance.name)
2695 class LUInstanceConsole(NoHooksLU):
2696 """Connect to an instance's console.
2698 This is somewhat special in that it returns the command line that
2699 you need to run on the master node in order to connect to the
2705 def ExpandNames(self):
2706 self.share_locks = _ShareAll()
2707 self._ExpandAndLockInstance()
2709 def CheckPrereq(self):
2710 """Check prerequisites.
2712 This checks that the instance is in the cluster.
2715 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2716 assert self.instance is not None, \
2717 "Cannot retrieve locked instance %s" % self.op.instance_name
2718 _CheckNodeOnline(self, self.instance.primary_node)
2720 def Exec(self, feedback_fn):
2721 """Connect to the console of an instance
2724 instance = self.instance
2725 node = instance.primary_node
2727 node_insts = self.rpc.call_instance_list([node],
2728 [instance.hypervisor])[node]
2729 node_insts.Raise("Can't get node information from %s" % node)
2731 if instance.name not in node_insts.payload:
2732 if instance.admin_state == constants.ADMINST_UP:
2733 state = constants.INSTST_ERRORDOWN
2734 elif instance.admin_state == constants.ADMINST_DOWN:
2735 state = constants.INSTST_ADMINDOWN
2737 state = constants.INSTST_ADMINOFFLINE
2738 raise errors.OpExecError("Instance %s is not running (state %s)" %
2739 (instance.name, state))
2741 logging.debug("Connecting to console of %s on %s", instance.name, node)
2743 return _GetInstanceConsole(self.cfg.GetClusterInfo(), instance)
2746 def _DeclareLocksForMigration(lu, level):
2747 """Declares locks for L{TLMigrateInstance}.
2749 @type lu: L{LogicalUnit}
2750 @param level: Lock level
2753 if level == locking.LEVEL_NODE_ALLOC:
2754 assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
2756 instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)
2758 # Node locks are already declared here rather than at LEVEL_NODE as we need
2759 # the instance object anyway to declare the node allocation lock.
2760 if instance.disk_template in constants.DTS_EXT_MIRROR:
2761 if lu.op.target_node is None:
2762 lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2763 lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
2765 lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
2767 del lu.recalculate_locks[locking.LEVEL_NODE]
2769 lu._LockInstancesNodes() # pylint: disable=W0212
2771 elif level == locking.LEVEL_NODE:
2772 # Node locks are declared together with the node allocation lock
2773 assert (lu.needed_locks[locking.LEVEL_NODE] or
2774 lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)
2776 elif level == locking.LEVEL_NODE_RES:
2778 lu.needed_locks[locking.LEVEL_NODE_RES] = \
2779 _CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
2782 def _ExpandNamesForMigration(lu):
2783 """Expands names for use with L{TLMigrateInstance}.
2785 @type lu: L{LogicalUnit}
2788 if lu.op.target_node is not None:
2789 lu.op.target_node = _ExpandNodeName(lu.cfg, lu.op.target_node)
2791 lu.needed_locks[locking.LEVEL_NODE] = []
2792 lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2794 lu.needed_locks[locking.LEVEL_NODE_RES] = []
2795 lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
2797 # The node allocation lock is actually only needed for externally replicated
2798 # instances (e.g. sharedfile or RBD) and if an iallocator is used.
2799 lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []
2802 class LUInstanceFailover(LogicalUnit):
2803 """Failover an instance.
2806 HPATH = "instance-failover"
2807 HTYPE = constants.HTYPE_INSTANCE
2810 def CheckArguments(self):
2811 """Check the arguments.
2814 self.iallocator = getattr(self.op, "iallocator", None)
2815 self.target_node = getattr(self.op, "target_node", None)
2817 def ExpandNames(self):
2818 self._ExpandAndLockInstance()
2819 _ExpandNamesForMigration(self)
2822 TLMigrateInstance(self, self.op.instance_name, False, True, False,
2823 self.op.ignore_consistency, True,
2824 self.op.shutdown_timeout, self.op.ignore_ipolicy)
2826 self.tasklets = [self._migrater]
2828 def DeclareLocks(self, level):
2829 _DeclareLocksForMigration(self, level)
2831 def BuildHooksEnv(self):
2834 This runs on master, primary and secondary nodes of the instance.
2837 instance = self._migrater.instance
2838 source_node = instance.primary_node
2839 target_node = self.op.target_node
2841 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2842 "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
2843 "OLD_PRIMARY": source_node,
2844 "NEW_PRIMARY": target_node,
2847 if instance.disk_template in constants.DTS_INT_MIRROR:
2848 env["OLD_SECONDARY"] = instance.secondary_nodes[0]
2849 env["NEW_SECONDARY"] = source_node
2851 env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""
2853 env.update(_BuildInstanceHookEnvByObject(self, instance))
2857 def BuildHooksNodes(self):
2858 """Build hooks nodes.
2861 instance = self._migrater.instance
2862 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
2863 return (nl, nl + [instance.primary_node])
2866 class LUInstanceMigrate(LogicalUnit):
2867 """Migrate an instance.
2869 This is migration without shutting down, as opposed to failover,
2870 which is done with a shutdown.
2873 HPATH = "instance-migrate"
2874 HTYPE = constants.HTYPE_INSTANCE
2877 def ExpandNames(self):
2878 self._ExpandAndLockInstance()
2879 _ExpandNamesForMigration(self)
2882 TLMigrateInstance(self, self.op.instance_name, self.op.cleanup,
2883 False, self.op.allow_failover, False,
2884 self.op.allow_runtime_changes,
2885 constants.DEFAULT_SHUTDOWN_TIMEOUT,
2886 self.op.ignore_ipolicy)
2888 self.tasklets = [self._migrater]
2890 def DeclareLocks(self, level):
2891 _DeclareLocksForMigration(self, level)
2893 def BuildHooksEnv(self):
2896 This runs on master, primary and secondary nodes of the instance.
2899 instance = self._migrater.instance
2900 source_node = instance.primary_node
2901 target_node = self.op.target_node
2902 env = _BuildInstanceHookEnvByObject(self, instance)
2904 "MIGRATE_LIVE": self._migrater.live,
2905 "MIGRATE_CLEANUP": self.op.cleanup,
2906 "OLD_PRIMARY": source_node,
2907 "NEW_PRIMARY": target_node,
2908 "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
2911 if instance.disk_template in constants.DTS_INT_MIRROR:
2912 env["OLD_SECONDARY"] = target_node
2913 env["NEW_SECONDARY"] = source_node
2915 env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None
2919 def BuildHooksNodes(self):
2920 """Build hooks nodes.
2923 instance = self._migrater.instance
2924 snodes = list(instance.secondary_nodes)
2925 nl = [self.cfg.GetMasterNode(), instance.primary_node] + snodes
2929 class LUInstanceMultiAlloc(NoHooksLU):
2930 """Allocates multiple instances at the same time.
2935 def CheckArguments(self):
2940 for inst in self.op.instances:
2941 if inst.iallocator is not None:
2942 raise errors.OpPrereqError("iallocator are not allowed to be set on"
2943 " instance objects", errors.ECODE_INVAL)
2944 nodes.append(bool(inst.pnode))
2945 if inst.disk_template in constants.DTS_INT_MIRROR:
2946 nodes.append(bool(inst.snode))
2948 has_nodes = compat.any(nodes)
2949 if compat.all(nodes) ^ has_nodes:
2950 raise errors.OpPrereqError("There are instance objects providing"
2951 " pnode/snode while others do not",
2954 if self.op.iallocator is None:
2955 default_iallocator = self.cfg.GetDefaultIAllocator()
2956 if default_iallocator and has_nodes:
2957 self.op.iallocator = default_iallocator
2959 raise errors.OpPrereqError("No iallocator or nodes on the instances"
2960 " given and no cluster-wide default"
2961 " iallocator found; please specify either"
2962 " an iallocator or nodes on the instances"
2963 " or set a cluster-wide default iallocator",
2966 _CheckOpportunisticLocking(self.op)
2968 dups = utils.FindDuplicates([op.instance_name for op in self.op.instances])
2970 raise errors.OpPrereqError("There are duplicate instance names: %s" %
2971 utils.CommaJoin(dups), errors.ECODE_INVAL)
2973 def ExpandNames(self):
2974 """Calculate the locks.
2977 self.share_locks = _ShareAll()
2978 self.needed_locks = {
2979 # iallocator will select nodes and even if no iallocator is used,
2980 # collisions with LUInstanceCreate should be avoided
2981 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
2984 if self.op.iallocator:
2985 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2986 self.needed_locks[locking.LEVEL_NODE_RES] = locking.ALL_SET
2988 if self.op.opportunistic_locking:
2989 self.opportunistic_locks[locking.LEVEL_NODE] = True
2990 self.opportunistic_locks[locking.LEVEL_NODE_RES] = True
2993 for inst in self.op.instances:
2994 inst.pnode = _ExpandNodeName(self.cfg, inst.pnode)
2995 nodeslist.append(inst.pnode)
2996 if inst.snode is not None:
2997 inst.snode = _ExpandNodeName(self.cfg, inst.snode)
2998 nodeslist.append(inst.snode)
3000 self.needed_locks[locking.LEVEL_NODE] = nodeslist
3001 # Lock resources of instance's primary and secondary nodes (copy to
3002 # prevent accidental modification)
3003 self.needed_locks[locking.LEVEL_NODE_RES] = list(nodeslist)
3005 def CheckPrereq(self):
3006 """Check prerequisite.
3009 cluster = self.cfg.GetClusterInfo()
3010 default_vg = self.cfg.GetVGName()
3011 ec_id = self.proc.GetECId()
3013 if self.op.opportunistic_locking:
3014 # Only consider nodes for which a lock is held
3015 node_whitelist = list(self.owned_locks(locking.LEVEL_NODE))
3017 node_whitelist = None
3019 insts = [_CreateInstanceAllocRequest(op, _ComputeDisks(op, default_vg),
3020 _ComputeNics(op, cluster, None,
3022 _ComputeFullBeParams(op, cluster),
3024 for op in self.op.instances]
3026 req = iallocator.IAReqMultiInstanceAlloc(instances=insts)
3027 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
3029 ial.Run(self.op.iallocator)
3032 raise errors.OpPrereqError("Can't compute nodes using"
3033 " iallocator '%s': %s" %
3034 (self.op.iallocator, ial.info),
3037 self.ia_result = ial.result
3040 self.dry_run_result = objects.FillDict(self._ConstructPartialResult(), {
3041 constants.JOB_IDS_KEY: [],
3044 def _ConstructPartialResult(self):
3045 """Contructs the partial result.
3048 (allocatable, failed) = self.ia_result
3050 opcodes.OpInstanceMultiAlloc.ALLOCATABLE_KEY:
3051 map(compat.fst, allocatable),
3052 opcodes.OpInstanceMultiAlloc.FAILED_KEY: failed,
3055 def Exec(self, feedback_fn):
3056 """Executes the opcode.
3059 op2inst = dict((op.instance_name, op) for op in self.op.instances)
3060 (allocatable, failed) = self.ia_result
3063 for (name, nodes) in allocatable:
3064 op = op2inst.pop(name)
3067 (op.pnode, op.snode) = nodes
3073 missing = set(op2inst.keys()) - set(failed)
3074 assert not missing, \
3075 "Iallocator did return incomplete result: %s" % utils.CommaJoin(missing)
3077 return ResultWithJobs(jobs, **self._ConstructPartialResult())
3080 class _InstNicModPrivate:
3081 """Data structure for network interface modifications.
3083 Used by L{LUInstanceSetParams}.
3091 def PrepareContainerMods(mods, private_fn):
3092 """Prepares a list of container modifications by adding a private data field.
3094 @type mods: list of tuples; (operation, index, parameters)
3095 @param mods: List of modifications
3096 @type private_fn: callable or None
3097 @param private_fn: Callable for constructing a private data field for a
3102 if private_fn is None:
3107 return [(op, idx, params, fn()) for (op, idx, params) in mods]
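# Minimal sketch (hypothetical parameter values): an opcode-level list of
# (operation, index, params) tuples gains a per-entry private data object,
# matching how LUInstanceSetParams.CheckPrereq prepares self.nicmod below:
#
#   mods = [(constants.DDM_ADD, -1,
#            {constants.INIC_MAC: constants.VALUE_AUTO})]
#   prepared = PrepareContainerMods(mods, _InstNicModPrivate)
#   # prepared == [(constants.DDM_ADD, -1, {...}, <_InstNicModPrivate>)]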
3110 def _CheckNodesPhysicalCPUs(lu, nodenames, requested, hypervisor_name):
3111 """Checks if nodes have enough physical CPUs
3113 This function checks if all given nodes have the needed number of
3114 physical CPUs. If any node has fewer CPUs, or we cannot get the
3115 information from the node, this function raises an OpPrereqError
3118 @type lu: C{LogicalUnit}
3119 @param lu: a logical unit from which we get configuration data
3120 @type nodenames: C{list}
3121 @param nodenames: the list of node names to check
3122 @type requested: C{int}
3123 @param requested: the minimum acceptable number of physical CPUs
3124 @raise errors.OpPrereqError: if the node doesn't have enough CPUs,
3125 or we cannot check the node
3128 nodeinfo = lu.rpc.call_node_info(nodenames, None, [hypervisor_name], None)
3129 for node in nodenames:
3130 info = nodeinfo[node]
3131 info.Raise("Cannot get current information from node %s" % node,
3132 prereq=True, ecode=errors.ECODE_ENVIRON)
3133 (_, _, (hv_info, )) = info.payload
3134 num_cpus = hv_info.get("cpu_total", None)
3135 if not isinstance(num_cpus, int):
3136 raise errors.OpPrereqError("Can't compute the number of physical CPUs"
3137 " on node %s, result was '%s'" %
3138 (node, num_cpus), errors.ECODE_ENVIRON)
3139 if requested > num_cpus:
3140 raise errors.OpPrereqError("Node %s has %s physical CPUs, but %s are "
3141 "required" % (node, num_cpus, requested),
3145 def GetItemFromContainer(identifier, kind, container):
3146 """Return the item refered by the identifier.
3148 @type identifier: string
3149 @param identifier: Item index or name or UUID
3151 @param kind: One-word item description
3152 @type container: list
3153 @param container: Container to get the item from
3158 idx = int(identifier)
3161 absidx = len(container) - 1
3163 raise IndexError("Not accepting negative indices other than -1")
3164 elif idx > len(container):
3165 raise IndexError("Got %s index %s, but there are only %s" %
3166 (kind, idx, len(container)))
3169 return (absidx, container[idx])
3173 for idx, item in enumerate(container):
3174 if item.uuid == identifier or item.name == identifier:
3177 raise errors.OpPrereqError("Cannot find %s with identifier %s" %
3178 (kind, identifier), errors.ECODE_NOENT)
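# Minimal sketch (hypothetical NIC container): the identifier may be a
# numeric index, a name or a UUID; each form yields (absolute_index, item),
# and "-1" addresses the last element:
#
#   GetItemFromContainer("0", "NIC", instance.nics)     # -> (0, first NIC)
#   GetItemFromContainer("eth0", "NIC", instance.nics)  # -> (idx, NIC named "eth0")
#   GetItemFromContainer("-1", "NIC", instance.nics)    # -> (len - 1, last NIC)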
3181 def ApplyContainerMods(kind, container, chgdesc, mods,
3182 create_fn, modify_fn, remove_fn):
3183 """Applies descriptions in C{mods} to C{container}.
3186 @param kind: One-word item description
3187 @type container: list
3188 @param container: Container to modify
3189 @type chgdesc: None or list
3190 @param chgdesc: List of applied changes
3192 @param mods: Modifications as returned by L{PrepareContainerMods}
3193 @type create_fn: callable
3194 @param create_fn: Callback for creating a new item (L{constants.DDM_ADD});
3195 receives absolute item index, parameters and private data object as added
3196 by L{PrepareContainerMods}, returns tuple containing new item and changes
3198 @type modify_fn: callable
3199 @param modify_fn: Callback for modifying an existing item
3200 (L{constants.DDM_MODIFY}); receives absolute item index, item, parameters
3201 and private data object as added by L{PrepareContainerMods}, returns
3203 @type remove_fn: callable
3204 @param remove_fn: Callback on removing item; receives absolute item index,
3205 item and private data object as added by L{PrepareContainerMods}
3208 for (op, identifier, params, private) in mods:
3211 if op == constants.DDM_ADD:
3212 # Calculate where item will be added
3213 # When adding an item, identifier can only be an index
3215 idx = int(identifier)
3217 raise errors.OpPrereqError("Only possitive integer or -1 is accepted as"
3218 " identifier for %s" % constants.DDM_ADD,
3221 addidx = len(container)
3224 raise IndexError("Not accepting negative indices other than -1")
3225 elif idx > len(container):
3226 raise IndexError("Got %s index %s, but there are only %s" %
3227 (kind, idx, len(container)))
3230 if create_fn is None:
3233 (item, changes) = create_fn(addidx, params, private)
3236 container.append(item)
3239 assert idx <= len(container)
3240 # list.insert does so before the specified index
3241 container.insert(idx, item)
3243 # Retrieve existing item
3244 (absidx, item) = GetItemFromContainer(identifier, kind, container)
3246 if op == constants.DDM_REMOVE:
3249 if remove_fn is not None:
3250 remove_fn(absidx, item, private)
3252 changes = [("%s/%s" % (kind, absidx), "remove")]
3254 assert container[absidx] == item
3255 del container[absidx]
3256 elif op == constants.DDM_MODIFY:
3257 if modify_fn is not None:
3258 changes = modify_fn(absidx, item, params, private)
3260 raise errors.ProgrammerError("Unhandled operation '%s'" % op)
3262 assert _TApplyContModsCbChanges(changes)
3264 if not (chgdesc is None or changes is None):
3265 chgdesc.extend(changes)
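# Minimal sketch (callback names here are hypothetical): LUInstanceSetParams
# applies its prepared NIC modifications through this helper, collecting
# human-readable change descriptions as it goes. create_nic_fn receives
# (index, params, private), modify_nic_fn (index, nic, params, private) and
# remove_nic_fn (index, nic, private), as described in the docstring above:
#
#   changes = []
#   ApplyContainerMods("NIC", instance.nics, changes, self.nicmod,
#                      create_nic_fn, modify_nic_fn, remove_nic_fn)
#   # 'changes' now holds (description, value) pairs such as
#   # ("NIC/1", "remove"), as generated for a removal above.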
3268 def _UpdateIvNames(base_index, disks):
3269 """Updates the C{iv_name} attribute of disks.
3271 @type disks: list of L{objects.Disk}
3274 for (idx, disk) in enumerate(disks):
3275 disk.iv_name = "disk/%s" % (base_index + idx, )
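# Minimal sketch (hypothetical position): after inserting a disk at index 1,
# renumber the C{iv_name} of that disk and all following ones so that the
# attribute always matches the disk's position in instance.disks:
#
#   _UpdateIvNames(1, instance.disks[1:])
#   # instance.disks[1].iv_name == "disk/1",
#   # instance.disks[2].iv_name == "disk/2", and so on.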
3278 class LUInstanceSetParams(LogicalUnit):
3279 """Modifies an instances's parameters.
3282 HPATH = "instance-modify"
3283 HTYPE = constants.HTYPE_INSTANCE
3287 def _UpgradeDiskNicMods(kind, mods, verify_fn):
3288 assert ht.TList(mods)
3289 assert not mods or len(mods[0]) in (2, 3)
3291 if mods and len(mods[0]) == 2:
3295 for op, params in mods:
3296 if op in (constants.DDM_ADD, constants.DDM_REMOVE):
3297 result.append((op, -1, params))
3301 raise errors.OpPrereqError("Only one %s add or remove operation is"
3302 " supported at a time" % kind,
3305 result.append((constants.DDM_MODIFY, op, params))
3307 assert verify_fn(result)
3314 def _CheckMods(kind, mods, key_types, item_fn):
3315 """Ensures requested disk/NIC modifications are valid.
3318 for (op, _, params) in mods:
3319 assert ht.TDict(params)
3321 # If 'key_types' is an empty dict, we assume we have an
3322 # 'ext' template and thus do not ForceDictType
3324 utils.ForceDictType(params, key_types)
3326 if op == constants.DDM_REMOVE:
3328 raise errors.OpPrereqError("No settings should be passed when"
3329 " removing a %s" % kind,
3331 elif op in (constants.DDM_ADD, constants.DDM_MODIFY):
3334 raise errors.ProgrammerError("Unhandled operation '%s'" % op)
3337 def _VerifyDiskModification(op, params):
3338 """Verifies a disk modification.
3341 if op == constants.DDM_ADD:
3342 mode = params.setdefault(constants.IDISK_MODE, constants.DISK_RDWR)
3343 if mode not in constants.DISK_ACCESS_SET:
3344 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode,
3347 size = params.get(constants.IDISK_SIZE, None)
3349 raise errors.OpPrereqError("Required disk parameter '%s' missing" %
3350 constants.IDISK_SIZE, errors.ECODE_INVAL)
3354 except (TypeError, ValueError), err:
3355 raise errors.OpPrereqError("Invalid disk size parameter: %s" % err,
3358 params[constants.IDISK_SIZE] = size
3359 name = params.get(constants.IDISK_NAME, None)
3360 if name is not None and name.lower() == constants.VALUE_NONE:
3361 params[constants.IDISK_NAME] = None
3363 elif op == constants.DDM_MODIFY:
3364 if constants.IDISK_SIZE in params:
3365 raise errors.OpPrereqError("Disk size change not possible, use"
3366 " grow-disk", errors.ECODE_INVAL)
3368 raise errors.OpPrereqError("Disk modification doesn't support"
3369 " additional arbitrary parameters",
3371 name = params.get(constants.IDISK_NAME, None)
3372 if name is not None and name.lower() == constants.VALUE_NONE:
3373 params[constants.IDISK_NAME] = None
3376 def _VerifyNicModification(op, params):
3377 """Verifies a network interface modification.
3380 if op in (constants.DDM_ADD, constants.DDM_MODIFY):
3381 ip = params.get(constants.INIC_IP, None)
3382 name = params.get(constants.INIC_NAME, None)
3383 req_net = params.get(constants.INIC_NETWORK, None)
3384 link = params.get(constants.NIC_LINK, None)
3385 mode = params.get(constants.NIC_MODE, None)
3386 if name is not None and name.lower() == constants.VALUE_NONE:
3387 params[constants.INIC_NAME] = None
3388 if req_net is not None:
3389 if req_net.lower() == constants.VALUE_NONE:
3390 params[constants.INIC_NETWORK] = None
3392 elif link is not None or mode is not None:
3393 raise errors.OpPrereqError("If network is given"
3394 " mode or link should not",
3397 if op == constants.DDM_ADD:
3398 macaddr = params.get(constants.INIC_MAC, None)
3400 params[constants.INIC_MAC] = constants.VALUE_AUTO
3403 if ip.lower() == constants.VALUE_NONE:
3404 params[constants.INIC_IP] = None
3406 if ip.lower() == constants.NIC_IP_POOL:
3407 if op == constants.DDM_ADD and req_net is None:
3408 raise errors.OpPrereqError("If ip=pool, parameter network"
3412 if not netutils.IPAddress.IsValid(ip):
3413 raise errors.OpPrereqError("Invalid IP address '%s'" % ip,
3416 if constants.INIC_MAC in params:
3417 macaddr = params[constants.INIC_MAC]
3418 if macaddr not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3419 macaddr = utils.NormalizeAndValidateMac(macaddr)
3421 if op == constants.DDM_MODIFY and macaddr == constants.VALUE_AUTO:
3422 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
3423 " modifying an existing NIC",
3426 def CheckArguments(self):
3427 if not (self.op.nics or self.op.disks or self.op.disk_template or
3428 self.op.hvparams or self.op.beparams or self.op.os_name or
3429 self.op.offline is not None or self.op.runtime_mem or
3431 raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)
3433 if self.op.hvparams:
3434 _CheckParamsNotGlobal(self.op.hvparams, constants.HVC_GLOBALS,
3435 "hypervisor", "instance", "cluster")
3437 self.op.disks = self._UpgradeDiskNicMods(
3438 "disk", self.op.disks, opcodes.OpInstanceSetParams.TestDiskModifications)
3439 self.op.nics = self._UpgradeDiskNicMods(
3440 "NIC", self.op.nics, opcodes.OpInstanceSetParams.TestNicModifications)
3442 if self.op.disks and self.op.disk_template is not None:
3443 raise errors.OpPrereqError("Disk template conversion and other disk"
3444 " changes not supported at the same time",
3447 if (self.op.disk_template and
3448 self.op.disk_template in constants.DTS_INT_MIRROR and
3449 self.op.remote_node is None):
3450 raise errors.OpPrereqError("Changing the disk template to a mirrored"
3451 " one requires specifying a secondary node",
3454 # Check NIC modifications
3455 self._CheckMods("NIC", self.op.nics, constants.INIC_PARAMS_TYPES,
3456 self._VerifyNicModification)
3459 self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode)
3461 def ExpandNames(self):
3462 self._ExpandAndLockInstance()
3463 self.needed_locks[locking.LEVEL_NODEGROUP] = []
3464 # Can't even acquire node locks in shared mode as upcoming changes in
3465 # Ganeti 2.6 will start to modify the node object on disk conversion
3466 self.needed_locks[locking.LEVEL_NODE] = []
3467 self.needed_locks[locking.LEVEL_NODE_RES] = []
3468 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3469 # Lock the node group to look up the ipolicy
3470 self.share_locks[locking.LEVEL_NODEGROUP] = 1
3472 def DeclareLocks(self, level):
3473 if level == locking.LEVEL_NODEGROUP:
3474 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
3475 # Acquire locks for the instance's nodegroups optimistically. Needs
3476 # to be verified in CheckPrereq
3477 self.needed_locks[locking.LEVEL_NODEGROUP] = \
3478 self.cfg.GetInstanceNodeGroups(self.op.instance_name)
3479 elif level == locking.LEVEL_NODE:
3480 self._LockInstancesNodes()
3481 if self.op.disk_template and self.op.remote_node:
3482 self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
3483 self.needed_locks[locking.LEVEL_NODE].append(self.op.remote_node)
3484 elif level == locking.LEVEL_NODE_RES and self.op.disk_template:
3486 self.needed_locks[locking.LEVEL_NODE_RES] = \
3487 _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
3489 def BuildHooksEnv(self):
3492 This runs on the master, primary and secondaries.
3496 if constants.BE_MINMEM in self.be_new:
3497 args["minmem"] = self.be_new[constants.BE_MINMEM]
3498 if constants.BE_MAXMEM in self.be_new:
3499 args["maxmem"] = self.be_new[constants.BE_MAXMEM]
3500 if constants.BE_VCPUS in self.be_new:
3501 args["vcpus"] = self.be_new[constants.BE_VCPUS]
3502 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
3503 # information at all.
3505 if self._new_nics is not None:
3508 for nic in self._new_nics:
3509 n = copy.deepcopy(nic)
3510 nicparams = self.cluster.SimpleFillNIC(n.nicparams)
3511 n.nicparams = nicparams
3512 nics.append(_NICToTuple(self, n))
3516 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
3517 if self.op.disk_template:
3518 env["NEW_DISK_TEMPLATE"] = self.op.disk_template
3519 if self.op.runtime_mem:
3520 env["RUNTIME_MEMORY"] = self.op.runtime_mem
3524 def BuildHooksNodes(self):
3525 """Build hooks nodes.
3528 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3531 def _PrepareNicModification(self, params, private, old_ip, old_net_uuid,
3532 old_params, cluster, pnode):
3534 update_params_dict = dict([(key, params[key])
3535 for key in constants.NICS_PARAMETERS
3538 req_link = update_params_dict.get(constants.NIC_LINK, None)
3539 req_mode = update_params_dict.get(constants.NIC_MODE, None)
3542 new_net_uuid_or_name = params.get(constants.INIC_NETWORK, old_net_uuid)
3543 if new_net_uuid_or_name:
3544 new_net_uuid = self.cfg.LookupNetwork(new_net_uuid_or_name)
3545 new_net_obj = self.cfg.GetNetwork(new_net_uuid)
3548 old_net_obj = self.cfg.GetNetwork(old_net_uuid)
3551 netparams = self.cfg.GetGroupNetParams(new_net_uuid, pnode)
3553 raise errors.OpPrereqError("No netparams found for the network"
3554 " %s, probably not connected" %
3555 new_net_obj.name, errors.ECODE_INVAL)
3556 new_params = dict(netparams)
3558 new_params = _GetUpdatedParams(old_params, update_params_dict)
3560 utils.ForceDictType(new_params, constants.NICS_PARAMETER_TYPES)
3562 new_filled_params = cluster.SimpleFillNIC(new_params)
3563 objects.NIC.CheckParameterSyntax(new_filled_params)
3565 new_mode = new_filled_params[constants.NIC_MODE]
3566 if new_mode == constants.NIC_MODE_BRIDGED:
3567 bridge = new_filled_params[constants.NIC_LINK]
3568 msg = self.rpc.call_bridges_exist(pnode, [bridge]).fail_msg
3570 msg = "Error checking bridges on node '%s': %s" % (pnode, msg)
3572 self.warn.append(msg)
3574 raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
3576 elif new_mode == constants.NIC_MODE_ROUTED:
3577 ip = params.get(constants.INIC_IP, old_ip)
3579 raise errors.OpPrereqError("Cannot set the NIC IP address to None"
3580 " on a routed NIC", errors.ECODE_INVAL)
3582 elif new_mode == constants.NIC_MODE_OVS:
3583 # TODO: check OVS link
3584 self.LogInfo("OVS links are currently not checked for correctness")
3586 if constants.INIC_MAC in params:
3587 mac = params[constants.INIC_MAC]
3589 raise errors.OpPrereqError("Cannot unset the NIC MAC address",
3591 elif mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3592 # otherwise generate the MAC address
3593 params[constants.INIC_MAC] = \
3594 self.cfg.GenerateMAC(new_net_uuid, self.proc.GetECId())
3596 # or validate/reserve the current one
3598 self.cfg.ReserveMAC(mac, self.proc.GetECId())
3599 except errors.ReservationError:
3600 raise errors.OpPrereqError("MAC address '%s' already in use"
3601 " in cluster" % mac,
3602 errors.ECODE_NOTUNIQUE)
3603 elif new_net_uuid != old_net_uuid:
3605 def get_net_prefix(net_uuid):
3608 nobj = self.cfg.GetNetwork(net_uuid)
3609 mac_prefix = nobj.mac_prefix
3613 new_prefix = get_net_prefix(new_net_uuid)
3614 old_prefix = get_net_prefix(old_net_uuid)
3615 if old_prefix != new_prefix:
3616 params[constants.INIC_MAC] = \
3617 self.cfg.GenerateMAC(new_net_uuid, self.proc.GetECId())
3619 # if there is a change in (ip, network) tuple
3620 new_ip = params.get(constants.INIC_IP, old_ip)
3621 if (new_ip, new_net_uuid) != (old_ip, old_net_uuid):
3623 # if IP is pool then require a network and generate one IP
3624 if new_ip.lower() == constants.NIC_IP_POOL:
3627 new_ip = self.cfg.GenerateIp(new_net_uuid, self.proc.GetECId())
3628 except errors.ReservationError:
3629 raise errors.OpPrereqError("Unable to get a free IP"
3630 " from the address pool",
3632 self.LogInfo("Chose IP %s from network %s",
3635 params[constants.INIC_IP] = new_ip
3637 raise errors.OpPrereqError("ip=pool, but no network found",
3639 # Reserve the new IP in the new network, if any
3642 self.cfg.ReserveIp(new_net_uuid, new_ip, self.proc.GetECId())
3643 self.LogInfo("Reserving IP %s in network %s",
3644 new_ip, new_net_obj.name)
3645 except errors.ReservationError:
3646 raise errors.OpPrereqError("IP %s not available in network %s" %
3647 (new_ip, new_net_obj.name),
3648 errors.ECODE_NOTUNIQUE)
3649 # new network is None so check if new IP is a conflicting IP
3650 elif self.op.conflicts_check:
3651 _CheckForConflictingIp(self, new_ip, pnode)
3653 # release old IP if old network is not None
3654 if old_ip and old_net_uuid:
3656 self.cfg.ReleaseIp(old_net_uuid, old_ip, self.proc.GetECId())
3657 except errors.AddressPoolError:
3658 logging.warning("Release IP %s not contained in network %s",
3659 old_ip, old_net_obj.name)
3661 # there are no changes in (ip, network) tuple and old network is not None
3662 elif (old_net_uuid is not None and
3663 (req_link is not None or req_mode is not None)):
3664 raise errors.OpPrereqError("Not allowed to change link or mode of"
3665 " a NIC that is connected to a network",
3668 private.params = new_params
3669 private.filled = new_filled_params
3671 def _PreCheckDiskTemplate(self, pnode_info):
3672 """CheckPrereq checks related to a new disk template."""
3673 # Arguments are passed to avoid configuration lookups
3674 instance = self.instance
3675 pnode = instance.primary_node
3676 cluster = self.cluster
3677 if instance.disk_template == self.op.disk_template:
3678 raise errors.OpPrereqError("Instance already has disk template %s" %
3679 instance.disk_template, errors.ECODE_INVAL)
3681 if (instance.disk_template,
3682 self.op.disk_template) not in self._DISK_CONVERSIONS:
3683 raise errors.OpPrereqError("Unsupported disk template conversion from"
3684 " %s to %s" % (instance.disk_template,
3685 self.op.disk_template),
3687 _CheckInstanceState(self, instance, INSTANCE_DOWN,
3688 msg="cannot change disk template")
3689 if self.op.disk_template in constants.DTS_INT_MIRROR:
3690 if self.op.remote_node == pnode:
3691 raise errors.OpPrereqError("Given new secondary node %s is the same"
3692 " as the primary node of the instance" %
3693 self.op.remote_node, errors.ECODE_STATE)
3694 _CheckNodeOnline(self, self.op.remote_node)
3695 _CheckNodeNotDrained(self, self.op.remote_node)
3696 # FIXME: here we assume that the old instance type is DT_PLAIN
3697 assert instance.disk_template == constants.DT_PLAIN
3698 disks = [{constants.IDISK_SIZE: d.size,
3699 constants.IDISK_VG: d.logical_id[0]}
3700 for d in instance.disks]
3701 required = _ComputeDiskSizePerVG(self.op.disk_template, disks)
3702 _CheckNodesFreeDiskPerVG(self, [self.op.remote_node], required)
3704 snode_info = self.cfg.GetNodeInfo(self.op.remote_node)
3705 snode_group = self.cfg.GetNodeGroup(snode_info.group)
3706 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
3708 _CheckTargetNodeIPolicy(self, ipolicy, instance, snode_info, self.cfg,
3709 ignore=self.op.ignore_ipolicy)
3710 if pnode_info.group != snode_info.group:
3711 self.LogWarning("The primary and secondary nodes are in two"
3712 " different node groups; the disk parameters"
3713 " from the first disk's node group will be"
3716 if self.op.disk_template not in constants.DTS_EXCL_STORAGE:
3717 # Make sure none of the nodes require exclusive storage
3718 nodes = [pnode_info]
3719 if self.op.disk_template in constants.DTS_INT_MIRROR:
3721 nodes.append(snode_info)
3722 has_es = lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n)
3723 if compat.any(map(has_es, nodes)):
3724 errmsg = ("Cannot convert disk template from %s to %s when exclusive"
3725 " storage is enabled" % (instance.disk_template,
3726 self.op.disk_template))
3727 raise errors.OpPrereqError(errmsg, errors.ECODE_STATE)
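# Illustrative only (not part of the original module): a conversion checked by
# _PreCheckDiskTemplate above is typically requested via something like
#   gnt-instance modify -t drbd -n node2.example.com instance1
# i.e. an OpInstanceSetParams with disk_template=constants.DT_DRBD8 and
# remote_node set; the instance must be stopped for the check to pass.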
3729 def CheckPrereq(self):
3730 """Check prerequisites.
3732 This only checks the instance list against the existing names.
3735 assert self.op.instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
3736 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3738 cluster = self.cluster = self.cfg.GetClusterInfo()
3739 assert self.instance is not None, \
3740 "Cannot retrieve locked instance %s" % self.op.instance_name
3742 pnode = instance.primary_node
3746 if (self.op.pnode is not None and self.op.pnode != pnode and
3748 # verify that the instance is not up
3749 instance_info = self.rpc.call_instance_info(pnode, instance.name,
3750 instance.hypervisor)
3751 if instance_info.fail_msg:
3752 self.warn.append("Can't get instance runtime information: %s" %
3753 instance_info.fail_msg)
3754 elif instance_info.payload:
3755 raise errors.OpPrereqError("Instance is still running on %s" % pnode,
3758 assert pnode in self.owned_locks(locking.LEVEL_NODE)
3759 nodelist = list(instance.all_nodes)
3760 pnode_info = self.cfg.GetNodeInfo(pnode)
3761 self.diskparams = self.cfg.GetInstanceDiskParams(instance)
3763 #_CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
3764 assert pnode_info.group in self.owned_locks(locking.LEVEL_NODEGROUP)
3765 group_info = self.cfg.GetNodeGroup(pnode_info.group)
3767 # dictionary with instance information after the modification
3770 # Check disk modifications. This is done here and not in CheckArguments
3771 # (as with NICs), because we need to know the instance's disk template
3772 if instance.disk_template == constants.DT_EXT:
3773 self._CheckMods("disk", self.op.disks, {},
3774 self._VerifyDiskModification)
3776 self._CheckMods("disk", self.op.disks, constants.IDISK_PARAMS_TYPES,
3777 self._VerifyDiskModification)
3779 # Prepare disk/NIC modifications
3780 self.diskmod = PrepareContainerMods(self.op.disks, None)
3781 self.nicmod = PrepareContainerMods(self.op.nics, _InstNicModPrivate)
3783 # Check the validity of the `provider' parameter
3784 if instance.disk_template == constants.DT_EXT:
3785 for mod in self.diskmod:
3786 ext_provider = mod[2].get(constants.IDISK_PROVIDER, None)
3787 if mod[0] == constants.DDM_ADD:
3788 if ext_provider is None:
3789 raise errors.OpPrereqError("Instance template is '%s' and parameter"
3790 " '%s' missing, during disk add" %
3792 constants.IDISK_PROVIDER),
3794 elif mod[0] == constants.DDM_MODIFY:
3796 raise errors.OpPrereqError("Parameter '%s' is invalid during disk"
3798 constants.IDISK_PROVIDER,
3801 for mod in self.diskmod:
3802 ext_provider = mod[2].get(constants.IDISK_PROVIDER, None)
3803 if ext_provider is not None:
3804 raise errors.OpPrereqError("Parameter '%s' is only valid for"
3805 " instances of type '%s'" %
3806 (constants.IDISK_PROVIDER,
3811 if self.op.os_name and not self.op.force:
3812 _CheckNodeHasOS(self, instance.primary_node, self.op.os_name,
3813 self.op.force_variant)
3814 instance_os = self.op.os_name
3816 instance_os = instance.os
3818 assert not (self.op.disk_template and self.op.disks), \
3819 "Can't modify disk template and apply disk changes at the same time"
3821 if self.op.disk_template:
3822 self._PreCheckDiskTemplate(pnode_info)
3824 # hvparams processing
3825 if self.op.hvparams:
3826 hv_type = instance.hypervisor
3827 i_hvdict = _GetUpdatedParams(instance.hvparams, self.op.hvparams)
3828 utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
3829 hv_new = cluster.SimpleFillHV(hv_type, instance.os, i_hvdict)
3832 hypervisor.GetHypervisorClass(hv_type).CheckParameterSyntax(hv_new)
3833 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
3834 self.hv_proposed = self.hv_new = hv_new # the new actual values
3835 self.hv_inst = i_hvdict # the new dict (without defaults)
3837 self.hv_proposed = cluster.SimpleFillHV(instance.hypervisor, instance.os,
3839 self.hv_new = self.hv_inst = {}
3841 # beparams processing
3842 if self.op.beparams:
3843 i_bedict = _GetUpdatedParams(instance.beparams, self.op.beparams,
3845 objects.UpgradeBeParams(i_bedict)
3846 utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
3847 be_new = cluster.SimpleFillBE(i_bedict)
3848 self.be_proposed = self.be_new = be_new # the new actual values
3849 self.be_inst = i_bedict # the new dict (without defaults)
3851 self.be_new = self.be_inst = {}
3852 self.be_proposed = cluster.SimpleFillBE(instance.beparams)
3853 be_old = cluster.FillBE(instance)
3855 # CPU param validation -- checking every time a parameter is
3856 # changed to cover all cases where either CPU mask or vcpus have
3858 if (constants.BE_VCPUS in self.be_proposed and
3859 constants.HV_CPU_MASK in self.hv_proposed):
3861 utils.ParseMultiCpuMask(self.hv_proposed[constants.HV_CPU_MASK])
3862 # Verify mask is consistent with number of vCPUs. Can skip this
3863 # test if only 1 entry in the CPU mask, which means same mask
3864 # is applied to all vCPUs.
3865 if (len(cpu_list) > 1 and
3866 len(cpu_list) != self.be_proposed[constants.BE_VCPUS]):
3867 raise errors.OpPrereqError("Number of vCPUs [%d] does not match the"
3869 (self.be_proposed[constants.BE_VCPUS],
3870 self.hv_proposed[constants.HV_CPU_MASK]),
3873 # Only perform this test if a new CPU mask is given
3874 if constants.HV_CPU_MASK in self.hv_new:
3875 # Calculate the largest CPU number requested
3876 max_requested_cpu = max(map(max, cpu_list))
3877 # Check that all of the instance's nodes have enough physical CPUs to
3878 # satisfy the requested CPU mask
3879 _CheckNodesPhysicalCPUs(self, instance.all_nodes,
3880 max_requested_cpu + 1, instance.hypervisor)
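# Illustrative example (the multi-mask syntax here is an assumption): with
# BE_VCPUS=2 and a CPU mask such as "0-1:2-3", ParseMultiCpuMask yields one
# entry per vCPU, max_requested_cpu is 3, and every node of the instance must
# therefore expose at least 4 physical CPUs.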
3882 # osparams processing
3883 if self.op.osparams:
3884 i_osdict = _GetUpdatedParams(instance.osparams, self.op.osparams)
3885 _CheckOSParams(self, True, nodelist, instance_os, i_osdict)
3886 self.os_inst = i_osdict # the new dict (without defaults)
3890 #TODO(dynmem): do the appropriate check involving MINMEM
3891 if (constants.BE_MAXMEM in self.op.beparams and not self.op.force and
3892 be_new[constants.BE_MAXMEM] > be_old[constants.BE_MAXMEM]):
3893 mem_check_list = [pnode]
3894 if be_new[constants.BE_AUTO_BALANCE]:
3895 # either we changed auto_balance to yes or it was from before
3896 mem_check_list.extend(instance.secondary_nodes)
3897 instance_info = self.rpc.call_instance_info(pnode, instance.name,
3898 instance.hypervisor)
3899 nodeinfo = self.rpc.call_node_info(mem_check_list, None,
3900 [instance.hypervisor], False)
3901 pninfo = nodeinfo[pnode]
3902 msg = pninfo.fail_msg
3904 # Assume the primary node is unreachable and go ahead
3905 self.warn.append("Can't get info from primary node %s: %s" %
3908 (_, _, (pnhvinfo, )) = pninfo.payload
3909 if not isinstance(pnhvinfo.get("memory_free", None), int):
3910 self.warn.append("Node data from primary node %s doesn't contain"
3911 " free memory information" % pnode)
3912 elif instance_info.fail_msg:
3913 self.warn.append("Can't get instance runtime information: %s" %
3914 instance_info.fail_msg)
3916 if instance_info.payload:
3917 current_mem = int(instance_info.payload["memory"])
3919 # Assume instance not running
3920 # (there is a slight race condition here, but it's not very
3921 # probable, and we have no other way to check)
3922 # TODO: Describe race condition
3924 #TODO(dynmem): do the appropriate check involving MINMEM
3925 miss_mem = (be_new[constants.BE_MAXMEM] - current_mem -
3926 pnhvinfo["memory_free"])
3928 raise errors.OpPrereqError("This change will prevent the instance"
3929 " from starting, due to %d MB of memory"
3930 " missing on its primary node" %
3931 miss_mem, errors.ECODE_NORES)
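# Worked example for the check above: raising maxmem from 1024 to 4096 MB on
# an instance currently using 1024 MB, while the primary node reports only
# 2048 MB free, gives miss_mem = 4096 - 1024 - 2048 = 1024 MB and the request
# is rejected.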
3933 if be_new[constants.BE_AUTO_BALANCE]:
3934 for node, nres in nodeinfo.items():
3935 if node not in instance.secondary_nodes:
3937 nres.Raise("Can't get info from secondary node %s" % node,
3938 prereq=True, ecode=errors.ECODE_STATE)
3939 (_, _, (nhvinfo, )) = nres.payload
3940 if not isinstance(nhvinfo.get("memory_free", None), int):
3941 raise errors.OpPrereqError("Secondary node %s didn't return free"
3942 " memory information" % node,
3944 #TODO(dynmem): do the appropriate check involving MINMEM
3945 elif be_new[constants.BE_MAXMEM] > nhvinfo["memory_free"]:
3946 raise errors.OpPrereqError("This change will prevent the instance"
3947 " from failover to its secondary node"
3948 " %s, due to not enough memory" % node,
3951 if self.op.runtime_mem:
3952 remote_info = self.rpc.call_instance_info(instance.primary_node,
3954 instance.hypervisor)
3955 remote_info.Raise("Error checking node %s" % instance.primary_node)
3956 if not remote_info.payload: # not running already
3957 raise errors.OpPrereqError("Instance %s is not running" %
3958 instance.name, errors.ECODE_STATE)
3960 current_memory = remote_info.payload["memory"]
3961 if (not self.op.force and
3962 (self.op.runtime_mem > self.be_proposed[constants.BE_MAXMEM] or
3963 self.op.runtime_mem < self.be_proposed[constants.BE_MINMEM])):
3964 raise errors.OpPrereqError("Instance %s must have memory between %d"
3965 " and %d MB of memory unless --force is"
3968 self.be_proposed[constants.BE_MINMEM],
3969 self.be_proposed[constants.BE_MAXMEM]),
3972 delta = self.op.runtime_mem - current_memory
3974 _CheckNodeFreeMemory(self, instance.primary_node,
3975 "ballooning memory for instance %s" %
3976 instance.name, delta, instance.hypervisor)
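# delta is the difference between the requested runtime memory and the
# current value; for instance, ballooning from 1024 MB up to 2048 MB gives a
# delta of 1024 MB that the primary node must be able to provide.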
3978 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
3979 raise errors.OpPrereqError("Disk operations not supported for"
3980 " diskless instances", errors.ECODE_INVAL)
3982 def _PrepareNicCreate(_, params, private):
3983 self._PrepareNicModification(params, private, None, None,
3987 def _PrepareNicMod(_, nic, params, private):
3988 self._PrepareNicModification(params, private, nic.ip, nic.network,
3989 nic.nicparams, cluster, pnode)
3992 def _PrepareNicRemove(_, params, __):
3994 net = params.network
3995 if net is not None and ip is not None:
3996 self.cfg.ReleaseIp(net, ip, self.proc.GetECId())
3998 # Verify NIC changes (operating on copy)
3999 nics = instance.nics[:]
4000 ApplyContainerMods("NIC", nics, None, self.nicmod,
4001 _PrepareNicCreate, _PrepareNicMod, _PrepareNicRemove)
4002 if len(nics) > constants.MAX_NICS:
4003 raise errors.OpPrereqError("Instance has too many network interfaces"
4004 " (%d), cannot add more" % constants.MAX_NICS,
4007 def _PrepareDiskMod(_, disk, params, __):
4008 disk.name = params.get(constants.IDISK_NAME, None)
4010 # Verify disk changes (operating on a copy)
4011 disks = copy.deepcopy(instance.disks)
4012 ApplyContainerMods("disk", disks, None, self.diskmod, None, _PrepareDiskMod,
4014 utils.ValidateDeviceNames("disk", disks)
4015 if len(disks) > constants.MAX_DISKS:
4016 raise errors.OpPrereqError("Instance has too many disks (%d), cannot add"
4017 " more" % constants.MAX_DISKS,
4019 disk_sizes = [disk.size for disk in instance.disks]
4020 disk_sizes.extend(params["size"] for (op, idx, params, private) in
4021 self.diskmod if op == constants.DDM_ADD)
4022 ispec[constants.ISPEC_DISK_COUNT] = len(disk_sizes)
4023 ispec[constants.ISPEC_DISK_SIZE] = disk_sizes
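# ispec is filled incrementally throughout CheckPrereq (disk count and sizes
# here, NIC count, CPU, spindle use and memory further down) and is finally
# verified against the node group's instance policy unless
# self.op.ignore_ipolicy is set.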
4025 if self.op.offline is not None and self.op.offline:
4026 _CheckInstanceState(self, instance, CAN_CHANGE_INSTANCE_OFFLINE,
4027 msg="can't change to offline")
4029 # Pre-compute NIC changes (necessary to use result in hooks)
4030 self._nic_chgdesc = []
4032 # Operate on copies as this is still in prereq
4033 nics = [nic.Copy() for nic in instance.nics]
4034 ApplyContainerMods("NIC", nics, self._nic_chgdesc, self.nicmod,
4035 self._CreateNewNic, self._ApplyNicMods, None)
4036 # Verify that NIC names are unique and valid
4037 utils.ValidateDeviceNames("NIC", nics)
4038 self._new_nics = nics
4039 ispec[constants.ISPEC_NIC_COUNT] = len(self._new_nics)
4041 self._new_nics = None
4042 ispec[constants.ISPEC_NIC_COUNT] = len(instance.nics)
4044 if not self.op.ignore_ipolicy:
4045 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
4048 # Fill ispec with backend parameters
4049 ispec[constants.ISPEC_SPINDLE_USE] = \
4050 self.be_new.get(constants.BE_SPINDLE_USE, None)
4051 ispec[constants.ISPEC_CPU_COUNT] = self.be_new.get(constants.BE_VCPUS,
4054 # Copy ispec to verify parameters with min/max values separately
4055 if self.op.disk_template:
4056 new_disk_template = self.op.disk_template
4058 new_disk_template = instance.disk_template
4059 ispec_max = ispec.copy()
4060 ispec_max[constants.ISPEC_MEM_SIZE] = \
4061 self.be_new.get(constants.BE_MAXMEM, None)
4062 res_max = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec_max,
4064 ispec_min = ispec.copy()
4065 ispec_min[constants.ISPEC_MEM_SIZE] = \
4066 self.be_new.get(constants.BE_MINMEM, None)
4067 res_min = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec_min,
4070 if (res_max or res_min):
4071 # FIXME: Improve error message by including information about whether
4072 # the upper or lower limit of the parameter fails the ipolicy.
4073 msg = ("Instance allocation to group %s (%s) violates policy: %s" %
4074 (group_info, group_info.name,
4075 utils.CommaJoin(set(res_max + res_min))))
4076 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
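# Illustrative example: with a group policy allowing memory sizes between 512
# and 4096 MB, setting maxmem=8192 makes res_max non-empty (ispec_max violates
# the upper bound) even if minmem=1024 keeps res_min empty, so the
# modification is refused.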
4078 def _ConvertPlainToDrbd(self, feedback_fn):
4079 """Converts an instance from plain to drbd.
4082 feedback_fn("Converting template to drbd")
4083 instance = self.instance
4084 pnode = instance.primary_node
4085 snode = self.op.remote_node
4087 assert instance.disk_template == constants.DT_PLAIN
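# Overview of the conversion below: for every existing plain LV a matching
# DRBD8 disk layout is generated (data + metadata children); the missing
# metadata LV is created on the primary node and both children on the new
# secondary, the original LVs are renamed to become the DRBD data children,
# and finally the DRBD devices themselves are assembled on both nodes.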
4089 # create a fake disk info for _GenerateDiskTemplate
4090 disk_info = [{constants.IDISK_SIZE: d.size, constants.IDISK_MODE: d.mode,
4091 constants.IDISK_VG: d.logical_id[0],
4092 constants.IDISK_NAME: d.name}
4093 for d in instance.disks]
4094 new_disks = _GenerateDiskTemplate(self, self.op.disk_template,
4095 instance.name, pnode, [snode],
4096 disk_info, None, None, 0, feedback_fn,
4098 anno_disks = rpc.AnnotateDiskParams(constants.DT_DRBD8, new_disks,
4100 p_excl_stor = _IsExclusiveStorageEnabledNodeName(self.cfg, pnode)
4101 s_excl_stor = _IsExclusiveStorageEnabledNodeName(self.cfg, snode)
4102 info = _GetInstanceInfoText(instance)
4103 feedback_fn("Creating additional volumes...")
4104 # first, create the missing data and meta devices
4105 for disk in anno_disks:
4106 # unfortunately this is... not too nice
4107 _CreateSingleBlockDev(self, pnode, instance, disk.children[1],
4108 info, True, p_excl_stor)
4109 for child in disk.children:
4110 _CreateSingleBlockDev(self, snode, instance, child, info, True,
4112 # at this stage, all new LVs have been created, we can rename the
4114 feedback_fn("Renaming original volumes...")
4115 rename_list = [(o, n.children[0].logical_id)
4116 for (o, n) in zip(instance.disks, new_disks)]
4117 result = self.rpc.call_blockdev_rename(pnode, rename_list)
4118 result.Raise("Failed to rename original LVs")
4120 feedback_fn("Initializing DRBD devices...")
4121 # all child devices are in place, we can now create the DRBD devices
4123 for disk in anno_disks:
4124 for (node, excl_stor) in [(pnode, p_excl_stor), (snode, s_excl_stor)]:
4125 f_create = node == pnode
4126 _CreateSingleBlockDev(self, node, instance, disk, info, f_create,
4128 except errors.GenericError, e:
4129 feedback_fn("Initializing of DRBD devices failed;"
4130 " renaming back original volumes...")
4131 for disk in new_disks:
4132 self.cfg.SetDiskID(disk, pnode)
4133 rename_back_list = [(n.children[0], o.logical_id)
4134 for (n, o) in zip(new_disks, instance.disks)]
4135 result = self.rpc.call_blockdev_rename(pnode, rename_back_list)
4136 result.Raise("Failed to rename LVs back after error %s" % str(e))
4139 # at this point, the instance has been modified
4140 instance.disk_template = constants.DT_DRBD8
4141 instance.disks = new_disks
4142 self.cfg.Update(instance, feedback_fn)
4144 # Release node locks while waiting for sync
4145 _ReleaseLocks(self, locking.LEVEL_NODE)
4147 # disks are created, waiting for sync
4148 disk_abort = not _WaitForSync(self, instance,
4149 oneshot=not self.op.wait_for_sync)
4151 raise errors.OpExecError("There are some degraded disks for"
4152 " this instance, please cleanup manually")
4154 # Node resource locks will be released by caller
4156 def _ConvertDrbdToPlain(self, feedback_fn):
4157 """Converts an instance from drbd to plain.
4160 instance = self.instance
4162 assert len(instance.secondary_nodes) == 1
4163 assert instance.disk_template == constants.DT_DRBD8
4165 pnode = instance.primary_node
4166 snode = instance.secondary_nodes[0]
4167 feedback_fn("Converting template to plain")
4169 old_disks = _AnnotateDiskParams(instance, instance.disks, self.cfg)
4170 new_disks = [d.children[0] for d in instance.disks]
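# For DRBD8 disks, children[0] is the data LV and children[1] the metadata
# LV; the data LVs become the instance's plain disks, while the metadata LVs
# (and the secondary node's copies) are removed further down.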
4172 # copy over size, mode and name
4173 for parent, child in zip(old_disks, new_disks):
4174 child.size = parent.size
4175 child.mode = parent.mode
4176 child.name = parent.name
4178 # this is a DRBD disk, return its port to the pool
4179 # NOTE: this must be done right before the call to cfg.Update!
4180 for disk in old_disks:
4181 tcp_port = disk.logical_id[2]
4182 self.cfg.AddTcpUdpPort(tcp_port)
4184 # update instance structure
4185 instance.disks = new_disks
4186 instance.disk_template = constants.DT_PLAIN
4187 _UpdateIvNames(0, instance.disks)
4188 self.cfg.Update(instance, feedback_fn)
4190 # Release locks in case removing disks takes a while
4191 _ReleaseLocks(self, locking.LEVEL_NODE)
4193 feedback_fn("Removing volumes on the secondary node...")
4194 for disk in old_disks:
4195 self.cfg.SetDiskID(disk, snode)
4196 msg = self.rpc.call_blockdev_remove(snode, disk).fail_msg
4198 self.LogWarning("Could not remove block device %s on node %s,"
4199 " continuing anyway: %s", disk.iv_name, snode, msg)
4201 feedback_fn("Removing unneeded volumes on the primary node...")
4202 for idx, disk in enumerate(old_disks):
4203 meta = disk.children[1]
4204 self.cfg.SetDiskID(meta, pnode)
4205 msg = self.rpc.call_blockdev_remove(pnode, meta).fail_msg
4207 self.LogWarning("Could not remove metadata for disk %d on node %s,"
4208 " continuing anyway: %s", idx, pnode, msg)
4210 def _CreateNewDisk(self, idx, params, _):
4211 """Creates a new disk.
4214 instance = self.instance
4217 if instance.disk_template in constants.DTS_FILEBASED:
4218 (file_driver, file_path) = instance.disks[0].logical_id
4219 file_path = os.path.dirname(file_path)
4221 file_driver = file_path = None
4224 _GenerateDiskTemplate(self, instance.disk_template, instance.name,
4225 instance.primary_node, instance.secondary_nodes,
4226 [params], file_path, file_driver, idx,
4227 self.Log, self.diskparams)[0]
4229 info = _GetInstanceInfoText(instance)
4231 logging.info("Creating volume %s for instance %s",
4232 disk.iv_name, instance.name)
4233 # Note: this needs to be kept in sync with _CreateDisks
4235 for node in instance.all_nodes:
4236 f_create = (node == instance.primary_node)
4238 _CreateBlockDev(self, node, instance, disk, f_create, info, f_create)
4239 except errors.OpExecError, err:
4240 self.LogWarning("Failed to create volume %s (%s) on node '%s': %s",
4241 disk.iv_name, disk, node, err)
4243 if self.cluster.prealloc_wipe_disks:
4245 _WipeDisks(self, instance,
4246 disks=[(idx, disk, 0)])
4249 ("disk/%d" % idx, "add:size=%s,mode=%s" % (disk.size, disk.mode)),
4253 def _ModifyDisk(idx, disk, params, _):
4258 mode = params.get(constants.IDISK_MODE, None)
4261 changes.append(("disk.mode/%d" % idx, disk.mode))
4263 name = params.get(constants.IDISK_NAME, None)
4265 changes.append(("disk.name/%d" % idx, disk.name))
4269 def _RemoveDisk(self, idx, root, _):
4273 (anno_disk,) = _AnnotateDiskParams(self.instance, [root], self.cfg)
4274 for node, disk in anno_disk.ComputeNodeTree(self.instance.primary_node):
4275 self.cfg.SetDiskID(disk, node)
4276 msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
4278 self.LogWarning("Could not remove disk/%d on node '%s': %s,"
4279 " continuing anyway", idx, node, msg)
4281 # if this is a DRBD disk, return its port to the pool
4282 if root.dev_type in constants.LDS_DRBD:
4283 self.cfg.AddTcpUdpPort(root.logical_id[2])
4285 def _CreateNewNic(self, idx, params, private):
4286 """Creates data structure for a new network interface.
4289 mac = params[constants.INIC_MAC]
4290 ip = params.get(constants.INIC_IP, None)
4291 net = params.get(constants.INIC_NETWORK, None)
4292 name = params.get(constants.INIC_NAME, None)
4293 net_uuid = self.cfg.LookupNetwork(net)
4294 #TODO: not private.filled?? can a nic have no nicparams??
4295 nicparams = private.filled
4296 nobj = objects.NIC(mac=mac, ip=ip, network=net_uuid, name=name,
4297 nicparams=nicparams)
4298 nobj.uuid = self.cfg.GenerateUniqueID(self.proc.GetECId())
4302 "add:mac=%s,ip=%s,mode=%s,link=%s,network=%s" %
4303 (mac, ip, private.filled[constants.NIC_MODE],
4304 private.filled[constants.NIC_LINK],
4308 def _ApplyNicMods(self, idx, nic, params, private):
4309 """Modifies a network interface.
4314 for key in [constants.INIC_MAC, constants.INIC_IP, constants.INIC_NAME]:
4316 changes.append(("nic.%s/%d" % (key, idx), params[key]))
4317 setattr(nic, key, params[key])
4319 new_net = params.get(constants.INIC_NETWORK, nic.network)
4320 new_net_uuid = self.cfg.LookupNetwork(new_net)
4321 if new_net_uuid != nic.network:
4322 changes.append(("nic.network/%d" % idx, new_net))
4323 nic.network = new_net_uuid
4326 nic.nicparams = private.filled
4328 for (key, val) in nic.nicparams.items():
4329 changes.append(("nic.%s/%d" % (key, idx), val))
4333 def Exec(self, feedback_fn):
4334 """Modifies an instance.
4336 All parameters take effect only at the next restart of the instance.
4339 # Process here the warnings from CheckPrereq, as we don't have a
4340 # feedback_fn there.
4341 # TODO: Replace with self.LogWarning
4342 for warn in self.warn:
4343 feedback_fn("WARNING: %s" % warn)
4345 assert ((self.op.disk_template is None) ^
4346 bool(self.owned_locks(locking.LEVEL_NODE_RES))), \
4347 "Not owning any node resource locks"
4350 instance = self.instance
4354 instance.primary_node = self.op.pnode
4357 if self.op.runtime_mem:
4358 rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
4360 self.op.runtime_mem)
4361 rpcres.Raise("Cannot modify instance runtime memory")
4362 result.append(("runtime_memory", self.op.runtime_mem))
4364 # Apply disk changes
4365 ApplyContainerMods("disk", instance.disks, result, self.diskmod,
4366 self._CreateNewDisk, self._ModifyDisk, self._RemoveDisk)
4367 _UpdateIvNames(0, instance.disks)
4369 if self.op.disk_template:
4371 check_nodes = set(instance.all_nodes)
4372 if self.op.remote_node:
4373 check_nodes.add(self.op.remote_node)
4374 for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
4375 owned = self.owned_locks(level)
4376 assert not (check_nodes - owned), \
4377 ("Not owning the correct locks, owning %r, expected at least %r" %
4378 (owned, check_nodes))
4380 r_shut = _ShutdownInstanceDisks(self, instance)
4382 raise errors.OpExecError("Cannot shutdown instance disks, unable to"
4383 " proceed with disk template conversion")
4384 mode = (instance.disk_template, self.op.disk_template)
4386 self._DISK_CONVERSIONS[mode](self, feedback_fn)
4388 self.cfg.ReleaseDRBDMinors(instance.name)
4390 result.append(("disk_template", self.op.disk_template))
4392 assert instance.disk_template == self.op.disk_template, \
4393 ("Expected disk template '%s', found '%s'" %
4394 (self.op.disk_template, instance.disk_template))
4396 # Release node and resource locks if there are any (they might already have
4397 # been released during disk conversion)
4398 _ReleaseLocks(self, locking.LEVEL_NODE)
4399 _ReleaseLocks(self, locking.LEVEL_NODE_RES)
4402 if self._new_nics is not None:
4403 instance.nics = self._new_nics
4404 result.extend(self._nic_chgdesc)
4407 if self.op.hvparams:
4408 instance.hvparams = self.hv_inst
4409 for key, val in self.op.hvparams.iteritems():
4410 result.append(("hv/%s" % key, val))
4413 if self.op.beparams:
4414 instance.beparams = self.be_inst
4415 for key, val in self.op.beparams.iteritems():
4416 result.append(("be/%s" % key, val))
4420 instance.os = self.op.os_name
4423 if self.op.osparams:
4424 instance.osparams = self.os_inst
4425 for key, val in self.op.osparams.iteritems():
4426 result.append(("os/%s" % key, val))
4428 if self.op.offline is None:
4431 elif self.op.offline:
4432 # Mark instance as offline
4433 self.cfg.MarkInstanceOffline(instance.name)
4434 result.append(("admin_state", constants.ADMINST_OFFLINE))
4436 # Mark instance as online, but stopped
4437 self.cfg.MarkInstanceDown(instance.name)
4438 result.append(("admin_state", constants.ADMINST_DOWN))
4440 self.cfg.Update(instance, feedback_fn, self.proc.GetECId())
4442 assert not (self.owned_locks(locking.LEVEL_NODE_RES) or
4443 self.owned_locks(locking.LEVEL_NODE)), \
4444 "All node locks should have been released by now"
4448 _DISK_CONVERSIONS = {
4449 (constants.DT_PLAIN, constants.DT_DRBD8): _ConvertPlainToDrbd,
4450 (constants.DT_DRBD8, constants.DT_PLAIN): _ConvertDrbdToPlain,
4454 class LUInstanceChangeGroup(LogicalUnit):
4455 HPATH = "instance-change-group"
4456 HTYPE = constants.HTYPE_INSTANCE
4459 def ExpandNames(self):
4460 self.share_locks = _ShareAll()
4462 self.needed_locks = {
4463 locking.LEVEL_NODEGROUP: [],
4464 locking.LEVEL_NODE: [],
4465 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
4468 self._ExpandAndLockInstance()
4470 if self.op.target_groups:
4471 self.req_target_uuids = map(self.cfg.LookupNodeGroup,
4472 self.op.target_groups)
4474 self.req_target_uuids = None
4476 self.op.iallocator = _GetDefaultIAllocator(self.cfg, self.op.iallocator)
4478 def DeclareLocks(self, level):
4479 if level == locking.LEVEL_NODEGROUP:
4480 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
4482 if self.req_target_uuids:
4483 lock_groups = set(self.req_target_uuids)
4485 # Lock all groups used by instance optimistically; this requires going
4486 # via the node before it's locked, requiring verification later on
4487 instance_groups = self.cfg.GetInstanceNodeGroups(self.op.instance_name)
4488 lock_groups.update(instance_groups)
4490 # No target groups, need to lock all of them
4491 lock_groups = locking.ALL_SET
4493 self.needed_locks[locking.LEVEL_NODEGROUP] = lock_groups
4495 elif level == locking.LEVEL_NODE:
4496 if self.req_target_uuids:
4497 # Lock all nodes used by instances
4498 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4499 self._LockInstancesNodes()
4501 # Lock all nodes in all potential target groups
4502 lock_groups = (frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) -
4503 self.cfg.GetInstanceNodeGroups(self.op.instance_name))
4504 member_nodes = [node_name
4505 for group in lock_groups
4506 for node_name in self.cfg.GetNodeGroup(group).members]
4507 self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
4509 # Lock all nodes as all groups are potential targets
4510 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4512 def CheckPrereq(self):
4513 owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
4514 owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
4515 owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
4517 assert (self.req_target_uuids is None or
4518 owned_groups.issuperset(self.req_target_uuids))
4519 assert owned_instances == set([self.op.instance_name])
4521 # Get instance information
4522 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4524 # Check if node groups for locked instance are still correct
4525 assert owned_nodes.issuperset(self.instance.all_nodes), \
4526 ("Instance %s's nodes changed while we kept the lock" %
4527 self.op.instance_name)
4529 inst_groups = _CheckInstanceNodeGroups(self.cfg, self.op.instance_name,
4532 if self.req_target_uuids:
4533 # User requested specific target groups
4534 self.target_uuids = frozenset(self.req_target_uuids)
4536 # All groups except those used by the instance are potential targets
4537 self.target_uuids = owned_groups - inst_groups
4539 conflicting_groups = self.target_uuids & inst_groups
4540 if conflicting_groups:
4541 raise errors.OpPrereqError("Can't use group(s) '%s' as targets, they are"
4542 " used by the instance '%s'" %
4543 (utils.CommaJoin(conflicting_groups),
4544 self.op.instance_name),
4547 if not self.target_uuids:
4548 raise errors.OpPrereqError("There are no possible target groups",
4551 def BuildHooksEnv(self):
4555 assert self.target_uuids
4558 "TARGET_GROUPS": " ".join(self.target_uuids),
4561 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4565 def BuildHooksNodes(self):
4566 """Build hooks nodes.
4569 mn = self.cfg.GetMasterNode()
4572 def Exec(self, feedback_fn):
4573 instances = list(self.owned_locks(locking.LEVEL_INSTANCE))
4575 assert instances == [self.op.instance_name], "Instance not locked"
4577 req = iallocator.IAReqGroupChange(instances=instances,
4578 target_groups=list(self.target_uuids))
4579 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
4581 ial.Run(self.op.iallocator)
4584 raise errors.OpPrereqError("Can't compute solution for changing group of"
4585 " instance '%s' using iallocator '%s': %s" %
4586 (self.op.instance_name, self.op.iallocator,
4587 ial.info), errors.ECODE_NORES)
4589 jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, False)
4591 self.LogInfo("Iallocator returned %s job(s) for changing group of"
4592 " instance '%s'", len(jobs), self.op.instance_name)
4594 return ResultWithJobs(jobs)
4597 class TLMigrateInstance(Tasklet):
4598 """Tasklet class for instance migration.
4601 @ivar live: whether the migration will be done live or non-live;
4602 this variable is initialized only after CheckPrereq has run
4603 @type cleanup: boolean
4604 @ivar cleanup: Whether we clean up after a failed migration
4605 @type iallocator: string
4606 @ivar iallocator: The iallocator used to determine target_node
4607 @type target_node: string
4608 @ivar target_node: If given, the target_node to reallocate the instance to
4609 @type failover: boolean
4610 @ivar failover: Whether operation results in failover or migration
4611 @type fallback: boolean
4612 @ivar fallback: Whether fallback to failover is allowed if migration not
4614 @type ignore_consistency: boolean
4615 @ivar ignore_consistency: Whether we should ignore consistency between source
4617 @type shutdown_timeout: int
4618 @ivar shutdown_timeout: Timeout for the instance shutdown, used in case of failover
4619 @type ignore_ipolicy: bool
4620 @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating
4625 _MIGRATION_POLL_INTERVAL = 1 # seconds
4626 _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds
4628 def __init__(self, lu, instance_name, cleanup, failover, fallback,
4629 ignore_consistency, allow_runtime_changes, shutdown_timeout,
4631 """Initializes this class.
4634 Tasklet.__init__(self, lu)
4637 self.instance_name = instance_name
4638 self.cleanup = cleanup
4639 self.live = False # will be overridden later
4640 self.failover = failover
4641 self.fallback = fallback
4642 self.ignore_consistency = ignore_consistency
4643 self.shutdown_timeout = shutdown_timeout
4644 self.ignore_ipolicy = ignore_ipolicy
4645 self.allow_runtime_changes = allow_runtime_changes
4647 def CheckPrereq(self):
4648 """Check prerequisites.
4650 This checks that the instance is in the cluster.
4653 instance_name = _ExpandInstanceName(self.lu.cfg, self.instance_name)
4654 instance = self.cfg.GetInstanceInfo(instance_name)
4655 assert instance is not None
4656 self.instance = instance
4657 cluster = self.cfg.GetClusterInfo()
4659 if (not self.cleanup and
4660 not instance.admin_state == constants.ADMINST_UP and
4661 not self.failover and self.fallback):
4662 self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
4663 " switching to failover")
4664 self.failover = True
4666 if instance.disk_template not in constants.DTS_MIRRORED:
4671 raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
4672 " %s" % (instance.disk_template, text),
4675 if instance.disk_template in constants.DTS_EXT_MIRROR:
4676 _CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
4678 if self.lu.op.iallocator:
4679 assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
4680 self._RunAllocator()
4682 # We set self.target_node as it is required by
4684 self.target_node = self.lu.op.target_node
4686 # Check that the target node is correct in terms of instance policy
4687 nodeinfo = self.cfg.GetNodeInfo(self.target_node)
4688 group_info = self.cfg.GetNodeGroup(nodeinfo.group)
4689 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
4691 _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
4692 ignore=self.ignore_ipolicy)
4694 # self.target_node is already populated, either directly or by the
4696 target_node = self.target_node
4697 if self.target_node == instance.primary_node:
4698 raise errors.OpPrereqError("Cannot migrate instance %s"
4699 " to its primary (%s)" %
4700 (instance.name, instance.primary_node),
4703 if len(self.lu.tasklets) == 1:
4704 # It is safe to release locks only when we're the only tasklet
4706 _ReleaseLocks(self.lu, locking.LEVEL_NODE,
4707 keep=[instance.primary_node, self.target_node])
4708 _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
4711 assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
4713 secondary_nodes = instance.secondary_nodes
4714 if not secondary_nodes:
4715 raise errors.ConfigurationError("No secondary node but using"
4716 " %s disk template" %
4717 instance.disk_template)
4718 target_node = secondary_nodes[0]
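# For internally mirrored templates (e.g. DRBD8) the only possible
# migration/failover target is the current secondary node, hence the error
# below when an explicit, different target node or an iallocator is requested.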
4719 if self.lu.op.iallocator or (self.lu.op.target_node and
4720 self.lu.op.target_node != target_node):
4722 text = "failed over"
4725 raise errors.OpPrereqError("Instances with disk template %s cannot"
4726 " be %s to arbitrary nodes"
4727 " (neither an iallocator nor a target"
4728 " node can be passed)" %
4729 (instance.disk_template, text),
4731 nodeinfo = self.cfg.GetNodeInfo(target_node)
4732 group_info = self.cfg.GetNodeGroup(nodeinfo.group)
4733 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
4735 _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
4736 ignore=self.ignore_ipolicy)
4738 i_be = cluster.FillBE(instance)
4740 # check memory requirements on the secondary node
4741 if (not self.cleanup and
4742 (not self.failover or instance.admin_state == constants.ADMINST_UP)):
4743 self.tgt_free_mem = _CheckNodeFreeMemory(self.lu, target_node,
4744 "migrating instance %s" %
4746 i_be[constants.BE_MINMEM],
4747 instance.hypervisor)
4749 self.lu.LogInfo("Not checking memory on the secondary node as"
4750 " instance will not be started")
4752 # check if failover must be forced instead of migration
4753 if (not self.cleanup and not self.failover and
4754 i_be[constants.BE_ALWAYS_FAILOVER]):
4755 self.lu.LogInfo("Instance configured to always failover; fallback"
4757 self.failover = True
4759 # check bridge existence
4760 _CheckInstanceBridgesExist(self.lu, instance, node=target_node)
4762 if not self.cleanup:
4763 _CheckNodeNotDrained(self.lu, target_node)
4764 if not self.failover:
4765 result = self.rpc.call_instance_migratable(instance.primary_node,
4767 if result.fail_msg and self.fallback:
4768 self.lu.LogInfo("Can't migrate, instance offline, fallback to"
4770 self.failover = True
4772 result.Raise("Can't migrate, please use failover",
4773 prereq=True, ecode=errors.ECODE_STATE)
4775 assert not (self.failover and self.cleanup)
4777 if not self.failover:
4778 if self.lu.op.live is not None and self.lu.op.mode is not None:
4779 raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
4780 " parameters are accepted",
4782 if self.lu.op.live is not None:
4784 self.lu.op.mode = constants.HT_MIGRATION_LIVE
4786 self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
4787 # reset the 'live' parameter to None so that repeated
4788 # invocations of CheckPrereq do not raise an exception
4789 self.lu.op.live = None
4790 elif self.lu.op.mode is None:
4791 # read the default value from the hypervisor
4792 i_hv = cluster.FillHV(self.instance, skip_globals=False)
4793 self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
4795 self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
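# 'live' keeps the instance running while its memory is transferred; non-live
# migration suspends it first. Failover, handled below, never transfers
# memory and is therefore never live.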
4797 # Failover is never live
4800 if not (self.failover or self.cleanup):
4801 remote_info = self.rpc.call_instance_info(instance.primary_node,
4803 instance.hypervisor)
4804 remote_info.Raise("Error checking instance on node %s" %
4805 instance.primary_node)
4806 instance_running = bool(remote_info.payload)
4807 if instance_running:
4808 self.current_mem = int(remote_info.payload["memory"])
4810 def _RunAllocator(self):
4811 """Run the allocator based on input opcode.
4814 assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
4816 # FIXME: add a self.ignore_ipolicy option
4817 req = iallocator.IAReqRelocate(name=self.instance_name,
4818 relocate_from=[self.instance.primary_node])
4819 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
4821 ial.Run(self.lu.op.iallocator)
4824 raise errors.OpPrereqError("Can't compute nodes using"
4825 " iallocator '%s': %s" %
4826 (self.lu.op.iallocator, ial.info),
4828 self.target_node = ial.result[0]
4829 self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4830 self.instance_name, self.lu.op.iallocator,
4831 utils.CommaJoin(ial.result))
4833 def _WaitUntilSync(self):
4834 """Poll with custom rpc for disk sync.
4836 This uses our own step-based rpc call.
4839 self.feedback_fn("* wait until resync is done")
4843 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
4845 (self.instance.disks,
4848 for node, nres in result.items():
4849 nres.Raise("Cannot resync disks on node %s" % node)
4850 node_done, node_percent = nres.payload
4851 all_done = all_done and node_done
4852 if node_percent is not None:
4853 min_percent = min(min_percent, node_percent)
4855 if min_percent < 100:
4856 self.feedback_fn(" - progress: %.1f%%" % min_percent)
4859 def _EnsureSecondary(self, node):
4860 """Demote a node to secondary.
4863 self.feedback_fn("* switching node %s to secondary mode" % node)
4865 for dev in self.instance.disks:
4866 self.cfg.SetDiskID(dev, node)
4868 result = self.rpc.call_blockdev_close(node, self.instance.name,
4869 self.instance.disks)
4870 result.Raise("Cannot change disk to secondary on node %s" % node)
4872 def _GoStandalone(self):
4873 """Disconnect from the network.
4876 self.feedback_fn("* changing into standalone mode")
4877 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
4878 self.instance.disks)
4879 for node, nres in result.items():
4880 nres.Raise("Cannot disconnect disks node %s" % node)
4882 def _GoReconnect(self, multimaster):
4883 """Reconnect to the network.
4889 msg = "single-master"
4890 self.feedback_fn("* changing disks into %s mode" % msg)
4891 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
4892 (self.instance.disks, self.instance),
4893 self.instance.name, multimaster)
4894 for node, nres in result.items():
4895 nres.Raise("Cannot change disks config on node %s" % node)
4897 def _ExecCleanup(self):
4898 """Try to cleanup after a failed migration.
4900 The cleanup is done by:
4901 - check that the instance is running only on one node
4902 (and update the config if needed)
4903 - change disks on its secondary node to secondary
4904 - wait until disks are fully synchronized
4905 - disconnect from the network
4906 - change disks into single-master mode
4907 - wait again until disks are fully synchronized
4910 instance = self.instance
4911 target_node = self.target_node
4912 source_node = self.source_node
4914 # check running on only one node
4915 self.feedback_fn("* checking where the instance actually runs"
4916 " (if this hangs, the hypervisor might be in"
4918 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
4919 for node, result in ins_l.items():
4920 result.Raise("Can't contact node %s" % node)
4922 runningon_source = instance.name in ins_l[source_node].payload
4923 runningon_target = instance.name in ins_l[target_node].payload
4925 if runningon_source and runningon_target:
4926 raise errors.OpExecError("Instance seems to be running on two nodes,"
4927 " or the hypervisor is confused; you will have"
4928 " to ensure manually that it runs only on one"
4929 " and restart this operation")
4931 if not (runningon_source or runningon_target):
4932 raise errors.OpExecError("Instance does not seem to be running at all;"
4933 " in this case it's safer to repair by"
4934 " running 'gnt-instance stop' to ensure disk"
4935 " shutdown, and then restarting it")
4937 if runningon_target:
4938 # the migration has actually succeeded, we need to update the config
4939 self.feedback_fn("* instance running on secondary node (%s),"
4940 " updating config" % target_node)
4941 instance.primary_node = target_node
4942 self.cfg.Update(instance, self.feedback_fn)
4943 demoted_node = source_node
4945 self.feedback_fn("* instance confirmed to be running on its"
4946 " primary node (%s)" % source_node)
4947 demoted_node = target_node
4949 if instance.disk_template in constants.DTS_INT_MIRROR:
4950 self._EnsureSecondary(demoted_node)
4952 self._WaitUntilSync()
4953 except errors.OpExecError:
4954 # we ignore errors here, since if the device is standalone, it
4955 # won't be able to sync
4957 self._GoStandalone()
4958 self._GoReconnect(False)
4959 self._WaitUntilSync()
4961 self.feedback_fn("* done")
4963 def _RevertDiskStatus(self):
4964 """Try to revert the disk status after a failed migration.
4967 target_node = self.target_node
4968 if self.instance.disk_template in constants.DTS_EXT_MIRROR:
4972 self._EnsureSecondary(target_node)
4973 self._GoStandalone()
4974 self._GoReconnect(False)
4975 self._WaitUntilSync()
4976 except errors.OpExecError, err:
4977 self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
4978 " please try to recover the instance manually;"
4979 " error '%s'" % str(err))
4981 def _AbortMigration(self):
4982 """Call the hypervisor code to abort a started migration.
4985 instance = self.instance
4986 target_node = self.target_node
4987 source_node = self.source_node
4988 migration_info = self.migration_info
4990 abort_result = self.rpc.call_instance_finalize_migration_dst(target_node,
4994 abort_msg = abort_result.fail_msg
4996 logging.error("Aborting migration failed on target node %s: %s",
4997 target_node, abort_msg)
4998 # Don't raise an exception here, as we still have to try to revert the
4999 # disk status, even if this step failed.
5001 abort_result = self.rpc.call_instance_finalize_migration_src(
5002 source_node, instance, False, self.live)
5003 abort_msg = abort_result.fail_msg
5005 logging.error("Aborting migration failed on source node %s: %s",
5006 source_node, abort_msg)
5008 def _ExecMigration(self):
5009 """Migrate an instance.
5011 The migrate is done by:
5012 - change the disks into dual-master mode
5013 - wait until disks are fully synchronized again
5014 - migrate the instance
5015 - change disks on the new secondary node (the old primary) to secondary
5016 - wait until disks are fully synchronized
5017 - change disks into single-master mode
5020 instance = self.instance
5021 target_node = self.target_node
5022 source_node = self.source_node
5024 # Check for hypervisor version mismatch and warn the user.
5025 nodeinfo = self.rpc.call_node_info([source_node, target_node],
5026 None, [self.instance.hypervisor], False)
5027 for ninfo in nodeinfo.values():
5028 ninfo.Raise("Unable to retrieve node information from node '%s'" %
5030 (_, _, (src_info, )) = nodeinfo[source_node].payload
5031 (_, _, (dst_info, )) = nodeinfo[target_node].payload
5033 if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
5034 (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
5035 src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
5036 dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
5037 if src_version != dst_version:
5038 self.feedback_fn("* warning: hypervisor version mismatch between"
5039 " source (%s) and target (%s) node" %
5040 (src_version, dst_version))
5042 self.feedback_fn("* checking disk consistency between source and target")
5043 for (idx, dev) in enumerate(instance.disks):
5044 if not _CheckDiskConsistency(self.lu, instance, dev, target_node, False):
5045 raise errors.OpExecError("Disk %s is degraded or not fully"
5046 " synchronized on target node,"
5047 " aborting migration" % idx)
5049 if self.current_mem > self.tgt_free_mem:
5050 if not self.allow_runtime_changes:
5051 raise errors.OpExecError("Memory ballooning not allowed and not enough"
5052 " free memory to fit instance %s on target"
5053 " node %s (have %dMB, need %dMB)" %
5054 (instance.name, target_node,
5055 self.tgt_free_mem, self.current_mem))
5056 self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
5057 rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
5060 rpcres.Raise("Cannot modify instance runtime memory")
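# Example of the ballooning above: an instance currently using 4096 MB moving
# to a node with only 3072 MB free is shrunk to tgt_free_mem (3072 MB) before
# the migration, provided runtime changes are allowed.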
5062 # First get the migration information from the remote node
5063 result = self.rpc.call_migration_info(source_node, instance)
5064 msg = result.fail_msg
5066 log_err = ("Failed fetching source migration information from %s: %s" %
5068 logging.error(log_err)
5069 raise errors.OpExecError(log_err)
5071 self.migration_info = migration_info = result.payload
5073 if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
5074 # Then switch the disks to master/master mode
5075 self._EnsureSecondary(target_node)
5076 self._GoStandalone()
5077 self._GoReconnect(True)
5078 self._WaitUntilSync()
5080 self.feedback_fn("* preparing %s to accept the instance" % target_node)
5081 result = self.rpc.call_accept_instance(target_node,
5084 self.nodes_ip[target_node])
5086 msg = result.fail_msg
5088 logging.error("Instance pre-migration failed, trying to revert"
5089 " disk status: %s", msg)
5090 self.feedback_fn("Pre-migration failed, aborting")
5091 self._AbortMigration()
5092 self._RevertDiskStatus()
5093 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
5094 (instance.name, msg))
5096 self.feedback_fn("* migrating instance to %s" % target_node)
5097 result = self.rpc.call_instance_migrate(source_node, instance,
5098 self.nodes_ip[target_node],
5100 msg = result.fail_msg
5102 logging.error("Instance migration failed, trying to revert"
5103 " disk status: %s", msg)
5104 self.feedback_fn("Migration failed, aborting")
5105 self._AbortMigration()
5106 self._RevertDiskStatus()
5107 raise errors.OpExecError("Could not migrate instance %s: %s" %
5108 (instance.name, msg))
5110 self.feedback_fn("* starting memory transfer")
5111 last_feedback = time.time()
5113 result = self.rpc.call_instance_get_migration_status(source_node,
5115 msg = result.fail_msg
5116 ms = result.payload # MigrationStatus instance
5117 if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
5118 logging.error("Instance migration failed, trying to revert"
5119 " disk status: %s", msg)
5120 self.feedback_fn("Migration failed, aborting")
5121 self._AbortMigration()
5122 self._RevertDiskStatus()
5124 msg = "hypervisor returned failure"
5125 raise errors.OpExecError("Could not migrate instance %s: %s" %
5126 (instance.name, msg))
5128 if result.payload.status != constants.HV_MIGRATION_ACTIVE:
5129 self.feedback_fn("* memory transfer complete")
5132 if (utils.TimeoutExpired(last_feedback,
5133 self._MIGRATION_FEEDBACK_INTERVAL) and
5134 ms.transferred_ram is not None):
5135 mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
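# e.g. 1536 MB transferred out of 2048 MB total reports 75.00 %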
5136 self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
5137 last_feedback = time.time()
5139 time.sleep(self._MIGRATION_POLL_INTERVAL)
5141 result = self.rpc.call_instance_finalize_migration_src(source_node,
5145 msg = result.fail_msg
5147 logging.error("Instance migration succeeded, but finalization failed"
5148 " on the source node: %s", msg)
5149 raise errors.OpExecError("Could not finalize instance migration: %s" %
5152 instance.primary_node = target_node
5154 # distribute new instance config to the other nodes
5155 self.cfg.Update(instance, self.feedback_fn)
5157 result = self.rpc.call_instance_finalize_migration_dst(target_node,
5161 msg = result.fail_msg
5163 logging.error("Instance migration succeeded, but finalization failed"
5164 " on the target node: %s", msg)
5165 raise errors.OpExecError("Could not finalize instance migration: %s" %
5168 if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
5169 self._EnsureSecondary(source_node)
5170 self._WaitUntilSync()
5171 self._GoStandalone()
5172 self._GoReconnect(False)
5173 self._WaitUntilSync()
5175 # If the instance's disk template is `rbd' or `ext' and there was a
5176 # successful migration, unmap the device from the source node.
5177 if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
5178 disks = _ExpandCheckDisks(instance, instance.disks)
5179 self.feedback_fn("* unmapping instance's disks from %s" % source_node)
5181 result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance))
5182 msg = result.fail_msg
5184 logging.error("Migration was successful, but couldn't unmap the"
5185 " block device %s on source node %s: %s",
5186 disk.iv_name, source_node, msg)
5187 logging.error("You need to unmap the device %s manually on %s",
5188 disk.iv_name, source_node)
5190 self.feedback_fn("* done")
5192 def _ExecFailover(self):
5193 """Failover an instance.
5195 The failover is done by shutting it down on its present node and
5196 starting it on the secondary.
5199 instance = self.instance
5200 primary_node = self.cfg.GetNodeInfo(instance.primary_node)
5202 source_node = instance.primary_node
5203 target_node = self.target_node
5205 if instance.admin_state == constants.ADMINST_UP:
5206 self.feedback_fn("* checking disk consistency between source and target")
5207 for (idx, dev) in enumerate(instance.disks):
5208 # for drbd, these are drbd over lvm
5209 if not _CheckDiskConsistency(self.lu, instance, dev, target_node,
5211 if primary_node.offline:
5212 self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
5214 (primary_node.name, idx, target_node))
5215 elif not self.ignore_consistency:
5216 raise errors.OpExecError("Disk %s is degraded on target node,"
5217 " aborting failover" % idx)
5219 self.feedback_fn("* not checking disk consistency as instance is not"
5222 self.feedback_fn("* shutting down instance on source node")
5223 logging.info("Shutting down instance %s on node %s",
5224 instance.name, source_node)
5226 result = self.rpc.call_instance_shutdown(source_node, instance,
5227 self.shutdown_timeout,
5229 msg = result.fail_msg
5231 if self.ignore_consistency or primary_node.offline:
5232 self.lu.LogWarning("Could not shutdown instance %s on node %s,"
5233 " proceeding anyway; please make sure node"
5234 " %s is down; error details: %s",
5235 instance.name, source_node, source_node, msg)
5237 raise errors.OpExecError("Could not shutdown instance %s on"
5239 (instance.name, source_node, msg))
5241 self.feedback_fn("* deactivating the instance's disks on source node")
5242 if not _ShutdownInstanceDisks(self.lu, instance, ignore_primary=True):
5243 raise errors.OpExecError("Can't shut down the instance's disks")
5245 instance.primary_node = target_node
5246 # distribute new instance config to the other nodes
5247 self.cfg.Update(instance, self.feedback_fn)
5249 # Only start the instance if it's marked as up
5250 if instance.admin_state == constants.ADMINST_UP:
5251 self.feedback_fn("* activating the instance's disks on target node %s" %
5253 logging.info("Starting instance %s on node %s",
5254 instance.name, target_node)
5256 disks_ok, _ = _AssembleInstanceDisks(self.lu, instance,
5257 ignore_secondaries=True)
5259 _ShutdownInstanceDisks(self.lu, instance)
5260 raise errors.OpExecError("Can't activate the instance's disks")
5262 self.feedback_fn("* starting the instance on the target node %s" %
5264 result = self.rpc.call_instance_start(target_node, (instance, None, None),
5265 False, self.lu.op.reason)
5266 msg = result.fail_msg
5268 _ShutdownInstanceDisks(self.lu, instance)
5269 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
5270 (instance.name, target_node, msg))
5272 def Exec(self, feedback_fn):
5273 """Perform the migration.
5276 self.feedback_fn = feedback_fn
5277 self.source_node = self.instance.primary_node
5279 # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
5280 if self.instance.disk_template in constants.DTS_INT_MIRROR:
5281 self.target_node = self.instance.secondary_nodes[0]
5282 # Otherwise self.target_node has been populated either
5283 # directly, or through an iallocator.
5285 self.all_nodes = [self.source_node, self.target_node]
5286 self.nodes_ip = dict((name, node.secondary_ip) for (name, node)
5287 in self.cfg.GetMultiNodeInfo(self.all_nodes))
5290 feedback_fn("Failover instance %s" % self.instance.name)
5291 self._ExecFailover()
5293 feedback_fn("Migrating instance %s" % self.instance.name)
5296 return self._ExecCleanup()
5298 return self._ExecMigration()