# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import os
import re

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob

import ganeti.masterd.instance

class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")

class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")

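# Illustrative sketch (editorial, not part of the upstream module): the two
# LUs above are normally reached by submitting their opcodes through the
# luxi interface rather than by instantiating them directly; the client
# object "cl" below is hypothetical.
#
#   from ganeti import luxi, opcodes
#
#   cl = luxi.Client()
#   cl.SubmitJob([opcodes.OpClusterActivateMasterIp()])
#   cl.SubmitJob([opcodes.OpClusterDeactivateMasterIp()])
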
class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]

class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, master_params.name)

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.name

class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True

class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
    else:
      cluster = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_name = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_name)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   master_name)

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, drain_flag, watcher_pause)

class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      }

    return result

class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)

class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.name)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername

class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      self.wanted_names = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
      map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False
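
  # Illustrative example (editorial, not in the original source): for a DRBD8
  # disk whose data child was recorded smaller than the parent, the method
  # above grows the child's recorded size and reports a change; sizes are in
  # MiB. "meta_disk" is a hypothetical metadata child.
  #
  #   drbd = objects.Disk(dev_type=constants.LD_DRBD8, size=1024,
  #                       children=[objects.Disk(dev_type=constants.LD_LV,
  #                                              size=1000, children=[]),
  #                                 meta_disk])
  #   self._EnsureChildSizes(drbd)  # True; drbd.children[0].size is now 1024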
  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
                                                   per_node_disks.keys())

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getdimensions(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node, len(dskl), result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))

    return changed

def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)

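# Illustrative usage (editorial, not in the original source): on an IPv4
# cluster a prefix length such as 24 passes, while 33 raises OpPrereqError;
# IPv6 clusters accept prefixes up to 128. "cfg" stands for the usual
# config.ConfigWriter instance.
#
#   _ValidateNetmask(cfg, 24)   # fine on an IPv4 cluster
#   _ValidateNetmask(cfg, 33)   # raises errors.OpPrereqError
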
class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters.

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" %
                                   err, errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def _CheckVgName(self, node_list, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the vg name on all nodes and in case it gets
       unset whether there are instances still using it.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and
        utils.IsLvmEnabled(enabled_disk_templates)) or \
           (self.cfg.GetVGName() is not None and
            utils.LvmGetsEnabled(enabled_disk_templates,
                                 new_enabled_disk_templates)):
      self._CheckVgNameOnNodes(node_list)

  def _CheckVgNameOnNodes(self, node_list):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_list)
    for node in node_list:
      msg = vglist[node].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s", node, msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (node, vgstatus), errors.ECODE_ENVIRON)

  def _GetEnabledDiskTemplates(self, cluster):
    """Determines the enabled disk templates and the subset of disk templates
       that are newly enabled by this operation.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    if self.op.enabled_disk_templates:
      enabled_disk_templates = self.op.enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(cluster.enabled_disk_templates))
    else:
      enabled_disk_templates = cluster.enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates)
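
  # Illustrative example (editorial, not in the original source): with
  # cluster templates ["drbd", "plain"] and an opcode requesting
  # ["plain", "rbd"], the method above returns (["plain", "rbd"], ["rbd"]),
  # the second element being the templates this operation newly enables.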

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_list = self.owned_locks(locking.LEVEL_NODE)
    self.cluster = cluster = self.cfg.GetClusterInfo()

    vm_capable_nodes = [node.name
                        for node in self.cfg.GetAllNodesInfo().values()
                        if node.name in node_list and node.vm_capable]

    (enabled_disk_templates, new_enabled_disk_templates) = \
      self._GetEnabledDiskTemplates(cluster)

    self._CheckVgName(vm_capable_nodes, enabled_disk_templates,
                      new_enabled_disk_templates)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_list)
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s", node)
          continue
        msg = helpers[node].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (node, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[node].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (node, node_helper), errors.ECODE_ENVIRON)

    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
          constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(node in group.members
                                             for node in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol,
                                           new_ipolicy, instances, self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_list, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def _SetVgName(self, feedback_fn):
    """Determines and sets the new volume group name.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name and not \
          utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
        feedback_fn("Note that you specified a volume group, but did not"
                    " enable any lvm disk template.")
      new_volume = self.op.vg_name
      if not new_volume:
        if utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
          raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                     " disk templates are enabled.")

      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    else:
      if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
          not self.cfg.GetVGName():
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(set(self.op.enabled_disk_templates))

    self._SetVgName(feedback_fn)

    if self.op.drbd_helper is not None:
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did"
                    " not enable the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                       master_params, ems)
      result.Raise("Could not disable the master ip")
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(master_params.name,
                                                        master_params.netmask,
                                                        self.op.master_netmask,
                                                        master_params.ip,
                                                        master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)

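# Illustrative sketch (editorial, not part of the upstream module): a minimal
# cluster-modify request, as a client might submit it over luxi. The client
# object "cl" is hypothetical.
#
#   from ganeti import luxi, opcodes
#
#   cl = luxi.Client()
#   cl.SubmitJob([opcodes.OpClusterSetParams(candidate_pool_size=10)])
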
class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)

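# Note on the job structure built above (editorial, not upstream): with no
# group name given, "jobs" holds one single-opcode job verifying the global
# configuration, followed by one OpClusterVerifyGroup job per group; the
# depends_fn closure makes every group job depend on the config job via a
# relative job id (-len(jobs) counts back to the first job in the set).
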
class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to
    # a warning
    if etxt in self.op.ignore_errors: # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)

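# Illustrative output of the mix-in above (editorial, not upstream): with
# error_codes enabled a report line is machine-parseable, otherwise it is
# plain text. For ecode=constants.CV_ENODEVERSION and item="node1" the two
# forms would look roughly like:
#
#   ERROR:ENODEVERSION:node:node1:incompatible protocol versions: ...
#   ERROR: node node1: incompatible protocol versions: ...
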
def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)

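# Illustrative return values (editorial, not upstream): a healthy certificate
# yields (None, None); one inside the expiration-warning window yields
# (LUClusterVerifyConfig.ETYPE_WARNING, "While verifying <file>: ..."); and an
# unreadable or expired one yields an ETYPE_ERROR tuple.
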
def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data

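# Illustrative return shape (editorial, not upstream): each entry names the
# origin of the parameters, e.g.
#
#   [("cluster", "kvm", {...}),
#    ("os debian-image", "kvm", {...}),
#    ("instance web1", "kvm", {...})]
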
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.
    dangling_nodes = set(node.name for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in dangling_nodes:
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst.name)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(dangling_instances.get(node.name,
                                                ["no instances"])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(no_node_instances))

    return not self.bad

class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type name: string
    @ivar name: the node name to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, name=None, vm_capable=True):
      self.name = name
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_names = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: inst_names,
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      all_inst_info = self.cfg.GetAllInstancesInfo()

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
          nodes.update(all_inst_info[inst].secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_nodes = set(self.group_info.members)
    group_instances = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_nodes = \
      group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_instances = \
      group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))

    if unlocked_nodes:
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
                                 utils.CommaJoin(unlocked_nodes),
                                 errors.ECODE_STATE)

    if unlocked_instances:
      raise errors.OpPrereqError("Missing lock for instances: %s" %
                                 utils.CommaJoin(unlocked_instances),
                                 errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_names = utils.NiceSort(group_nodes)
    self.my_inst_names = utils.NiceSort(group_instances)

    self.my_node_info = dict((name, self.all_node_info[name])
                             for name in self.my_node_names)

    self.my_inst_info = dict((name, self.all_inst_info[name])
                             for name in self.my_inst_names)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nname in inst.all_nodes:
          if self.all_node_info[nname].group != self.group_uuid:
            extra_lv_nodes.add(nname)

    unlocked_lv_nodes = \
      extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        expected keys
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    _ErrorIf(test, constants.CV_ENODERPC, node,
             "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, constants.CV_ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    _ErrorIf(test, constants.CV_ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, constants.CV_ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        _ErrorIf(True, constants.CV_ENODEHV, node,
                 "hypervisor %s parameter verify failure (source %s): %s",
                 hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
             "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
             "Node time diverges by at least %s from master node time",
             ntime_diff)
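
  # Worked example for the skew check above (editorial, not upstream),
  # assuming the stock NODE_MAX_CLOCK_SKEW of 150 seconds: a node reporting a
  # merged time of 1000.0 while the RPC ran from 1200.0 to 1201.0 is flagged,
  # because 1000.0 < 1200.0 - 150; the reported divergence is at least
  # abs(1200.0 - 1000.0) = 200.0 seconds, printed as "200.0s".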

  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = vglist is None
    _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, node, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupDRBDVersion(self, node_verify_infos):
    """Check cross-node DRBD version consistency.

    @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node_verify call.

    """
    node_versions = {}
    for node, ndata in node_verify_infos.items():
      nresult = ndata.payload
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
      node_versions[node] = version

    if len(set(node_versions.values())) > 1:
      for node, version in sorted(node_versions.items()):
        msg = "DRBD version mismatch: %s" % version
        self._Error(constants.CV_ENODEDRBDHELPER, node, msg,
                    code=self.ETYPE_WARNING)

  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping from node to names to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of checks
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
    (pvmin, minnode) = min((ni.pv_min, ni.name) for ni in vals)
    (pvmax, maxnode) = max((ni.pv_max, ni.name) for ni in vals)
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, minnode, pvmax, maxnode)

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    _ErrorIf(test, constants.CV_ENODENET, node,
             "did not return valid bridge information")
    if not test:
      _ErrorIf(bool(missing), constants.CV_ENODENET, node,
               "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name

    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    test = constants.NV_NODELIST not in nresult
    _ErrorIf(test, constants.CV_ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          _ErrorIf(True, constants.CV_ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    _ErrorIf(test, constants.CV_ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, constants.CV_ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    _ErrorIf(test, constants.CV_ENODENET, node,
             "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if node == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        _ErrorIf(True, constants.CV_ENODENET, node, msg)
1821 def _VerifyInstance(self, instance, inst_config, node_image,
1823 """Verify an instance.
1825 This function checks to see if the required block devices are
1826 available on the instance's node, and that the nodes are in the correct
1830 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1831 pnode = inst_config.primary_node
1832 pnode_img = node_image[pnode]
1833 groupinfo = self.cfg.GetAllNodeGroupsInfo()
1835 node_vol_should = {}
1836 inst_config.MapLVsByNode(node_vol_should)
1838 cluster = self.cfg.GetClusterInfo()
1839 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1841 err = ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
1842 _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err),
1843 code=self.ETYPE_WARNING)
1845 for node in node_vol_should:
1846 n_img = node_image[node]
1847 if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1848 # ignore missing volumes on offline or broken nodes
1850 for volume in node_vol_should[node]:
1851 test = volume not in n_img.volumes
1852 _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
1853 "volume %s missing on node %s", volume, node)
1855 if inst_config.admin_state == constants.ADMINST_UP:
1856 test = instance not in pnode_img.instances and not pnode_img.offline
1857 _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
1858 "instance not running on its primary node %s",
1860 _ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
1861 "instance is marked as running and lives on offline node %s",
1864 diskdata = [(nname, success, status, idx)
1865 for (nname, disks) in diskstatus.items()
1866 for idx, (success, status) in enumerate(disks)]
1868 for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
1871 snode = node_image[nname]
1872 bad_snode = snode.ghost or snode.offline
1873 _ErrorIf(inst_config.disks_active and
1874 not success and not bad_snode,
1875 constants.CV_EINSTANCEFAULTYDISK, instance,
1876 "couldn't retrieve status for disk/%s on %s: %s",
1877 idx, nname, bdev_status)
1878 _ErrorIf((inst_config.disks_active and
1879 success and bdev_status.ldisk_status == constants.LDS_FAULTY),
1880 constants.CV_EINSTANCEFAULTYDISK, instance,
1881 "disk/%s on %s is faulty", idx, nname)
1883 _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1884 constants.CV_ENODERPC, pnode, "instance %s, connection to"
1885 " primary node failed", instance)
1887 _ErrorIf(len(inst_config.secondary_nodes) > 1,
1888 constants.CV_EINSTANCELAYOUT,
1889 instance, "instance has multiple secondary nodes: %s",
1890 utils.CommaJoin(inst_config.secondary_nodes),
1891 code=self.ETYPE_WARNING)
1893 es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
1894 inst_config.all_nodes)
1895 if any(es_flags.values()):
1896 if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
1897 # Disk template not compatible with exclusive_storage: no instance
1898 # node should have the flag set
        es_nodes = [n
                    for (n, es) in es_flags.items()
                    if es]
1902 self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance,
1903 "instance has template %s, which is not supported on nodes"
1904 " that have exclusive storage set: %s",
1905 inst_config.disk_template, utils.CommaJoin(es_nodes))
1906 for (idx, disk) in enumerate(inst_config.disks):
1907 _ErrorIf(disk.spindles is None,
1908 constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance,
1909 "number of spindles not configured for disk %s while"
1910 " exclusive storage is enabled, try running"
1911 " gnt-cluster repair-disk-sizes",
1914 if inst_config.disk_template in constants.DTS_INT_MIRROR:
1915 instance_nodes = utils.NiceSort(inst_config.all_nodes)
1916 instance_groups = {}
1918 for node in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node].group,
                                   []).append(node)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode in nodes,
                                   reverse=True)]
1929 self._ErrorIf(len(instance_groups) > 1,
1930 constants.CV_EINSTANCESPLITGROUPS,
1931 instance, "instance has primary and secondary nodes in"
1932 " different groups: %s", utils.CommaJoin(pretty_list),
1933 code=self.ETYPE_WARNING)
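      # For example (editorial addition; hypothetical names): an instance with
      # primary node1 in group "default" and secondary node3 in group "remote"
      # yields pretty_list = ["node1 (group default)", "node3 (group remote)"],
      # with the primary node's group listed first because of the sort key.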
1935 inst_nodes_offline = []
1936 for snode in inst_config.secondary_nodes:
1937 s_img = node_image[snode]
1938 _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
               snode, "instance %s, connection to secondary node failed",
               instance)

      if s_img.offline:
        inst_nodes_offline.append(snode)
1945 # warn that the instance lives on offline nodes
1946 _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
1947 "instance has offline secondary node(s) %s",
1948 utils.CommaJoin(inst_nodes_offline))
1949 # ... or ghost/non-vm_capable nodes
1950 for node in inst_config.all_nodes:
1951 _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
1952 instance, "instance lives on ghost node %s", node)
1953 _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
1954 instance, "instance lives on non-vm_capable node %s", node)
1956 def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1957 """Verify if there are any unknown volumes in the cluster.
1959 The .os, .swap and backup volumes are ignored. All other volumes are
1960 reported as unknown.
1962 @type reserved: L{ganeti.utils.FieldSet}
1963 @param reserved: a FieldSet of reserved volume names
    """
    for node, n_img in node_image.items():
1967 if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
1968 self.all_node_info[node].group != self.group_uuid):
        # skip non-healthy nodes
        continue
1971 for volume in n_img.volumes:
1972 test = ((node not in node_vol_should or
1973 volume not in node_vol_should[node]) and
1974 not reserved.Matches(volume))
1975 self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
1976 "volume %s is unknown", volume)
1978 def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1979 """Verify N+1 Memory Resilience.
1981 Check that if one single node dies we can still start all the
1982 instances it was primary for.
    """
    cluster_info = self.cfg.GetClusterInfo()
1986 for node, n_img in node_image.items():
1987 # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to, should a single other node in the cluster fail.
1990 # FIXME: not ready for failover to an arbitrary node
1991 # FIXME: does not support file-backed instances
1992 # WARNING: we currently take into account down instances as well
1993 # as up ones, considering that even if they're down someone
1994 # might want to start them even in the event of a node failure.
1995 if n_img.offline or self.all_node_info[node].group != self.group_uuid:
1996 # we're skipping nodes marked offline and nodes in other groups from
1997 # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      # TODO(dynmem): also consider ballooning out other instances
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
2004 for instance in instances:
2005 bep = cluster_info.FillBE(instance_cfg[instance])
2006 if bep[constants.BE_AUTO_BALANCE]:
2007 needed_mem += bep[constants.BE_MINMEM]
2008 test = n_img.mfree < needed_mem
2009 self._ErrorIf(test, constants.CV_ENODEN1, node,
2010 "not enough memory to accomodate instance failovers"
2011 " should node %s fail (%dMiB needed, %dMiB available)",
2012 prinode, needed_mem, n_img.mfree)
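  # Worked example (editorial addition; numbers invented): if this node is
  # secondary for two auto-balanced instances with primary node "node1" and
  # BE_MINMEM values of 1024 and 2048 MiB, needed_mem is 3072 MiB, and an
  # N+1 error is reported as soon as n_img.mfree drops below that.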
  @classmethod
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
2016 (files_all, files_opt, files_mc, files_vm)):
2017 """Verifies file checksums collected from all nodes.
2019 @param errorif: Callback for reporting errors
2020 @param nodeinfo: List of L{objects.Node} objects
2021 @param master_node: Name of master node
2022 @param all_nvinfo: RPC results
    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.name == master_node)),
      (files_vm, lambda node: node.vm_capable),
      ]
2033 # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodeinfo
      else:
        filenodes = filter(fn, nodeinfo)
2040 nodefiles.update((filename,
2041 frozenset(map(operator.attrgetter("name"), filenodes)))
2042 for filename in files)
2044 assert set(nodefiles) == (files_all | files_mc | files_vm)
2046 fileinfo = dict((filename, {}) for filename in nodefiles)
2047 ignore_nodes = set()
    for node in nodeinfo:
      if node.offline:
        ignore_nodes.add(node.name)
        continue

      nresult = all_nvinfo[node.name]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
2064 test = not (node_files and isinstance(node_files, dict))
2065 errorif(test, constants.CV_ENODEFILECHECK, node.name,
2066 "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.name)
        continue
2071 # Build per-checksum mapping from filename to nodes having it
2072 for (filename, checksum) in node_files.items():
2073 assert filename in nodefiles
2074 fileinfo[filename].setdefault(checksum, set()).add(node.name)
2076 for (filename, checksums) in fileinfo.items():
2077 assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2079 # Nodes having the file
2080 with_file = frozenset(node_name
2081 for nodes in fileinfo[filename].values()
2082 for node_name in nodes) - ignore_nodes
2084 expected_nodes = nodefiles[filename] - ignore_nodes
2086 # Nodes missing file
2087 missing_file = expected_nodes - with_file
        if filename in files_opt:
          # All or no nodes
          errorif(missing_file and missing_file != expected_nodes,
2092 constants.CV_ECLUSTERFILECHECK, None,
2093 "File %s is optional, but it must exist on all or no"
2094 " nodes (not found on %s)",
2095 filename, utils.CommaJoin(utils.NiceSort(missing_file)))
        else:
          errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2098 "File %s is missing from node(s) %s", filename,
2099 utils.CommaJoin(utils.NiceSort(missing_file)))
2101 # Warn if a node has a file it shouldn't
2102 unexpected = with_file - expected_nodes
        errorif(unexpected,
                constants.CV_ECLUSTERFILECHECK, None,
2105 "File %s should not exist on node(s) %s",
2106 filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2108 # See if there are multiple versions of the file
2109 test = len(checksums) > 1
        if test:
          variants = ["variant %s on %s" %
                      (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
                      for (idx, (checksum, nodes)) in
                      enumerate(sorted(checksums.items()))]
        else:
          variants = []
2118 errorif(test, constants.CV_ECLUSTERFILECHECK, None,
2119 "File %s found with %s different checksums (%s)",
2120 filename, len(checksums), "; ".join(variants))
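  # Illustrative sketch (editorial addition; names and checksums invented):
  # fileinfo maps each file to its observed checksums and the nodes that
  # reported them, e.g.
  #
  #   fileinfo["cluster.conf"] = {
  #     "9f86d081884c7d65": set(["node1", "node2"]),
  #     "60303ae22b998861": set(["node3"]),
  #   }
  #
  # would be reported as one file found with two different checksums.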
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.
2126 @type ninfo: L{objects.Node}
2127 @param ninfo: the node to check
2128 @param nresult: the remote results for the node
2129 @param instanceinfo: the dict of instances
2130 @param drbd_helper: the configured DRBD usermode helper
2131 @param drbd_map: the DRBD map as returned by
2132 L{ganeti.config.ConfigWriter.ComputeDRBDMap}
    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
    node = ninfo.name

    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
               "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
                 "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
                 "wrong drbd usermode helper: %s", payload)
    # compute the DRBD minors
    node_drbd = {}
    for minor, instance in drbd_map[node].items():
      test = instance not in instanceinfo
      _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
               "ghost instance '%s' in temporary DRBD map", instance)
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
      if test:
        node_drbd[minor] = (instance, False)
      else:
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.disks_active)
2167 # and now check them
2168 used_minors = nresult.get(constants.NV_DRBDLIST, [])
2169 test = not isinstance(used_minors, (tuple, list))
2170 _ErrorIf(test, constants.CV_ENODEDRBD, node,
2171 "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return
2176 for minor, (iname, must_exist) in node_drbd.items():
2177 test = minor not in used_minors and must_exist
2178 _ErrorIf(test, constants.CV_ENODEDRBD, node,
2179 "drbd minor %d of instance %s is not active", minor, iname)
2180 for minor in used_minors:
2181 test = minor not in node_drbd
2182 _ErrorIf(test, constants.CV_ENODEDRBD, node,
2183 "unallocated drbd minor %d is in use", minor)
2185 def _UpdateNodeOS(self, ninfo, nresult, nimg):
2186 """Builds the node OS structures.
2188 @type ninfo: L{objects.Node}
2189 @param ninfo: the node to check
2190 @param nresult: the remote results for the node
2191 @param nimg: the node image object
    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
    node = ninfo.name
2197 remote_os = nresult.get(constants.NV_OSLIST, None)
2198 test = (not isinstance(remote_os, list) or
2199 not compat.all(isinstance(v, list) and len(v) == 7
2200 for v in remote_os))
2202 _ErrorIf(test, constants.CV_ENODEOS, node,
2203 "node hasn't returned valid OS data")
2212 for (name, os_path, status, diagnose,
2213 variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
      if name not in os_dict:
        os_dict[name] = []

2218 # parameters is a list of lists instead of list of tuples due to
2219 # JSON lacking a real tuple type, fix it:
2220 parameters = [tuple(v) for v in parameters]
2221 os_dict[name].append((os_path, status, diagnose,
2222 set(variants), set(parameters), set(api_ver)))
2224 nimg.oslist = os_dict
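  # Illustrative sketch (editorial addition; values invented): each OS name
  # maps to one entry per path it was found under, e.g.
  #
  #   nimg.oslist = {
  #     "debootstrap": [("/srv/ganeti/os/debootstrap", True, "",
  #                      set(["default"]), set(), set([20]))],
  #   }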
2226 def _VerifyNodeOS(self, ninfo, nimg, base):
2227 """Verifies the node OS list.
2229 @type ninfo: L{objects.Node}
2230 @param ninfo: the node to check
2231 @param nimg: the node image object
2232 @param base: the 'template' node we match against (e.g. from the master)
    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
    node = ninfo.name
2238 assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2240 beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2241 for os_name, os_data in nimg.oslist.items():
2242 assert os_data, "Empty OS status for OS %s?!" % os_name
2243 f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2244 _ErrorIf(not f_status, constants.CV_ENODEOS, node,
2245 "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2246 _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
2247 "OS '%s' has multiple entries (first one shadows the rest): %s",
2248 os_name, utils.CommaJoin([v[0] for v in os_data]))
2249 # comparisons with the 'base' image
2250 test = os_name not in base.oslist
2251 _ErrorIf(test, constants.CV_ENODEOS, node,
2252 "Extra OS %s not present on reference node (%s)",
2256 assert base.oslist[os_name], "Base node has empty OS status?"
2257 _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2259 # base OS is invalid, skipping
2261 for kind, a, b in [("API version", f_api, b_api),
2262 ("variants list", f_var, b_var),
2263 ("parameters", beautify_params(f_param),
2264 beautify_params(b_param))]:
2265 _ErrorIf(a != b, constants.CV_ENODEOS, node,
2266 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2267 kind, os_name, base.name,
2268 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2270 # check any missing OSes
2271 missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2272 _ErrorIf(missing, constants.CV_ENODEOS, node,
2273 "OSes present on reference node %s but missing on this node: %s",
2274 base.name, utils.CommaJoin(missing))
2276 def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
2277 """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2279 @type ninfo: L{objects.Node}
2280 @param ninfo: the node to check
2281 @param nresult: the remote results for the node
2282 @type is_master: bool
2283 @param is_master: Whether node is the master node
    """
    node = ninfo.name

    if (is_master and
        (constants.ENABLE_FILE_STORAGE or
         constants.ENABLE_SHARED_FILE_STORAGE)):
      try:
        fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
2303 constants.CV_ENODEFILESTORAGEPATHS, node,
2304 "Node should not have returned forbidden file storage"
2307 def _VerifyOob(self, ninfo, nresult):
2308 """Verifies out of band functionality of a node.
2310 @type ninfo: L{objects.Node}
2311 @param ninfo: the node to check
2312 @param nresult: the remote results for the node
    """
    node = ninfo.name

    # We just have to verify the paths on master and/or master candidates
2317 # as the oob helper is invoked on the master
2318 if ((ninfo.master_candidate or ninfo.master_capable) and
2319 constants.NV_OOB_PATHS in nresult):
2320 for path_result in nresult[constants.NV_OOB_PATHS]:
2321 self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2323 def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2324 """Verifies and updates the node volume data.
2326 This function will update a L{NodeImage}'s internal structures
2327 with data from the remote call.
2329 @type ninfo: L{objects.Node}
2330 @param ninfo: the node to check
2331 @param nresult: the remote results for the node
2332 @param nimg: the node image object
2333 @param vg_name: the configured VG name
    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
    node = ninfo.name

    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
               utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      _ErrorIf(True, constants.CV_ENODELVM, node,
               "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False
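  # Illustrative sketch (editorial addition; names invented): on success the
  # node returns a dict keyed by LV name, which is stored verbatim:
  #
  #   lvdata = {"xenvg/disk0.data": attrs, "xenvg/disk0.meta": attrs}
  #   nimg.volumes = lvdata
  #
  # while a bare string (e.g. "Missing LV data") marks an LVM failure.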
2353 def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2354 """Verifies and updates the node instance list.
2356 If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.
2360 @type ninfo: L{objects.Node}
2361 @param ninfo: the node to check
2362 @param nresult: the remote results for the node
2363 @param nimg: the node image object
    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2367 test = not isinstance(idata, list)
2368 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2369 "rpc call to node failed (instancelist): %s",
2370 utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = idata
2376 def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2377 """Verifies and computes a node information map
2379 @type ninfo: L{objects.Node}
2380 @param ninfo: the node to check
2381 @param nresult: the remote results for the node
2382 @param nimg: the node image object
2383 @param vg_name: the configured VG name
    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
    node = ninfo.name
2389 # try to read free memory (from the hypervisor)
2390 hv_info = nresult.get(constants.NV_HVINFO, None)
2391 test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2392 _ErrorIf(test, constants.CV_ENODEHV, node,
2393 "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
2397 except (ValueError, TypeError):
2398 _ErrorIf(True, constants.CV_ENODERPC, node,
2399 "node returned invalid nodeinfo, check hypervisor")
2401 # FIXME: devise a free space model for file based instances as well
2402 if vg_name is not None:
2403 test = (constants.NV_VGLIST not in nresult or
2404 vg_name not in nresult[constants.NV_VGLIST])
2405 _ErrorIf(test, constants.CV_ENODELVM, node,
2406 "node didn't return data for the volume group '%s'"
2407 " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2411 except (ValueError, TypeError):
2412 _ErrorIf(True, constants.CV_ENODERPC, node,
2413 "node returned invalid LVM info, check LVM status")
2415 def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2416 """Gets per-disk status information for all instances.
2418 @type nodelist: list of strings
2419 @param nodelist: Node names
2420 @type node_image: dict of (name, L{objects.Node})
2421 @param node_image: Node objects
2422 @type instanceinfo: dict of (name, L{objects.Instance})
2423 @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
2425 @return: a dictionary of per-instance dictionaries with nodes as
2426 keys and disk information as values; the disk information is a
2427 list of tuples (success, payload)
    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    node_disks = {}
    node_disks_devonly = {}
2434 diskless_instances = set()
2435 diskless = constants.DT_DISKLESS
2437 for nname in nodelist:
2438 node_instances = list(itertools.chain(node_image[nname].pinst,
2439 node_image[nname].sinst))
2440 diskless_instances.update(inst for inst in node_instances
2441 if instanceinfo[inst].disk_template == diskless)
2442 disks = [(inst, disk)
2443 for inst in node_instances
2444 for disk in instanceinfo[inst].disks]
      if not disks:
        # No need to collect data
        continue

      node_disks[nname] = disks

      # _AnnotateDiskParams already makes copies of the disks
      devonly = []
      for (inst, dev) in disks:
2455 (anno_disk,) = AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
2456 self.cfg.SetDiskID(anno_disk, nname)
2457 devonly.append(anno_disk)
2459 node_disks_devonly[nname] = devonly
2461 assert len(node_disks) == len(node_disks_devonly)
2463 # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}
2471 for (nname, nres) in result.items():
      disks = node_disks[nname]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        _ErrorIf(msg, constants.CV_ENODERPC, nname,
                 "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              nname, idx, i)
              data.append((False, "Invalid result from the remote node"))
2494 for ((inst, _), status) in zip(disks, data):
2495 instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2497 # Add empty entries for diskless instances.
2498 for inst in diskless_instances:
      assert inst not in instdisk
      instdisk[inst] = {}

2502 assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2503 len(nnames) <= len(instanceinfo[inst].all_nodes) and
2504 compat.all(isinstance(s, (tuple, list)) and
2505 len(s) == 2 for s in statuses)
2506 for inst, nnames in instdisk.items()
2507 for nname, statuses in nnames.items())
2509 instdisk_keys = set(instdisk)
2510 instanceinfo_keys = set(instanceinfo)
2511 assert instdisk_keys == instanceinfo_keys, \
2512 ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
        (instdisk_keys, instanceinfo_keys))

    return instdisk
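  # Illustrative sketch (editorial addition; names invented): for a two-disk
  # DRBD instance the returned mapping would look like
  #
  #   instdisk = {
  #     "inst1": {
  #       "node1": [(True, status0), (True, status1)],
  #       "node2": [(True, status0), (True, status1)],
  #     },
  #   }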
  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
2519 """Create endless iterators for all potential SSH check hosts.
    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]

    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])
  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2534 """Choose which nodes should talk to which other nodes.
    We will make nodes contact all nodes in their group, and one node from
    every other group.
2539 @warning: This algorithm has a known issue if one node group is much
2540 smaller than others (e.g. just one node). In such a case all other
2541 nodes will talk to the single node.
    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2545 sel = cls._SshNodeSelector(group_uuid, all_nodes)
2547 return (online_nodes,
2548 dict((name, sorted([i.next() for i in sel]))
2549 for name in online_nodes))
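  # Illustrative sketch (editorial addition; names invented): verifying a
  # group holding node1/node2 while node3/node4 sit in another group would
  # yield
  #
  #   (["node1", "node2"], {"node1": ["node3"], "node2": ["node4"]})
  #
  # i.e. every online node is also paired with one rotating outside peer.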
  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run in the post phase only; a hook failure is
    logged in the verify output and makes the verification fail.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env
2567 def BuildHooksNodes(self):
2568 """Build hooks nodes.
2571 return ([], self.my_node_names)
2573 def Exec(self, feedback_fn):
2574 """Verify integrity of the node group, performing various test on nodes.
2577 # This method has too many local variables. pylint: disable=R0914
2578 feedback_fn("* Verifying group '%s'" % self.group_info.name)
2580 if not self.my_node_names:
2582 feedback_fn("* Empty node group, skipping verification")
2586 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2587 verbose = self.op.verbose
2588 self._feedback_fn = feedback_fn
2590 vg_name = self.cfg.GetVGName()
2591 drbd_helper = self.cfg.GetDRBDHelper()
2592 cluster = self.cfg.GetClusterInfo()
2593 hypervisors = cluster.enabled_hypervisors
2594 node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2596 i_non_redundant = [] # Non redundant instances
2597 i_non_a_balanced = [] # Non auto-balanced instances
2598 i_offline = 0 # Count of offline instances
2599 n_offline = 0 # Count of offline nodes
2600 n_drained = 0 # Count of nodes being drained
2601 node_vol_should = {}
2603 # FIXME: verify OS list
2606 filemap = ComputeAncillaryFiles(cluster, False)
2608 # do local checksums
2609 master_node = self.master_node = self.cfg.GetMasterNode()
2610 master_ip = self.cfg.GetMasterIP()
2612 feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
2616 user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2618 node_verify_param = {
2619 constants.NV_FILELIST:
2620 map(vcluster.MakeVirtualPath,
2621 utils.UniqueSequence(filename
2622 for files in filemap
2623 for filename in files)),
2624 constants.NV_NODELIST:
2625 self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2626 self.all_node_info.values()),
2627 constants.NV_HYPERVISOR: hypervisors,
2628 constants.NV_HVPARAMS:
2629 _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2630 constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2631 for node in node_data_list
2632 if not node.offline],
2633 constants.NV_INSTANCELIST: hypervisors,
2634 constants.NV_VERSION: None,
2635 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2636 constants.NV_NODESETUP: None,
2637 constants.NV_TIME: None,
2638 constants.NV_MASTERIP: (master_node, master_ip),
2639 constants.NV_OSLIST: None,
2640 constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }
2644 if vg_name is not None:
2645 node_verify_param[constants.NV_VGLIST] = None
2646 node_verify_param[constants.NV_LVLIST] = vg_name
2647 node_verify_param[constants.NV_PVLIST] = [vg_name]
    if drbd_helper:
      node_verify_param[constants.NV_DRBDVERSION] = None
      node_verify_param[constants.NV_DRBDLIST] = None
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2654 if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
2655 # Load file storage paths only from master node
2656 node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node
2659 # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2662 if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2663 bridges.add(default_nicpp[constants.NIC_LINK])
2664 for instance in self.my_inst_info.values():
2665 for nic in instance.nics:
2666 full_nic = cluster.SimpleFillNIC(nic.nicparams)
2667 if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2668 bridges.add(full_nic[constants.NIC_LINK])
    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
2673 # Build our expected cluster state
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
                                                 name=node.name,
                                                 vm_capable=node.vm_capable))
2677 for node in node_data_list)
    oob_paths = []
    for node in self.all_node_info.values():
2682 path = SupportsOob(self.cfg, node)
2683 if path and path not in oob_paths:
2684 oob_paths.append(path)
    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2689 for instance in self.my_inst_names:
2690 inst_config = self.my_inst_info[instance]
      if inst_config.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

2694 for nname in inst_config.all_nodes:
2695 if nname not in node_image:
2696 gnode = self.NodeImage(name=nname)
2697 gnode.ghost = (nname not in self.all_node_info)
2698 node_image[nname] = gnode
2700 inst_config.MapLVsByNode(node_vol_should)
2702 pnode = inst_config.primary_node
2703 node_image[pnode].pinst.append(instance)
2705 for snode in inst_config.secondary_nodes:
2706 nimg = node_image[snode]
2707 nimg.sinst.append(instance)
2708 if pnode not in nimg.sbp:
2709 nimg.sbp[pnode] = []
2710 nimg.sbp[pnode].append(instance)
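    # Illustrative sketch (editorial addition; names invented): nimg.sbp now
    # maps each primary node to the instances this node is secondary for,
    # e.g. on node2: nimg.sbp = {"node1": ["inst1", "inst2"]}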
2712 es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg, self.my_node_names)
    # The value of exclusive_storage should be the same across the group, so
    # if it's True for at least one node, we act as if it were set for all
    # the nodes
2715 self._exclusive_storage = compat.any(es_flags.values())
2716 if self._exclusive_storage:
2717 node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2719 # At this point, we have the in-memory data structures complete,
2720 # except for the runtime information, which we'll gather next
2722 # Due to the way our RPC system works, exact response times cannot be
2723 # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a
    # time window.
2726 nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
                                           node_verify_param,
                                           self.cfg.GetClusterName())
2730 nvinfo_endtime = time.time()
2732 if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName())
    else:
      extra_lv_nvinfo = {}
2740 all_drbd_map = self.cfg.ComputeDRBDMap()
2742 feedback_fn("* Gathering disk information (%s nodes)" %
2743 len(self.my_node_names))
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
                                     self.my_inst_info)
2747 feedback_fn("* Verifying configuration file consistency")
2749 # If not all nodes are being checked, we need to make sure the master node
2750 # and a non-checked vm_capable node are in the list.
2751 absent_nodes = set(self.all_node_info).difference(self.my_node_info)
    if absent_nodes:
      vf_nvinfo = all_nvinfo.copy()
2754 vf_node_info = list(self.my_node_info.values())
2755 additional_nodes = []
2756 if master_node not in self.my_node_info:
2757 additional_nodes.append(master_node)
2758 vf_node_info.append(self.all_node_info[master_node])
2759 # Add the first vm_capable node we find which is not included,
2760 # excluding the master node (which we already have)
2761 for node in absent_nodes:
2762 nodeinfo = self.all_node_info[node]
2763 if (nodeinfo.vm_capable and not nodeinfo.offline and
2764 node != master_node):
2765 additional_nodes.append(node)
2766 vf_node_info.append(self.all_node_info[node])
2768 key = constants.NV_FILELIST
2769 vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2770 {key: node_verify_param[key]},
2771 self.cfg.GetClusterName()))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()
2776 self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2778 feedback_fn("* Verifying node status")
2782 for node_i in node_data_list:
2784 nimg = node_image[node]
2788 feedback_fn("* Skipping offline node %s" % (node,))
2792 if node == master_node:
2794 elif node_i.master_candidate:
2795 ntype = "master candidate"
2796 elif node_i.drained:
2802 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2804 msg = all_nvinfo[node].fail_msg
2805 _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
2808 nimg.rpc_fail = True
2811 nresult = all_nvinfo[node].payload
2813 nimg.call_ok = self._VerifyNode(node_i, nresult)
2814 self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2815 self._VerifyNodeNetwork(node_i, nresult)
2816 self._VerifyNodeUserScripts(node_i, nresult)
2817 self._VerifyOob(node_i, nresult)
2818 self._VerifyFileStoragePaths(node_i, nresult,
2819 node == master_node)
      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)
2831 if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
2834 self._VerifyNodeOS(node_i, nimg, refos_img)
2835 self._VerifyNodeBridges(node_i, nresult, bridges)
      # Check whether all running instances are primary for the node. (This
2838 # can no longer be done from _VerifyInstance below, since some of the
2839 # wrong instances could be from other node groups.)
2840 non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2842 for inst in non_primary_inst:
2843 test = inst in self.all_inst_info
2844 _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
2845 "instance should not run on node %s", node_i.name)
2846 _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
2847 "node is running unknown instance %s", inst)
2849 self._VerifyGroupDRBDVersion(all_nvinfo)
2850 self._VerifyGroupLVM(node_image, vg_name)
2852 for node, result in extra_lv_nvinfo.items():
2853 self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
2854 node_image[node], vg_name)
2856 feedback_fn("* Verifying instance status")
2857 for instance in self.my_inst_names:
2859 feedback_fn("* Verifying instance %s" % instance)
2860 inst_config = self.my_inst_info[instance]
      self._VerifyInstance(instance, inst_config, node_image,
                           instdisk[instance])
2864 # If the instance is non-redundant we cannot survive losing its primary
2865 # node, so we are not N+1 compliant.
2866 if inst_config.disk_template not in constants.DTS_MIRRORED:
2867 i_non_redundant.append(instance)
2869 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2870 i_non_a_balanced.append(instance)
2872 feedback_fn("* Verifying orphan volumes")
2873 reserved = utils.FieldSet(*cluster.reserved_lvs)
2875 # We will get spurious "unknown volume" warnings if any node of this group
2876 # is secondary for an instance whose primary is in another group. To avoid
2877 # them, we find these instances and add their volumes to node_vol_should.
2878 for inst in self.all_inst_info.values():
2879 for secondary in inst.secondary_nodes:
2880 if (secondary in self.my_node_info
2881 and inst.name not in self.my_inst_info):
2882 inst.MapLVsByNode(node_vol_should)
2885 self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2887 if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2888 feedback_fn("* Verifying N+1 Memory redundancy")
2889 self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2891 feedback_fn("* Other Notes")
2893 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
2894 % len(i_non_redundant))
2896 if i_non_a_balanced:
2897 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
2898 % len(i_non_a_balanced))
2901 feedback_fn(" - NOTICE: %d offline instance(s) found." % i_offline)
2904 feedback_fn(" - NOTICE: %d offline node(s) found." % n_offline)
2907 feedback_fn(" - NOTICE: %d drained node(s) found." % n_drained)
2911 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2912 """Analyze the post-hooks' result
2914 This method analyses the hook result, handles it, and sends some
2915 nicely-formatted feedback back to the user.
2917 @param phase: one of L{constants.HOOKS_PHASE_POST} or
2918 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2919 @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
2921 @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
2926 # We only really run POST phase hooks, only for non-empty groups,
2927 # and are only interested in their results
    if not self.my_node_names:
      # empty node group
      pass
2931 elif phase == constants.HOOKS_PHASE_POST:
2932 # Used to change hooks' output to proper indentation
2933 feedback_fn("* Hooks Results")
2934 assert hooks_results, "invalid result from hooks"
2936 for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
2940 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2941 "Communication failure in hooks execution: %s", msg)
2942 if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
2947 test = hkr == constants.HKR_FAIL
2948 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2949 "Script %s failed, output:", script)
2951 output = self._HOOKS_INDENT_RE.sub(" ", output)
2952 feedback_fn("%s" % output)
2958 class LUClusterVerifyDisks(NoHooksLU):
2959 """Verifies the cluster disks status.
  """
  REQ_BGL = False

  def ExpandNames(self):
2965 self.share_locks = ShareAll()
2966 self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
2971 group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
2973 # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
2974 return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
2975 for group in group_names])