# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Logical units dealing with the cluster."""
34 from ganeti import compat
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import masterd
40 from ganeti import netutils
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 from ganeti import query
45 from ganeti import rpc
46 from ganeti import runtime
47 from ganeti import ssh
48 from ganeti import uidpool
49 from ganeti import utils
50 from ganeti import vcluster
52 from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
54 from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55 ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56 GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57 GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58 CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59 ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob
61 import ganeti.masterd.instance
class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")
class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")
class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]
117 class LUClusterDestroy(LogicalUnit):
118 """Logical unit for destroying the cluster.
121 HPATH = "cluster-destroy"
122 HTYPE = constants.HTYPE_CLUSTER
  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }
  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])
138 def CheckPrereq(self):
139 """Check prerequisites.
141 This checks whether the cluster is empty.
143 Any errors are signaled by raising errors.OpPrereqError.
146 master = self.cfg.GetMasterNode()
148 nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)
159 def Exec(self, feedback_fn):
160 """Destroys the cluster.
163 master_params = self.cfg.GetMasterNetworkParameters()
165 # Run post hooks on master node before it's removed
166 RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
168 ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
172 return master_params.uuid
175 class LUClusterPostInit(LogicalUnit):
176 """Logical unit for running hooks after cluster initialization.
179 HPATH = "cluster-init"
180 HTYPE = constants.HTYPE_CLUSTER
  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }
  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])
  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True
203 class ClusterQuery(QueryBase):
204 FIELDS = query.CLUSTER_FIELDS
  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass
223 def _GetQueryData(self, lu):
224 """Computes the list of nodes and their attributes.
227 # Locking is not used
228 assert not (compat.any(lu.glm.is_owned(level)
229 for level in locking.LEVELS
230 if level != locking.LEVEL_CLUSTER) or
231 self.do_locking or self.use_locking)
233 if query.CQ_CONFIG in self.requested_data:
234 cluster = lu.cfg.GetClusterInfo()
235 nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented
240 if query.CQ_QUEUE_DRAINED in self.requested_data:
241 drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented
245 if query.CQ_WATCHER_PAUSE in self.requested_data:
246 master_node_uuid = lu.cfg.GetMasterNode()
248 result = lu.rpc.call_get_watcher_pause(master_node_uuid)
249 result.Raise("Can't retrieve watcher pause from master node '%s'" %
250 lu.cfg.GetMasterNodeName())
252 watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented
256 return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
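
# In ClusterQuery._GetQueryData above, NotImplemented is the query layer's
# sentinel for "data not collected because the corresponding field was not
# requested"; the field definitions in ganeti.query then report such fields
# as unavailable instead of empty.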
259 class LUClusterQuery(NoHooksLU):
260 """Query cluster configuration.
265 def ExpandNames(self):
266 self.needed_locks = {}
268 def Exec(self, feedback_fn):
269 """Return cluster config.
272 cluster = self.cfg.GetClusterInfo()
    os_hvp = {}
    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params
282 # Convert ip_family to ip_version
283 primary_ip_version = constants.IP4_VERSION
284 if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
289 "protocol_version": constants.PROTOCOL_VERSION,
290 "config_version": constants.CONFIG_VERSION,
291 "os_api_version": max(constants.OS_API_VERSIONS),
292 "export_version": constants.EXPORT_VERSION,
293 "architecture": runtime.GetArchInfo(),
294 "name": cluster.cluster_name,
295 "master": self.cfg.GetMasterNodeName(),
296 "default_hypervisor": cluster.primary_hypervisor,
297 "enabled_hypervisors": cluster.enabled_hypervisors,
298 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
302 "osparams": cluster.osparams,
303 "ipolicy": cluster.ipolicy,
304 "nicparams": cluster.nicparams,
305 "ndparams": cluster.ndparams,
306 "diskparams": cluster.diskparams,
307 "candidate_pool_size": cluster.candidate_pool_size,
308 "master_netdev": cluster.master_netdev,
309 "master_netmask": cluster.master_netmask,
310 "use_external_mip_script": cluster.use_external_mip_script,
311 "volume_group_name": cluster.volume_group_name,
312 "drbd_usermode_helper": cluster.drbd_usermode_helper,
313 "file_storage_dir": cluster.file_storage_dir,
314 "shared_file_storage_dir": cluster.shared_file_storage_dir,
315 "maintain_node_health": cluster.maintain_node_health,
316 "ctime": cluster.ctime,
317 "mtime": cluster.mtime,
318 "uuid": cluster.uuid,
319 "tags": list(cluster.GetTags()),
320 "uid_pool": cluster.uid_pool,
321 "default_iallocator": cluster.default_iallocator,
322 "reserved_lvs": cluster.reserved_lvs,
323 "primary_ip_version": primary_ip_version,
324 "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
325 "hidden_os": cluster.hidden_os,
326 "blacklisted_os": cluster.blacklisted_os,
327 "enabled_disk_templates": cluster.enabled_disk_templates,
333 class LUClusterRedistConf(NoHooksLU):
334 """Force the redistribution of cluster configuration.
336 This is a very simple LU.
341 def ExpandNames(self):
342 self.needed_locks = {
343 locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }
    self.share_locks = ShareAll()
348 def Exec(self, feedback_fn):
349 """Redistribute the configuration.
352 self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
353 RedistributeAncillaryFiles(self)
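
# Even though nothing in the cluster object changes here, cfg.Update() bumps
# the configuration serial number and rewrites/distributes config.data to the
# master candidates; RedistributeAncillaryFiles() then pushes the remaining
# cluster-wide files (certificates, known_hosts and similar) to the other
# online nodes.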
356 class LUClusterRename(LogicalUnit):
357 """Rename the cluster.
360 HPATH = "cluster-rename"
361 HTYPE = constants.HTYPE_CLUSTER
  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
372 def BuildHooksNodes(self):
373 """Build hooks nodes.
376 return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
378 def CheckPrereq(self):
379 """Verify that the passed name is a valid one.
382 hostname = netutils.GetHostname(name=self.op.name,
383 family=self.cfg.GetPrimaryIPFamily())
385 new_name = hostname.name
386 self.ip = new_ip = hostname.ip
387 old_name = self.cfg.GetClusterName()
388 old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
394 if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
395 raise errors.OpPrereqError("The given cluster IP address (%s) is"
396 " reachable on the network" %
397 new_ip, errors.ECODE_NOTUNIQUE)
399 self.op.name = new_name
  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername
439 class LUClusterRepairDiskSizes(NoHooksLU):
440 """Verifies the cluster disks sizes.
  def ExpandNames(self):
    if self.op.instances:
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }
471 def DeclareLocks(self, level):
472 if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
473 self._LockInstancesNodes(primary_only=True, level=level)
475 def CheckPrereq(self):
476 """Check prerequisites.
478 This only checks the optional instance list against the existing names.
481 if self.wanted_names is None:
482 self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
484 self.wanted_instances = \
485 map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
487 def _EnsureChildSizes(self, disk):
488 """Ensure children of the disk have the needed disk size.
490 This is valid mainly for DRBD8 and fixes an issue where the
491 children have smaller disk size.
493 @param disk: an L{ganeti.objects.Disk} object
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False
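
  # For DRBD8 disks the children list holds the backing LVs: children[0] is
  # the data volume and children[1] the metadata volume. Only the data volume
  # has to track the parent's size, which is why the recursion above is
  # limited to the first child.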
510 def Exec(self, feedback_fn):
511 """Verify the size of cluster disks.
514 # TODO: check child disks too
515 # TODO: check differences in size between primary/secondary nodes
517 for instance in self.wanted_instances:
518 pnode = instance.primary_node
519 if pnode not in per_node_disks:
520 per_node_disks[pnode] = []
521 for idx, disk in enumerate(instance.disks):
522 per_node_disks[pnode].append((instance, idx, disk))
524 assert not (frozenset(per_node_disks.keys()) -
525 self.owned_locks(locking.LEVEL_NODE_RES)), \
526 "Not owning correct locks"
527 assert not self.owned_locks(locking.LEVEL_NODE)
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())
    changed = []
    for node_uuid, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node_uuid)
537 node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))

    return changed
def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)
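
# The netmask validated above is a bare prefix length (an integer), e.g. 24
# for an IPv4 /24 network; _ValidateNetmask(cfg, 24) passes for an IPv4
# cluster, while values outside the address family's valid range raise
# OpPrereqError.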
612 class LUClusterSetParams(LogicalUnit):
613 """Change the parameters of the cluster.
616 HPATH = "cluster-modify"
617 HTYPE = constants.HTYPE_CLUSTER
  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" %
                                   err, errors.ECODE_INVAL)
645 def ExpandNames(self):
646 # FIXME: in the future maybe other cluster params won't require checking on
647 # all nodes to be modified.
648 # FIXME: This opcode changes cluster-wide settings. Is acquiring all
649 # resource locks the right thing, shouldn't it be the BGL instead?
650 self.needed_locks = {
651 locking.LEVEL_NODE: locking.ALL_SET,
652 locking.LEVEL_INSTANCE: locking.ALL_SET,
653 locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])
674 def _CheckVgName(self, node_uuids, enabled_disk_templates,
675 new_enabled_disk_templates):
676 """Check the consistency of the vg name on all nodes and in case it gets
677 unset whether there are instances still using it.
680 if self.op.vg_name is not None and not self.op.vg_name:
681 if self.cfg.HasAnyDiskOfType(constants.LD_LV):
682 raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
683 " instances exist", errors.ECODE_INVAL)
685 if (self.op.vg_name is not None and
686 utils.IsLvmEnabled(enabled_disk_templates)) or \
687 (self.cfg.GetVGName() is not None and
688 utils.LvmGetsEnabled(enabled_disk_templates,
689 new_enabled_disk_templates)):
690 self._CheckVgNameOnNodes(node_uuids)
  def _CheckVgNameOnNodes(self, node_uuids):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_uuids)
    for node_uuid in node_uuids:
      msg = vglist[node_uuid].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
                                   errors.ECODE_ENVIRON)
713 def _GetEnabledDiskTemplates(self, cluster):
714 """Determines the enabled disk templates and the subset of disk templates
715 that are newly enabled by this operation.
718 enabled_disk_templates = None
719 new_enabled_disk_templates = []
720 if self.op.enabled_disk_templates:
721 enabled_disk_templates = self.op.enabled_disk_templates
722 new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(cluster.enabled_disk_templates))
    else:
      enabled_disk_templates = cluster.enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates)
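
  # Example for the helper above: with cluster.enabled_disk_templates being
  # ["plain", "drbd"] and self.op.enabled_disk_templates being
  # ["plain", "rbd"], it returns (["plain", "rbd"], ["rbd"]): only "rbd" is
  # newly enabled by the operation.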
729 def CheckPrereq(self):
730 """Check prerequisites.
732 This checks whether the given params don't conflict and
733 if the given volume group is valid.
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_uuids = self.owned_locks(locking.LEVEL_NODE)
743 self.cluster = cluster = self.cfg.GetClusterInfo()
745 vm_capable_node_uuids = [node.uuid
746 for node in self.cfg.GetAllNodesInfo().values()
747 if node.uuid in node_uuids and node.vm_capable]
749 (enabled_disk_templates, new_enabled_disk_templates) = \
750 self._GetEnabledDiskTemplates(cluster)
752 self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
753 new_enabled_disk_templates)
    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_uuids)
      for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s",
                       ninfo.name)
          continue
        msg = helpers[ninfo.uuid].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (ninfo.name, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[ninfo.uuid].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (ninfo.name, node_helper),
                                     errors.ECODE_ENVIRON)
    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())
796 if self.op.disk_state:
797 new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
798 self.cluster.disk_state_static)
799 self.new_disk_state = \
800 dict((storage, dict((name, cluster.SimpleFillDiskState(values))
801 for name, values in svalues.items()))
802 for storage, svalues in new_disk_state.items())
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(nuuid in group.members
                                             for nuuid in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
                                           self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate it: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))
    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)
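
    # objects.FillDict(defaults, custom) works on a deep copy of "defaults"
    # with the keys from "custom" layered on top, so the merges above never
    # modify the live cluster.hvparams/cluster.diskparams dictionaries.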
    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])
    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors
922 if self.op.hvparams or self.op.enabled_hypervisors is not None:
923 # either the enabled list has changed, or the parameters have, validate
924 for hv_name, hv_params in self.new_hvparams.items():
925 if ((self.op.hvparams and hv_name in self.op.hvparams) or
926 (self.op.enabled_hypervisors and
927 hv_name in self.op.enabled_hypervisors)):
928 # either this is a new hypervisor, or its parameters have changed
929 hv_class = hypervisor.GetHypervisorClass(hv_name)
930 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
931 hv_class.CheckParameterSyntax(hv_params)
932 CheckHVParams(self, node_uuids, hv_name, hv_params)
    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
941 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
942 # we need to fill in the new os_hvp on top of the actual hv_p
943 cluster_defaults = self.new_hvparams.get(hv_name, {})
944 new_osp = objects.FillDict(cluster_defaults, hv_params)
945 hv_class = hypervisor.GetHypervisorClass(hv_name)
946 hv_class.CheckParameterSyntax(new_osp)
947 CheckHVParams(self, node_uuids, hv_name, new_osp)
    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)
958 def _CheckDiskTemplateConsistency(self):
959 """Check whether the disk templates that are going to be disabled
960 are still in use by some instances.
963 if self.op.enabled_disk_templates:
964 cluster = self.cfg.GetClusterInfo()
965 instances = self.cfg.GetAllInstancesInfo()
967 disk_templates_to_remove = set(cluster.enabled_disk_templates) \
968 - set(self.op.enabled_disk_templates)
969 for instance in instances.itervalues():
970 if instance.disk_template in disk_templates_to_remove:
971 raise errors.OpPrereqError("Cannot disable disk template '%s',"
972 " because instance '%s' is using it." %
973 (instance.disk_template, instance.name))
  def _SetVgName(self, feedback_fn):
    """Determines and sets the new volume group name.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name and not \
          utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
        feedback_fn("Note that you specified a volume group, but did not"
                    " enable any lvm disk template.")
      new_volume = self.op.vg_name
      if not self.op.vg_name:
        if utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
          raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                     " disk templates are enabled.")
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    else:
      if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
          not self.cfg.GetVGName():
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")
  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(set(self.op.enabled_disk_templates))

    self._SetVgName(feedback_fn)

    if self.op.drbd_helper is not None:
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did"
                    " not enable the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
1027 if self.op.enabled_hypervisors is not None:
1028 self.cluster.hvparams = self.new_hvparams
1029 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1030 if self.op.beparams:
1031 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1032 if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
1036 if self.op.osparams:
1037 self.cluster.osparams = self.new_osp
1038 if self.op.ndparams:
1039 self.cluster.ndparams = self.new_ndparams
1040 if self.op.diskparams:
1041 self.cluster.diskparams = self.new_diskparams
1042 if self.op.hv_state:
1043 self.cluster.hv_state_static = self.new_hv_state
1044 if self.op.disk_state:
1045 self.cluster.disk_state_static = self.new_disk_state
1047 if self.op.candidate_pool_size is not None:
1048 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1049 # we need to update the pool size here, otherwise the save will fail
1050 AdjustCandidatePool(self, [])
1052 if self.op.maintain_node_health is not None:
1053 if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1054 feedback_fn("Note: CONFD was disabled at build time, node health"
1055 " maintenance is not useful (still enabling it)")
1056 self.cluster.maintain_node_health = self.op.maintain_node_health
1058 if self.op.prealloc_wipe_disks is not None:
1059 self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1061 if self.op.add_uids is not None:
1062 uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1064 if self.op.remove_uids is not None:
1065 uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1067 if self.op.uid_pool is not None:
1068 self.cluster.uid_pool = self.op.uid_pool
1070 if self.op.default_iallocator is not None:
1071 self.cluster.default_iallocator = self.op.default_iallocator
1073 if self.op.reserved_lvs is not None:
1074 self.cluster.reserved_lvs = self.op.reserved_lvs
1076 if self.op.use_external_mip_script is not None:
1077 self.cluster.use_external_mip_script = self.op.use_external_mip_script
    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
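
    # "mods" is a list of (action, os_name) pairs, e.g.
    # [(constants.DDM_ADD, "debian-image"), (constants.DDM_REMOVE, "lenny")],
    # as produced by the --hidden-os/--blacklisted-os command-line options.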
1096 if self.op.hidden_os:
1097 helper_os("hidden_os", self.op.hidden_os, "hidden")
1099 if self.op.blacklisted_os:
1100 helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)

      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev
1120 if self.op.master_netmask:
1121 master_params = self.cfg.GetMasterNetworkParameters()
1122 feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1123 result = self.rpc.call_node_change_master_netmask(
1124 master_params.uuid, master_params.netmask,
1125 self.op.master_netmask, master_params.ip,
1126 master_params.netdev)
1127 result.Warn("Could not change the master IP netmask", feedback_fn)
1128 self.cluster.master_netmask = self.op.master_netmask
1130 self.cfg.Update(self.cluster, feedback_fn)
1132 if self.op.master_netdev:
1133 master_params = self.cfg.GetMasterNetworkParameters()
1134 feedback_fn("Starting the master ip on the new master netdev (%s)" %
1135 self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)
1143 class LUClusterVerify(NoHooksLU):
1144 """Submits all jobs necessary to verify the cluster.
1149 def ExpandNames(self):
1150 self.needed_locks = {}
  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)
1175 # Fix up all parameters
1176 for op in itertools.chain(*jobs): # pylint: disable=W0142
1177 op.debug_simulate_errors = self.op.debug_simulate_errors
1178 op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)
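
# The "depends" entries built above use relative job dependencies: the
# negative index -len(jobs) refers to the OpClusterVerifyConfig job submitted
# earlier in the same batch, so every per-group verification only starts once
# the global configuration check has finished.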
1188 class _VerifyErrors(object):
1189 """Mix-in for cluster/group verify LUs.
1191 It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1192 self.op and self._feedback_fn to be available.)
1196 ETYPE_FIELD = "code"
1197 ETYPE_ERROR = "ERROR"
1198 ETYPE_WARNING = "WARNING"
  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to
    # a warning
    if etxt in self.op.ignore_errors: # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)
def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data
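
# Each hvp_data entry is an (origin, hypervisor, parameters) tuple, for
# example ("os debian-image", "kvm", {...}); the origin string is only used
# for error reporting by the verification code below.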
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (hv_name, item))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1327 def ExpandNames(self):
1328 self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1329 self.share_locks = ShareAll()
1331 def CheckPrereq(self):
1332 """Check prerequisites.
1335 # Retrieve all information
1336 self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1337 self.all_node_info = self.cfg.GetAllNodesInfo()
1338 self.all_inst_info = self.cfg.GetAllInstancesInfo()
  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1352 feedback_fn("* Verifying cluster certificate files")
1354 for cert_filename in pathutils.ALL_CERT_FILES:
1355 (errcode, msg) = _VerifyCertificate(cert_filename)
1356 self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1358 feedback_fn("* Verifying hypervisor parameters")
1360 self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1361 self.all_inst_info.values()))
1363 feedback_fn("* Verifying all nodes belong to an existing group")
1365 # We do this verification here because, should this bogus circumstance
1366 # occur, it would never be caught by VerifyGroup, which only acts on
1367 # nodes/instances reachable from existing node groups.
1369 dangling_nodes = set(node for node in self.all_node_info.values()
1370 if node.group not in self.all_group_info)
1372 dangling_instances = {}
1373 no_node_instances = []
1375 for inst in self.all_inst_info.values():
1376 if inst.primary_node in [node.uuid for node in dangling_nodes]:
1377 dangling_instances.setdefault(inst.primary_node, []).append(inst)
1378 elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(
           self.cfg.GetInstanceNames(
             dangling_instances.get(node.uuid, ["no instances"]))))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(
                    self.cfg.GetInstanceNames(no_node_instances)))

    return not self.bad
1403 class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1404 """Verifies the status of a node group.
1407 HPATH = "cluster-verify"
1408 HTYPE = constants.HTYPE_CLUSTER
1411 _HOOKS_INDENT_RE = re.compile("^", re.M)
1413 class NodeImage(object):
1414 """A class representing the logical and physical status of a node.
1417 @ivar uuid: the node UUID to which this object refers
1418 @ivar volumes: a structure as returned from
1419 L{ganeti.backend.GetVolumeList} (runtime)
1420 @ivar instances: a list of running instances (runtime)
1421 @ivar pinst: list of configured primary instances (config)
1422 @ivar sinst: list of configured secondary instances (config)
1423 @ivar sbp: dictionary of {primary-node: list of instances} for all
1424 instances for which this node is secondary (config)
1425 @ivar mfree: free memory, as reported by hypervisor (runtime)
1426 @ivar dfree: free disk, as reported by the node (runtime)
1427 @ivar offline: the offline status (config)
1428 @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1430 not whether the individual keys were correct) (runtime)
1431 @type lvm_fail: boolean
1432 @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1433 @type hyp_fail: boolean
1434 @ivar hyp_fail: whether the RPC call didn't return the instance list
1435 @type ghost: boolean
1436 @ivar ghost: whether this is a known node or not (config)
1437 @type os_fail: boolean
1438 @ivar os_fail: whether the RPC call didn't return valid OS data
1440 @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1441 @type vm_capable: boolean
1442 @ivar vm_capable: whether the node can host instances
1444 @ivar pv_min: size in MiB of the smallest PVs
1446 @ivar pv_max: size in MiB of the biggest PVs
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None
  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()
1490 def DeclareLocks(self, level):
1491 if level == locking.LEVEL_NODE:
1492 # Get members of node group; this is unsafe and needs verification later
1493 nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1495 # In Exec(), we warn about mirrored instances that have primary and
1496 # secondary living in separate node groups. To fully verify that
1497 # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
1500 for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1501 # Important: access only the instances whose lock is owned
1502 instance = self.cfg.GetInstanceInfoByName(inst_name)
1503 if instance.disk_template in constants.DTS_INT_MIRROR:
1504 nodes.update(instance.secondary_nodes)
1506 self.needed_locks[locking.LEVEL_NODE] = nodes
1508 def CheckPrereq(self):
1509 assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1510 self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1512 group_node_uuids = set(self.group_info.members)
1513 group_inst_uuids = \
1514 self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1516 unlocked_node_uuids = \
1517 group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1519 unlocked_inst_uuids = \
1520 group_inst_uuids.difference(
1521 [self.cfg.GetInstanceInfoByName(name).uuid
1522 for name in self.owned_locks(locking.LEVEL_INSTANCE)])
    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_inst_uuids:
      raise errors.OpPrereqError(
        "Missing lock for instances: %s" %
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
        errors.ECODE_STATE)
1536 self.all_node_info = self.cfg.GetAllNodesInfo()
1537 self.all_inst_info = self.cfg.GetAllInstancesInfo()
1539 self.my_node_uuids = group_node_uuids
1540 self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1541 for node_uuid in group_node_uuids)
1543 self.my_inst_uuids = group_inst_uuids
1544 self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1545 for inst_uuid in group_inst_uuids)
1547 # We detect here the nodes that will need the extra RPC calls for verifying
1548 # split LV volumes; they should be locked.
1549 extra_lv_nodes = set()
1551 for inst in self.my_inst_info.values():
1552 if inst.disk_template in constants.DTS_INT_MIRROR:
1553 for nuuid in inst.all_nodes:
1554 if self.all_node_info[nuuid].group != self.group_uuid:
1555 extra_lv_nodes.add(nuuid)
1557 unlocked_lv_nodes = \
1558 extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)
1566 def _VerifyNode(self, ninfo, nresult):
1567 """Perform some basic validation on data returned from a node.
1569 - check the result data structure is well formed and has all the
1571 - check ganeti version
1573 @type ninfo: L{objects.Node}
1574 @param ninfo: the node to check
1575 @param nresult: the results from the node
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False
1588 # compares ganeti version
1589 local_version = constants.PROTOCOL_VERSION
1590 remote_version = nresult.get("version", None)
1591 test = not (remote_version and
1592 isinstance(remote_version, (list, tuple)) and
1593 len(remote_version) == 2)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results
1608 # full package version
1609 self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1610 constants.CV_ENODEVERSION, ninfo.name,
1611 "software version mismatch: master %s, node %s",
1612 constants.RELEASE_VERSION, remote_version[1],
1613 code=self.ETYPE_WARNING)
1615 hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1616 if ninfo.vm_capable and isinstance(hyp_result, dict):
1617 for hv_name, hv_result in hyp_result.iteritems():
1618 test = hv_result is not None
1619 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1620 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1622 hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1623 if ninfo.vm_capable and isinstance(hvp_result, list):
1624 for item, hv_name, hv_result in hvp_result:
1625 self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1626 "hypervisor %s parameter verify failure (source %s): %s",
1627 hv_name, item, hv_result)
1629 test = nresult.get(constants.NV_NODESETUP,
1630 ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True
1636 def _VerifyNodeTime(self, ninfo, nresult,
1637 nvinfo_starttime, nvinfo_endtime):
1638 """Check the node time.
1640 @type ninfo: L{objects.Node}
1641 @param ninfo: the node to check
1642 @param nresult: the remote results for the node
1643 @param nvinfo_starttime: the start time of the RPC call
1644 @param nvinfo_endtime: the end time of the RPC call
    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
                    "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
                  "Node time diverges by at least %s from master node time",
                  ntime_diff)
1666 def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1667 """Check the node LVM results and update info for cross-node checks.
1669 @type ninfo: L{objects.Node}
1670 @param ninfo: the node to check
1671 @param nresult: the remote results for the node
1672 @param vg_name: the configured VG name
1673 @type nimg: L{NodeImage}
1674 @param nimg: node image
    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                  "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)

    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax
1697 def _VerifyGroupDRBDVersion(self, node_verify_infos):
1698 """Check cross-node DRBD version consistency.
1700 @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node verify call

    """
    node_versions = {}
    for node_uuid, ndata in node_verify_infos.items():
      nresult = ndata.payload
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
      node_versions[node_uuid] = version
1711 if len(set(node_versions.values())) > 1:
1712 for node_uuid, version in sorted(node_versions.items()):
1713 msg = "DRBD version mismatch: %s" % version
1714 self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1715 code=self.ETYPE_WARNING)
1717 def _VerifyGroupLVM(self, node_image, vg_name):
1718 """Check cross-node consistency in LVM.
1720 @type node_image: dict
1721 @param node_image: info about nodes, mapping from node to names to
1722 L{NodeImage} objects
1723 @param vg_name: the configured VG name
    # Only exclusive storage needs this kind of checks
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1741 bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1742 self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1743 "PV sizes differ too much in the group; smallest (%s MB) is"
1744 " on %s, biggest (%s MB) is on %s",
1745 pvmin, self.cfg.GetNodeName(minnode_uuid),
1746 pvmax, self.cfg.GetNodeName(maxnode_uuid))
1748 def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1749 """Check the node bridges.
1751 @type ninfo: L{objects.Node}
1752 @param ninfo: the node to check
1753 @param nresult: the remote results for the node
1754 @param bridges: the expected list of bridges
    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "did not return valid bridge information")
    if not test:
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1768 def _VerifyNodeUserScripts(self, ninfo, nresult):
1769 """Check the results of user scripts presence and executability on the node
1771 @type ninfo: L{objects.Node}
1772 @param ninfo: the node to check
1773 @param nresult: the remote results for the node
1776 test = not constants.NV_USERSCRIPTS in nresult
1777 self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1778 "did not return user scripts information")
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))
1786 def _VerifyNodeNetwork(self, ninfo, nresult):
1787 """Check the node network connectivity results.
1789 @type ninfo: L{objects.Node}
1790 @param ninfo: the node to check
1791 @param nresult: the remote results for the node
1794 test = constants.NV_NODELIST not in nresult
1795 self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
1796 "node hasn't returned node ssh connectivity data")
1798 if nresult[constants.NV_NODELIST]:
1799 for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1800 self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
1801 "ssh communication with node '%s': %s", a_node, a_msg)
1803 test = constants.NV_NODENETTEST not in nresult
1804 self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1805 "node hasn't returned node tcp connectivity data")
    if nresult[constants.NV_NODENETTEST]:
      nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
      for anode in nlist:
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
                      "tcp communication with node '%s': %s",
                      anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node master IP reachability data")

    if not nresult[constants.NV_MASTERIP]:
      if ninfo.uuid == self.master_node:
        msg = "the master node cannot reach the master IP (not configured?)"
      else:
        msg = "cannot reach the master IP"
      self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
1825 def _VerifyInstance(self, instance, node_image, diskstatus):
1826 """Verify an instance.
1828 This function checks to see if the required block devices are
1829 available on the instance's node, and that the nodes are in the correct
state.
1833 pnode_uuid = instance.primary_node
1834 pnode_img = node_image[pnode_uuid]
1835 groupinfo = self.cfg.GetAllNodeGroupsInfo()
1837 node_vol_should = {}
1838 instance.MapLVsByNode(node_vol_should)
1840 cluster = self.cfg.GetClusterInfo()
1841 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
self.group_info)
1843 err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
1844 self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
1845 utils.CommaJoin(err), code=self.ETYPE_WARNING)
1847 for node_uuid in node_vol_should:
1848 n_img = node_image[node_uuid]
1849 if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1850 # ignore missing volumes on offline or broken nodes
continue
1852 for volume in node_vol_should[node_uuid]:
1853 test = volume not in n_img.volumes
1854 self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
1855 "volume %s missing on node %s", volume,
1856 self.cfg.GetNodeName(node_uuid))
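# For instances configured as running, make sure they actually run on their primary node and that the primary node itself is not marked offline.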
1858 if instance.admin_state == constants.ADMINST_UP:
1859 test = instance.uuid not in pnode_img.instances and not pnode_img.offline
1860 self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
1861 "instance not running on its primary node %s",
1862 self.cfg.GetNodeName(pnode_uuid))
1863 self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
1864 instance.name, "instance is marked as running and lives on"
1865 " offline node %s", self.cfg.GetNodeName(pnode_uuid))
1867 diskdata = [(nname, success, status, idx)
1868 for (nname, disks) in diskstatus.items()
1869 for idx, (success, status) in enumerate(disks)]
1871 for nname, success, bdev_status, idx in diskdata:
1872 # the 'ghost node' construction in Exec() ensures that we have a
# per-node set of values
1874 snode = node_image[nname]
1875 bad_snode = snode.ghost or snode.offline
1876 self._ErrorIf(instance.disks_active and
1877 not success and not bad_snode,
1878 constants.CV_EINSTANCEFAULTYDISK, instance.name,
1879 "couldn't retrieve status for disk/%s on %s: %s",
1880 idx, self.cfg.GetNodeName(nname), bdev_status)
1882 if instance.disks_active and success and \
1883 (bdev_status.is_degraded or
1884 bdev_status.ldisk_status != constants.LDS_OKAY):
1885 msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
1886 if bdev_status.is_degraded:
1887 msg += " is degraded"
1888 if bdev_status.ldisk_status != constants.LDS_OKAY:
1889 msg += "; state is '%s'" % \
1890 constants.LDS_NAMES[bdev_status.ldisk_status]
1892 self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
1894 self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1895 constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
1896 "instance %s, connection to primary node failed",
1899 self._ErrorIf(len(instance.secondary_nodes) > 1,
1900 constants.CV_EINSTANCELAYOUT, instance.name,
1901 "instance has multiple secondary nodes: %s",
1902 utils.CommaJoin(instance.secondary_nodes),
1903 code=self.ETYPE_WARNING)
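# Exclusive storage checks: the instance's disk template must be supported on every node that has exclusive storage enabled, and each disk needs a configured spindle count.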
1905 es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
1906 if any(es_flags.values()):
1907 if instance.disk_template not in constants.DTS_EXCL_STORAGE:
1908 # Disk template not compatible with exclusive_storage: no instance
1909 # node should have the flag set
es_nodes = [n
1911 for (n, es) in es_flags.items()
if es]
1913 self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
1914 "instance has template %s, which is not supported on nodes"
1915 " that have exclusive storage set: %s",
1916 instance.disk_template,
1917 utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
1918 for (idx, disk) in enumerate(instance.disks):
1919 self._ErrorIf(disk.spindles is None,
1920 constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
1921 "number of spindles not configured for disk %s while"
1922 " exclusive storage is enabled, try running"
1923 " gnt-cluster repair-disk-sizes", idx)
1925 if instance.disk_template in constants.DTS_INT_MIRROR:
1926 instance_nodes = utils.NiceSort(instance.all_nodes)
1927 instance_groups = {}
1929 for node_uuid in instance_nodes:
1930 instance_groups.setdefault(self.all_node_info[node_uuid].group,
1931 []).append(node_uuid)
1934 "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
1935 groupinfo[group].name)
1936 # Sort so that we always list the primary node first.
1937 for group, nodes in sorted(instance_groups.items(),
1938 key=lambda (_, nodes): pnode_uuid in nodes,
1941 self._ErrorIf(len(instance_groups) > 1,
1942 constants.CV_EINSTANCESPLITGROUPS,
1943 instance.name, "instance has primary and secondary nodes in"
1944 " different groups: %s", utils.CommaJoin(pretty_list),
1945 code=self.ETYPE_WARNING)
1947 inst_nodes_offline = []
1948 for snode in instance.secondary_nodes:
1949 s_img = node_image[snode]
1950 self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
1951 self.cfg.GetNodeName(snode),
1952 "instance %s, connection to secondary node failed",
1956 inst_nodes_offline.append(snode)
1958 # warn that the instance lives on offline nodes
1959 self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
1960 instance.name, "instance has offline secondary node(s) %s",
1961 utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
1962 # ... or ghost/non-vm_capable nodes
1963 for node_uuid in instance.all_nodes:
1964 self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
1965 instance.name, "instance lives on ghost node %s",
1966 self.cfg.GetNodeName(node_uuid))
1967 self._ErrorIf(not node_image[node_uuid].vm_capable,
1968 constants.CV_EINSTANCEBADNODE, instance.name,
1969 "instance lives on non-vm_capable node %s",
1970 self.cfg.GetNodeName(node_uuid))
1972 def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1973 """Verify if there are any unknown volumes in the cluster.
1975 The .os, .swap and backup volumes are ignored. All other volumes are
1976 reported as unknown.
1978 @type reserved: L{ganeti.utils.FieldSet}
1979 @param reserved: a FieldSet of reserved volume names
1982 for node_uuid, n_img in node_image.items():
1983 if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
1984 self.all_node_info[node_uuid].group != self.group_uuid):
1985 # skip non-healthy nodes
continue
1987 for volume in n_img.volumes:
1988 test = ((node_uuid not in node_vol_should or
1989 volume not in node_vol_should[node_uuid]) and
1990 not reserved.Matches(volume))
1991 self._ErrorIf(test, constants.CV_ENODEORPHANLV,
1992 self.cfg.GetNodeName(node_uuid),
1993 "volume %s is unknown", volume)
1995 def _VerifyNPlusOneMemory(self, node_image, all_insts):
1996 """Verify N+1 Memory Resilience.
1998 Check that if one single node dies we can still start all the
1999 instances it was primary for.
2002 cluster_info = self.cfg.GetClusterInfo()
2003 for node_uuid, n_img in node_image.items():
2004 # This code checks that every node which is now listed as
2005 # secondary has enough memory to host all instances it is
2006 # supposed to, should a single other node in the cluster fail.
2007 # FIXME: not ready for failover to an arbitrary node
2008 # FIXME: does not support file-backed instances
2009 # WARNING: we currently take into account down instances as well
2010 # as up ones, considering that even if they're down someone
2011 # might want to start them even in the event of a node failure.
2012 if n_img.offline or \
2013 self.all_node_info[node_uuid].group != self.group_uuid:
2014 # we're skipping nodes marked offline and nodes in other groups from
2015 # the N+1 warning, since most likely we don't have good memory
2016 # information from them; we already list instances living on such
2017 # nodes, and that's enough warning
continue
2019 #TODO(dynmem): also consider ballooning out other instances
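# For every primary node whose instances use this node as secondary, sum the minimum memory of the auto-balanced instances and compare it with this node's free memory.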
2020 for prinode, inst_uuids in n_img.sbp.items():
needed_mem = 0
2022 for inst_uuid in inst_uuids:
2023 bep = cluster_info.FillBE(all_insts[inst_uuid])
2024 if bep[constants.BE_AUTO_BALANCE]:
2025 needed_mem += bep[constants.BE_MINMEM]
2026 test = n_img.mfree < needed_mem
2027 self._ErrorIf(test, constants.CV_ENODEN1,
2028 self.cfg.GetNodeName(node_uuid),
2029 "not enough memory to accomodate instance failovers"
2030 " should node %s fail (%dMiB needed, %dMiB available)",
2031 self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2033 def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2034 (files_all, files_opt, files_mc, files_vm)):
2035 """Verifies file checksums collected from all nodes.
2037 @param nodes: List of L{objects.Node} objects
2038 @param master_node_uuid: UUID of master node
2039 @param all_nvinfo: RPC results
2042 # Define functions determining which nodes to consider for a file
files2nodefn = [
(files_all, None),
2045 (files_mc, lambda node: (node.master_candidate or
2046 node.uuid == master_node_uuid)),
2047 (files_vm, lambda node: node.vm_capable),
]
2050 # Build mapping from filename to list of nodes which should have the file
nodefiles = {}
2052 for (files, fn) in files2nodefn:
if fn is None:
filenodes = nodes
else:
2056 filenodes = filter(fn, nodes)
2057 nodefiles.update((filename,
2058 frozenset(map(operator.attrgetter("uuid"), filenodes)))
2059 for filename in files)
2061 assert set(nodefiles) == (files_all | files_mc | files_vm)
2063 fileinfo = dict((filename, {}) for filename in nodefiles)
2064 ignore_nodes = set()
for node in nodes:
if node.offline:
2068 ignore_nodes.add(node.uuid)
continue
2071 nresult = all_nvinfo[node.uuid]
2073 if nresult.fail_msg or not nresult.payload:
node_files = None
else:
2076 fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2077 node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2078 for (key, value) in fingerprints.items())
2081 test = not (node_files and isinstance(node_files, dict))
2082 self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2083 "Node did not return file checksum data")
if test:
2085 ignore_nodes.add(node.uuid)
continue
2088 # Build per-checksum mapping from filename to nodes having it
2089 for (filename, checksum) in node_files.items():
2090 assert filename in nodefiles
2091 fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
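# Second pass: for every expected file, compare the nodes that actually reported it (and the checksums they reported) against the set of nodes that should have it.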
2093 for (filename, checksums) in fileinfo.items():
2094 assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2096 # Nodes having the file
2097 with_file = frozenset(node_uuid
2098 for node_uuids in fileinfo[filename].values()
2099 for node_uuid in node_uuids) - ignore_nodes
2101 expected_nodes = nodefiles[filename] - ignore_nodes
2103 # Nodes missing file
2104 missing_file = expected_nodes - with_file
2106 if filename in files_opt:
2108 self._ErrorIf(missing_file and missing_file != expected_nodes,
2109 constants.CV_ECLUSTERFILECHECK, None,
2110 "File %s is optional, but it must exist on all or no"
2111 " nodes (not found on %s)",
2115 map(self.cfg.GetNodeName, missing_file))))
2117 self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2118 "File %s is missing from node(s) %s", filename,
2121 map(self.cfg.GetNodeName, missing_file))))
2123 # Warn if a node has a file it shouldn't
2124 unexpected = with_file - expected_nodes
2125 self._ErrorIf(unexpected,
2126 constants.CV_ECLUSTERFILECHECK, None,
2127 "File %s should not exist on node(s) %s",
2128 filename, utils.CommaJoin(
2129 utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2131 # See if there are multiple versions of the file
2132 test = len(checksums) > 1
if test:
2134 variants = ["variant %s on %s" %
(idx + 1,
2136 utils.CommaJoin(utils.NiceSort(
2137 map(self.cfg.GetNodeName, node_uuids))))
2138 for (idx, (checksum, node_uuids)) in
2139 enumerate(sorted(checksums.items()))]
else:
variants = []
2143 self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2144 "File %s found with %s different checksums (%s)",
2145 filename, len(checksums), "; ".join(variants))
2147 def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2149 """Verifies and the node DRBD status.
2151 @type ninfo: L{objects.Node}
2152 @param ninfo: the node to check
2153 @param nresult: the remote results for the node
2154 @param instanceinfo: the dict of instances
2155 @param drbd_helper: the configured DRBD usermode helper
2156 @param drbd_map: the DRBD map as returned by
2157 L{ganeti.config.ConfigWriter.ComputeDRBDMap}
if drbd_helper:
2161 helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2162 test = (helper_result is None)
2163 self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2164 "no drbd usermode helper returned")
if helper_result:
2166 status, payload = helper_result
test = not status
2168 self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2169 "drbd usermode helper check unsuccessful: %s", payload)
2170 test = status and (payload != drbd_helper)
2171 self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2172 "wrong drbd usermode helper: %s", payload)
2174 # compute the DRBD minors
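# Map every DRBD minor reserved for this node to (instance UUID, whether the minor must currently be in use, i.e. the instance's disks are active).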
node_drbd = {}
2176 for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2177 test = inst_uuid not in instanceinfo
2178 self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2179 "ghost instance '%s' in temporary DRBD map", inst_uuid)
2180 # ghost instance should not be running, but otherwise we
2181 # don't give double warnings (both ghost instance and
2182 # unallocated minor in use)
if test:
2184 node_drbd[minor] = (inst_uuid, False)
else:
2186 instance = instanceinfo[inst_uuid]
2187 node_drbd[minor] = (inst_uuid, instance.disks_active)
2189 # and now check them
2190 used_minors = nresult.get(constants.NV_DRBDLIST, [])
2191 test = not isinstance(used_minors, (tuple, list))
2192 self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2193 "cannot parse drbd status file: %s", str(used_minors))
2195 # we cannot check drbd status
2198 for minor, (inst_uuid, must_exist) in node_drbd.items():
2199 test = minor not in used_minors and must_exist
2200 self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2201 "drbd minor %d of instance %s is not active", minor,
2202 self.cfg.GetInstanceName(inst_uuid))
2203 for minor in used_minors:
2204 test = minor not in node_drbd
2205 self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2206 "unallocated drbd minor %d is in use", minor)
2208 def _UpdateNodeOS(self, ninfo, nresult, nimg):
2209 """Builds the node OS structures.
2211 @type ninfo: L{objects.Node}
2212 @param ninfo: the node to check
2213 @param nresult: the remote results for the node
2214 @param nimg: the node image object
2217 remote_os = nresult.get(constants.NV_OSLIST, None)
2218 test = (not isinstance(remote_os, list) or
2219 not compat.all(isinstance(v, list) and len(v) == 7
2220 for v in remote_os))
2222 self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2223 "node hasn't returned valid OS data")
2232 for (name, os_path, status, diagnose,
2233 variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2235 if name not in os_dict:
os_dict[name] = []
2238 # parameters is a list of lists instead of list of tuples due to
2239 # JSON lacking a real tuple type, fix it:
2240 parameters = [tuple(v) for v in parameters]
2241 os_dict[name].append((os_path, status, diagnose,
2242 set(variants), set(parameters), set(api_ver)))
2244 nimg.oslist = os_dict
2246 def _VerifyNodeOS(self, ninfo, nimg, base):
2247 """Verifies the node OS list.
2249 @type ninfo: L{objects.Node}
2250 @param ninfo: the node to check
2251 @param nimg: the node image object
2252 @param base: the 'template' node we match against (e.g. from the master)
2255 assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2257 beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2258 for os_name, os_data in nimg.oslist.items():
2259 assert os_data, "Empty OS status for OS %s?!" % os_name
2260 f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2261 self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2262 "Invalid OS %s (located at %s): %s",
2263 os_name, f_path, f_diag)
2264 self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2265 "OS '%s' has multiple entries"
2266 " (first one shadows the rest): %s",
2267 os_name, utils.CommaJoin([v[0] for v in os_data]))
2268 # comparisons with the 'base' image
2269 test = os_name not in base.oslist
2270 self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2271 "Extra OS %s not present on reference node (%s)",
2272 os_name, self.cfg.GetNodeName(base.uuid))
if test:
continue
2275 assert base.oslist[os_name], "Base node has empty OS status?"
2276 _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
if not b_status:
2278 # base OS is invalid, skipping
continue
2280 for kind, a, b in [("API version", f_api, b_api),
2281 ("variants list", f_var, b_var),
2282 ("parameters", beautify_params(f_param),
2283 beautify_params(b_param))]:
2284 self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2285 "OS %s for %s differs from reference node %s:"
2286 " [%s] vs. [%s]", kind, os_name,
2287 self.cfg.GetNodeName(base.uuid),
2288 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2290 # check any missing OSes
2291 missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2292 self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2293 "OSes present on reference node %s"
2294 " but missing on this node: %s",
2295 self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2297 def _VerifyFileStoragePaths(self, ninfo, nresult, is_master,
2298 enabled_disk_templates):
2299 """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2301 @type ninfo: L{objects.Node}
2302 @param ninfo: the node to check
2303 @param nresult: the remote results for the node
2304 @type is_master: bool
2305 @param is_master: Whether node is the master node
if (is_master and
2309 (utils.storage.IsFileStorageEnabled(enabled_disk_templates) or
2310 utils.storage.IsSharedFileStorageEnabled(enabled_disk_templates))):
try:
2312 fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
except KeyError:
2314 # This should never happen
2315 self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2316 "Node did not return forbidden file storage paths")
else:
2318 self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2319 "Found forbidden file storage paths: %s",
2320 utils.CommaJoin(fspaths))
else:
2322 self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
2323 constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2324 "Node should not have returned forbidden file storage"
" paths")
2327 def _VerifyOob(self, ninfo, nresult):
2328 """Verifies out of band functionality of a node.
2330 @type ninfo: L{objects.Node}
2331 @param ninfo: the node to check
2332 @param nresult: the remote results for the node
2335 # We just have to verify the paths on master and/or master candidates
2336 # as the oob helper is invoked on the master
2337 if ((ninfo.master_candidate or ninfo.master_capable) and
2338 constants.NV_OOB_PATHS in nresult):
2339 for path_result in nresult[constants.NV_OOB_PATHS]:
2340 self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2341 ninfo.name, path_result)
2343 def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2344 """Verifies and updates the node volume data.
2346 This function will update a L{NodeImage}'s internal structures
2347 with data from the remote call.
2349 @type ninfo: L{objects.Node}
2350 @param ninfo: the node to check
2351 @param nresult: the remote results for the node
2352 @param nimg: the node image object
2353 @param vg_name: the configured VG name
2356 nimg.lvm_fail = True
2357 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
if vg_name is None:
pass
2360 elif isinstance(lvdata, basestring):
2361 self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2362 "LVM problem on node: %s", utils.SafeEncode(lvdata))
2363 elif not isinstance(lvdata, dict):
2364 self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2365 "rpc call to node failed (lvlist)")
2367 nimg.volumes = lvdata
2368 nimg.lvm_fail = False
2370 def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2371 """Verifies and updates the node instance list.
2373 If the listing was successful, then updates this node's instance
2374 list. Otherwise, it marks the RPC call as failed for the instance
list.
2377 @type ninfo: L{objects.Node}
2378 @param ninfo: the node to check
2379 @param nresult: the remote results for the node
2380 @param nimg: the node image object
2383 idata = nresult.get(constants.NV_INSTANCELIST, None)
2384 test = not isinstance(idata, list)
2385 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2386 "rpc call to node failed (instancelist): %s",
2387 utils.SafeEncode(str(idata)))
if test:
2389 nimg.hyp_fail = True
else:
2391 nimg.instances = [inst.uuid for (_, inst) in
2392 self.cfg.GetMultiInstanceInfoByName(idata)]
2394 def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2395 """Verifies and computes a node information map
2397 @type ninfo: L{objects.Node}
2398 @param ninfo: the node to check
2399 @param nresult: the remote results for the node
2400 @param nimg: the node image object
2401 @param vg_name: the configured VG name
2404 # try to read free memory (from the hypervisor)
2405 hv_info = nresult.get(constants.NV_HVINFO, None)
2406 test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2407 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2408 "rpc call to node failed (hvinfo)")
2411 nimg.mfree = int(hv_info["memory_free"])
2412 except (ValueError, TypeError):
2413 self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2414 "node returned invalid nodeinfo, check hypervisor")
2416 # FIXME: devise a free space model for file based instances as well
2417 if vg_name is not None:
2418 test = (constants.NV_VGLIST not in nresult or
2419 vg_name not in nresult[constants.NV_VGLIST])
2420 self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2421 "node didn't return data for the volume group '%s'"
2422 " - it is either missing or broken", vg_name)
2425 nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2426 except (ValueError, TypeError):
2427 self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2428 "node returned invalid LVM info, check LVM status")
2430 def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2431 """Gets per-disk status information for all instances.
2433 @type node_uuids: list of strings
2434 @param node_uuids: Node UUIDs
2435 @type node_image: dict of (UUID, L{objects.Node})
2436 @param node_image: Node objects
2437 @type instanceinfo: dict of (UUID, L{objects.Instance})
2438 @param instanceinfo: Instance objects
2439 @rtype: {instance: {node: [(success, payload)]}}
2440 @return: a dictionary of per-instance dictionaries with nodes as
2441 keys and disk information as values; the disk information is a
2442 list of tuples (success, payload)
node_disks = {}
2446 node_disks_devonly = {}
2447 diskless_instances = set()
2448 diskless = constants.DT_DISKLESS
2450 for nuuid in node_uuids:
2451 node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
2452 node_image[nuuid].sinst))
2453 diskless_instances.update(uuid for uuid in node_inst_uuids
2454 if instanceinfo[uuid].disk_template == diskless)
2455 disks = [(inst_uuid, disk)
2456 for inst_uuid in node_inst_uuids
2457 for disk in instanceinfo[inst_uuid].disks]
if not disks:
2460 # No need to collect data
continue
2463 node_disks[nuuid] = disks
2465 # _AnnotateDiskParams makes already copies of the disks
devonly = []
2467 for (inst_uuid, dev) in disks:
2468 (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
self.cfg)
2470 self.cfg.SetDiskID(anno_disk, nuuid)
2471 devonly.append(anno_disk)
2473 node_disks_devonly[nuuid] = devonly
2475 assert len(node_disks) == len(node_disks_devonly)
2477 # Collect data from all nodes with disks
2478 result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
node_disks_devonly)
2481 assert len(result) == len(node_disks)
instdisk = {}
2485 for (nuuid, nres) in result.items():
2486 node = self.cfg.GetNodeInfo(nuuid)
2487 disks = node_disks[node.uuid]
if nres.offline:
2490 # No data from this node
2491 data = len(disks) * [(False, "node offline")]
else:
msg = nres.fail_msg
2494 self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
2495 "while getting disk information: %s", msg)
if msg:
2497 # No data from this node
2498 data = len(disks) * [(False, msg)]
else:
data = []
2501 for idx, i in enumerate(nres.payload):
2502 if isinstance(i, (tuple, list)) and len(i) == 2:
data.append(i)
else:
2505 logging.warning("Invalid result from node %s, entry %d: %s",
node.name, idx, i)
2507 data.append((False, "Invalid result from the remote node"))
2509 for ((inst_uuid, _), status) in zip(disks, data):
2510 instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
.append(status)
2513 # Add empty entries for diskless instances.
2514 for inst_uuid in diskless_instances:
2515 assert inst_uuid not in instdisk
2516 instdisk[inst_uuid] = {}
2518 assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2519 len(nuuids) <= len(instanceinfo[inst].all_nodes) and
2520 compat.all(isinstance(s, (tuple, list)) and
2521 len(s) == 2 for s in statuses)
2522 for inst, nuuids in instdisk.items()
2523 for nuuid, statuses in nuuids.items())
2525 instdisk_keys = set(instdisk)
2526 instanceinfo_keys = set(instanceinfo)
2527 assert instdisk_keys == instanceinfo_keys, \
2528 ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2529 (instdisk_keys, instanceinfo_keys))
return instdisk
@staticmethod
2534 def _SshNodeSelector(group_uuid, all_nodes):
2535 """Create endless iterators for all potential SSH check hosts.
2538 nodes = [node for node in all_nodes
2539 if (node.group != group_uuid and
not node.offline)]
2541 keyfunc = operator.attrgetter("group")
2543 return map(itertools.cycle,
2544 [sorted(map(operator.attrgetter("name"), names))
2545 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
keyfunc)])
@classmethod
2549 def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2550 """Choose which nodes should talk to which other nodes.
2552 We will make nodes contact all nodes in their group, and one node from
every other group.
2555 @warning: This algorithm has a known issue if one node group is much
2556 smaller than others (e.g. just one node). In such a case all other
2557 nodes will talk to the single node.
2560 online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2561 sel = cls._SshNodeSelector(group_uuid, all_nodes)
2563 return (online_nodes,
2564 dict((name, sorted([i.next() for i in sel]))
2565 for name in online_nodes))
2567 def BuildHooksEnv(self):
"""Build hooks env.
2570 Cluster-Verify hooks are run in the post phase only; when they fail, their
2571 output is logged in the verify output and the verification fails.
2575 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2578 env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2579 for node in self.my_node_info.values())
return env
2583 def BuildHooksNodes(self):
2584 """Build hooks nodes.
2587 return ([], list(self.my_node_info.keys()))
2589 def Exec(self, feedback_fn):
2590 """Verify integrity of the node group, performing various test on nodes.
2593 # This method has too many local variables. pylint: disable=R0914
2594 feedback_fn("* Verifying group '%s'" % self.group_info.name)
2596 if not self.my_node_uuids:
2598 feedback_fn("* Empty node group, skipping verification")
2602 verbose = self.op.verbose
2603 self._feedback_fn = feedback_fn
2605 vg_name = self.cfg.GetVGName()
2606 drbd_helper = self.cfg.GetDRBDHelper()
2607 cluster = self.cfg.GetClusterInfo()
2608 hypervisors = cluster.enabled_hypervisors
2609 node_data_list = self.my_node_info.values()
2611 i_non_redundant = [] # Non redundant instances
2612 i_non_a_balanced = [] # Non auto-balanced instances
2613 i_offline = 0 # Count of offline instances
2614 n_offline = 0 # Count of offline nodes
2615 n_drained = 0 # Count of nodes being drained
2616 node_vol_should = {}
2618 # FIXME: verify OS list
2621 filemap = ComputeAncillaryFiles(cluster, False)
2623 # do local checksums
2624 master_node_uuid = self.master_node = self.cfg.GetMasterNode()
2625 master_ip = self.cfg.GetMasterIP()
2627 feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))
user_scripts = []
2630 if self.cfg.GetUseExternalMipScript():
2631 user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
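# Parameters for the node_verify RPC: each key selects a check to run on the remote node and its value carries the data that check needs.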
2633 node_verify_param = {
2634 constants.NV_FILELIST:
2635 map(vcluster.MakeVirtualPath,
2636 utils.UniqueSequence(filename
2637 for files in filemap
2638 for filename in files)),
2639 constants.NV_NODELIST:
2640 self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2641 self.all_node_info.values()),
2642 constants.NV_HYPERVISOR: hypervisors,
2643 constants.NV_HVPARAMS:
2644 _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2645 constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2646 for node in node_data_list
2647 if not node.offline],
2648 constants.NV_INSTANCELIST: hypervisors,
2649 constants.NV_VERSION: None,
2650 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2651 constants.NV_NODESETUP: None,
2652 constants.NV_TIME: None,
2653 constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
2654 constants.NV_OSLIST: None,
2655 constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2656 constants.NV_USERSCRIPTS: user_scripts,
2659 if vg_name is not None:
2660 node_verify_param[constants.NV_VGLIST] = None
2661 node_verify_param[constants.NV_LVLIST] = vg_name
2662 node_verify_param[constants.NV_PVLIST] = [vg_name]
if drbd_helper:
2665 node_verify_param[constants.NV_DRBDVERSION] = None
2666 node_verify_param[constants.NV_DRBDLIST] = None
2667 node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2669 if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
2670 # Load file storage paths only from master node
2671 node_verify_param[constants.NV_FILE_STORAGE_PATHS] = \
2672 self.cfg.GetMasterNodeName()
2675 # FIXME: this needs to be changed per node-group, not cluster-wide
bridges = set()
2677 default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2678 if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2679 bridges.add(default_nicpp[constants.NIC_LINK])
2680 for inst_uuid in self.my_inst_info.values():
2681 for nic in inst_uuid.nics:
2682 full_nic = cluster.SimpleFillNIC(nic.nicparams)
2683 if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2684 bridges.add(full_nic[constants.NIC_LINK])
if bridges:
2687 node_verify_param[constants.NV_BRIDGES] = list(bridges)
2689 # Build our expected cluster state
2690 node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
uuid=node.uuid,
2692 vm_capable=node.vm_capable))
2693 for node in node_data_list)
oob_paths = []
2697 for node in self.all_node_info.values():
2698 path = SupportsOob(self.cfg, node)
2699 if path and path not in oob_paths:
2700 oob_paths.append(path)
if oob_paths:
2703 node_verify_param[constants.NV_OOB_PATHS] = oob_paths
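# Build the expected instance placement: per-node lists of primary and secondary instances, with "ghost" node entries for nodes that are outside this group or unknown to the configuration.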
2705 for inst_uuid in self.my_inst_uuids:
2706 instance = self.my_inst_info[inst_uuid]
2707 if instance.admin_state == constants.ADMINST_OFFLINE:
i_offline += 1
2710 for nuuid in instance.all_nodes:
2711 if nuuid not in node_image:
2712 gnode = self.NodeImage(uuid=nuuid)
2713 gnode.ghost = (nuuid not in self.all_node_info)
2714 node_image[nuuid] = gnode
2716 instance.MapLVsByNode(node_vol_should)
2718 pnode = instance.primary_node
2719 node_image[pnode].pinst.append(instance.uuid)
2721 for snode in instance.secondary_nodes:
2722 nimg = node_image[snode]
2723 nimg.sinst.append(instance.uuid)
2724 if pnode not in nimg.sbp:
2725 nimg.sbp[pnode] = []
2726 nimg.sbp[pnode].append(instance.uuid)
2728 es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
2729 self.my_node_info.keys())
2730 # The value of exclusive_storage should be the same across the group, so if
2731 # it's True for at least a node, we act as if it were set for all the nodes
2732 self._exclusive_storage = compat.any(es_flags.values())
2733 if self._exclusive_storage:
2734 node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2736 # At this point, we have the in-memory data structures complete,
2737 # except for the runtime information, which we'll gather next
2739 # Due to the way our RPC system works, exact response times cannot be
2740 # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2741 # time before and after executing the request, we can at least have a time
# window.
2743 nvinfo_starttime = time.time()
2744 all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
node_verify_param,
2746 self.cfg.GetClusterName(),
2747 self.cfg.GetClusterInfo().hvparams)
2748 nvinfo_endtime = time.time()
2750 if self.extra_lv_nodes and vg_name is not None:
extra_lv_nvinfo = \
2752 self.rpc.call_node_verify(self.extra_lv_nodes,
2753 {constants.NV_LVLIST: vg_name},
2754 self.cfg.GetClusterName(),
2755 self.cfg.GetClusterInfo().hvparams)
else:
2757 extra_lv_nvinfo = {}
2759 all_drbd_map = self.cfg.ComputeDRBDMap()
2761 feedback_fn("* Gathering disk information (%s nodes)" %
2762 len(self.my_node_uuids))
2763 instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
self.my_inst_info)
2766 feedback_fn("* Verifying configuration file consistency")
2768 # If not all nodes are being checked, we need to make sure the master node
2769 # and a non-checked vm_capable node are in the list.
2770 absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
2771 if absent_node_uuids:
2772 vf_nvinfo = all_nvinfo.copy()
2773 vf_node_info = list(self.my_node_info.values())
2774 additional_node_uuids = []
2775 if master_node_uuid not in self.my_node_info:
2776 additional_node_uuids.append(master_node_uuid)
2777 vf_node_info.append(self.all_node_info[master_node_uuid])
2778 # Add the first vm_capable node we find which is not included,
2779 # excluding the master node (which we already have)
2780 for node_uuid in absent_node_uuids:
2781 nodeinfo = self.all_node_info[node_uuid]
2782 if (nodeinfo.vm_capable and not nodeinfo.offline and
2783 node_uuid != master_node_uuid):
2784 additional_node_uuids.append(node_uuid)
2785 vf_node_info.append(self.all_node_info[node_uuid])
2787 key = constants.NV_FILELIST
2788 vf_nvinfo.update(self.rpc.call_node_verify(
2789 additional_node_uuids, {key: node_verify_param[key]},
2790 self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
else:
2792 vf_nvinfo = all_nvinfo
2793 vf_node_info = self.my_node_info.values()
2795 self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)
2797 feedback_fn("* Verifying node status")
refos_img = None
2801 for node_i in node_data_list:
2802 nimg = node_image[node_i.uuid]
if node_i.offline:
if verbose:
2806 feedback_fn("* Skipping offline node %s" % (node_i.name,))
n_offline += 1
continue
2810 if node_i.uuid == master_node_uuid:
ntype = "master"
2812 elif node_i.master_candidate:
2813 ntype = "master candidate"
2814 elif node_i.drained:
ntype = "drained"
n_drained += 1
else:
ntype = "regular"
2820 feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))
2822 msg = all_nvinfo[node_i.uuid].fail_msg
2823 self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
2824 "while contacting node: %s", msg)
2826 nimg.rpc_fail = True
2829 nresult = all_nvinfo[node_i.uuid].payload
2831 nimg.call_ok = self._VerifyNode(node_i, nresult)
2832 self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2833 self._VerifyNodeNetwork(node_i, nresult)
2834 self._VerifyNodeUserScripts(node_i, nresult)
2835 self._VerifyOob(node_i, nresult)
2836 self._VerifyFileStoragePaths(node_i, nresult,
2837 node_i.uuid == master_node_uuid,
2838 cluster.enabled_disk_templates)
if nimg.vm_capable:
2841 self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
2842 self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
all_drbd_map)
2845 self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2846 self._UpdateNodeInstances(node_i, nresult, nimg)
2847 self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2848 self._UpdateNodeOS(node_i, nresult, nimg)
2850 if not nimg.os_fail:
2851 if refos_img is None:
refos_img = nimg
2853 self._VerifyNodeOS(node_i, nimg, refos_img)
2854 self._VerifyNodeBridges(node_i, nresult, bridges)
2856 # Check whether all running instances are primary for the node. (This
2857 # can no longer be done from _VerifyInstance below, since some of the
2858 # wrong instances could be from other node groups.)
2859 non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)
2861 for inst_uuid in non_primary_inst_uuids:
2862 test = inst_uuid in self.all_inst_info
2863 self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
2864 self.cfg.GetInstanceName(inst_uuid),
2865 "instance should not run on node %s", node_i.name)
2866 self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
2867 "node is running unknown instance %s", inst_uuid)
2869 self._VerifyGroupDRBDVersion(all_nvinfo)
2870 self._VerifyGroupLVM(node_image, vg_name)
2872 for node_uuid, result in extra_lv_nvinfo.items():
2873 self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
2874 node_image[node_uuid], vg_name)
2876 feedback_fn("* Verifying instance status")
2877 for inst_uuid in self.my_inst_uuids:
2878 instance = self.my_inst_info[inst_uuid]
2880 feedback_fn("* Verifying instance %s" % instance.name)
2881 self._VerifyInstance(instance, node_image, instdisk[inst_uuid])
2883 # If the instance is non-redundant we cannot survive losing its primary
2884 # node, so we are not N+1 compliant.
2885 if instance.disk_template not in constants.DTS_MIRRORED:
2886 i_non_redundant.append(instance)
2888 if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
2889 i_non_a_balanced.append(instance)
2891 feedback_fn("* Verifying orphan volumes")
2892 reserved = utils.FieldSet(*cluster.reserved_lvs)
2894 # We will get spurious "unknown volume" warnings if any node of this group
2895 # is secondary for an instance whose primary is in another group. To avoid
2896 # them, we find these instances and add their volumes to node_vol_should.
2897 for instance in self.all_inst_info.values():
2898 for secondary in instance.secondary_nodes:
2899 if (secondary in self.my_node_info
2900 and instance.name not in self.my_inst_info):
2901 instance.MapLVsByNode(node_vol_should)
2904 self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2906 if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2907 feedback_fn("* Verifying N+1 Memory redundancy")
2908 self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2910 feedback_fn("* Other Notes")
2912 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
2913 % len(i_non_redundant))
2915 if i_non_a_balanced:
2916 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
2917 % len(i_non_a_balanced))
2920 feedback_fn(" - NOTICE: %d offline instance(s) found." % i_offline)
2923 feedback_fn(" - NOTICE: %d offline node(s) found." % n_offline)
2926 feedback_fn(" - NOTICE: %d drained node(s) found." % n_drained)
2930 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2931 """Analyze the post-hooks' result
2933 This method analyses the hook result, handles it, and sends some
2934 nicely-formatted feedback back to the user.
2936 @param phase: one of L{constants.HOOKS_PHASE_POST} or
2937 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2938 @param hooks_results: the results of the multi-node hooks rpc call
2939 @param feedback_fn: function used to send feedback back to the caller
2940 @param lu_result: previous Exec result
2941 @return: the new Exec result, based on the previous result
2945 # We only really run POST phase hooks, only for non-empty groups,
2946 # and are only interested in their results
2947 if not self.my_node_uuids:
# empty node group
pass
2950 elif phase == constants.HOOKS_PHASE_POST:
2951 # Used to change hooks' output to proper indentation
2952 feedback_fn("* Hooks Results")
2953 assert hooks_results, "invalid result from hooks"
2955 for node_name in hooks_results:
2956 res = hooks_results[node_name]
msg = res.fail_msg
2958 test = msg and not res.offline
2959 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2960 "Communication failure in hooks execution: %s", msg)
2961 if res.offline or msg:
2962 # No need to investigate payload if node is offline or gave
# an error
continue
2965 for script, hkr, output in res.payload:
2966 test = hkr == constants.HKR_FAIL
2967 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2968 "Script %s failed, output:", script)
if test:
2970 output = self._HOOKS_INDENT_RE.sub(" ", output)
2971 feedback_fn("%s" % output)
lu_result = False
return lu_result
2977 class LUClusterVerifyDisks(NoHooksLU):
2978 """Verifies the cluster disks status.
2983 def ExpandNames(self):
2984 self.share_locks = ShareAll()
2985 self.needed_locks = {
2986 locking.LEVEL_NODEGROUP: locking.ALL_SET,
}
2989 def Exec(self, feedback_fn):
2990 group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
2992 # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
2993 return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
2994 for group in group_names])