# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import os
import re

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, master_params.name)

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    if result.fail_msg:
      self.LogWarning("Error disabling the master IP address: %s",
                      result.fail_msg)

    return master_params.name


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
    else:
      cluster = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_name = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_name)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   master_name)

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION
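
    # For illustration: on a cluster whose primary_ip_family is
    # netutils.IP6Address.family (AF_INET6), the value reported below is 6;
    # otherwise it stays at the IPv4 default of 4.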
289 "software_version": constants.RELEASE_VERSION,
290 "protocol_version": constants.PROTOCOL_VERSION,
291 "config_version": constants.CONFIG_VERSION,
292 "os_api_version": max(constants.OS_API_VERSIONS),
293 "export_version": constants.EXPORT_VERSION,
294 "vcs_version": constants.VCS_VERSION,
295 "architecture": runtime.GetArchInfo(),
296 "name": cluster.cluster_name,
297 "master": cluster.master_node,
298 "default_hypervisor": cluster.primary_hypervisor,
299 "enabled_hypervisors": cluster.enabled_hypervisors,
300 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
301 for hypervisor_name in cluster.enabled_hypervisors]),
303 "beparams": cluster.beparams,
304 "osparams": cluster.osparams,
305 "ipolicy": cluster.ipolicy,
306 "nicparams": cluster.nicparams,
307 "ndparams": cluster.ndparams,
308 "diskparams": cluster.diskparams,
309 "candidate_pool_size": cluster.candidate_pool_size,
310 "master_netdev": cluster.master_netdev,
311 "master_netmask": cluster.master_netmask,
312 "use_external_mip_script": cluster.use_external_mip_script,
313 "volume_group_name": cluster.volume_group_name,
314 "drbd_usermode_helper": cluster.drbd_usermode_helper,
315 "file_storage_dir": cluster.file_storage_dir,
316 "shared_file_storage_dir": cluster.shared_file_storage_dir,
317 "maintain_node_health": cluster.maintain_node_health,
318 "ctime": cluster.ctime,
319 "mtime": cluster.mtime,
320 "uuid": cluster.uuid,
321 "tags": list(cluster.GetTags()),
322 "uid_pool": cluster.uid_pool,
323 "default_iallocator": cluster.default_iallocator,
324 "reserved_lvs": cluster.reserved_lvs,
325 "primary_ip_version": primary_ip_version,
326 "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
327 "hidden_os": cluster.hidden_os,
328 "blacklisted_os": cluster.blacklisted_os,
329 "enabled_disk_templates": cluster.enabled_disk_templates,


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.name)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                     master_params, ems)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      self.wanted_names = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
      map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsize(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsize call to node"
                        " %s, ignoring", node)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
                        " result.payload=%s", node, len(dskl), result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))

    return changed
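
# For illustration: the list returned by Exec above has the form
# [(instance_name, disk_index, size_in_MiB), ...]; for example
# [("web1", 0, 10240)] would mean that disk/0 of instance "web1" was
# corrected to 10 GiB (instance name is hypothetical).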


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)
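
# Usage sketch (illustrative, assuming the netmask is a CIDR prefix length):
# on an IPv4 cluster _ValidateNetmask(cfg, 24) passes while
# _ValidateNetmask(cfg, 33) raises OpPrereqError; for IPv6 the upper bound
# is 128 rather than 32.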


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters.

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" %
                                   err, errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_list = self.owned_locks(locking.LEVEL_NODE)

    vm_capable_nodes = [node.name
                        for node in self.cfg.GetAllNodesInfo().values()
                        if node.name in node_list and node.vm_capable]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(vm_capable_nodes)
      for node in vm_capable_nodes:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_list)
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s", node)
          continue
        msg = helpers[node].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (node, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[node].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (node, node_helper), errors.ECODE_ENVIRON)

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
          constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(node in group.members
                                             for node in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol,
                                           new_ipolicy, instances, self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
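
    # Merge semantics illustrated (hypothetical values): with cluster os_hvp
    # {"debian": {"kvm": {"acpi": True}}}, an opcode os_hvp of
    # {"debian": {"kvm": None}} deletes the kvm override, while
    # {"debian": {"kvm": {"acpi": False}}} updates it in place.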

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_list, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
    are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.drbd_helper is not None:
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(set(self.op.enabled_disk_templates))
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(master_params.name,
                                                        master_params.netmask,
                                                        self.op.master_netmask,
                                                        master_params.ip,
                                                        master_params.netdev)
      if result.fail_msg:
        msg = "Could not change the master IP netmask: %s" % result.fail_msg
        feedback_fn(msg)

      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                     master_params, ems)
      if result.fail_msg:
        self.LogWarning("Could not re-enable the master ip on"
                        " the master, please restart manually: %s",
                        result.fail_msg)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)
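
    # The depends entry produced by depends_fn uses a relative job
    # dependency: -len(jobs) points back to the config-verification job
    # submitted first in this same batch, so each per-group verification
    # only starts once the global config check has finished.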

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors: # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)
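
# For illustration, the two formats produced by _VerifyErrors._Error: with
# opcode error_codes set, a parseable line such as
#   "ERROR:ENODESSH:node:node1.example.com:ssh communication failed"
# and without it the simpler
#   "ERROR: node node1.example.com: ssh communication failed"
# (node name and message are hypothetical).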


def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
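
# _VerifyCertificate thus returns an (error_type, message) pair: (None, None)
# for a healthy certificate, (ETYPE_WARNING, msg) when expiry is merely
# approaching, and (ETYPE_ERROR, msg) for unloadable or expired certificates.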


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data
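
# Shape of the list built above (OS and instance names are illustrative):
#   [("cluster", "kvm", {...}),
#    ("os debian", "kvm", {...}),
#    ("instance web1", "kvm", {...})]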


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (hv_name, item))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                  constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node.name for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in dangling_nodes:
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst.name)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(dangling_instances.get(node.name,
                                                ["no instances"])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type name: string
    @ivar name: the node name to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, name=None, vm_capable=True):
      self.name = name
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_names = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: inst_names,
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      all_inst_info = self.cfg.GetAllInstancesInfo()

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
          nodes.update(all_inst_info[inst].secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_nodes = set(self.group_info.members)
    group_instances = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_nodes = \
      group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_instances = \
      group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))

    if unlocked_nodes:
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
                                 utils.CommaJoin(unlocked_nodes),
                                 errors.ECODE_STATE)

    if unlocked_instances:
      raise errors.OpPrereqError("Missing lock for instances: %s" %
                                 utils.CommaJoin(unlocked_instances),
                                 errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_names = utils.NiceSort(group_nodes)
    self.my_inst_names = utils.NiceSort(group_instances)

    self.my_node_info = dict((name, self.all_node_info[name])
                             for name in self.my_node_names)

    self.my_inst_info = dict((name, self.all_inst_info[name])
                             for name in self.my_inst_names)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nname in inst.all_nodes:
          if self.all_node_info[nname].group != self.group_uuid:
            extra_lv_nodes.add(nname)

    unlocked_lv_nodes = \
      extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    _ErrorIf(test, constants.CV_ENODERPC, node,
             "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, constants.CV_ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    _ErrorIf(test, constants.CV_ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, constants.CV_ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        _ErrorIf(True, constants.CV_ENODEHV, node,
                 "hypervisor %s parameter verify failure (source %s): %s",
                 hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
             "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
             "Node time diverges by at least %s from master node time",
             ntime_diff)

  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, node, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping from node to names to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of checks
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
    (pvmin, minnode) = min((ni.pv_min, ni.name) for ni in vals)
    (pvmax, maxnode) = max((ni.pv_max, ni.name) for ni in vals)
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, minnode, pvmax, maxnode)

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    _ErrorIf(test, constants.CV_ENODENET, node,
             "did not return valid bridge information")
    if not test:
      _ErrorIf(bool(missing), constants.CV_ENODENET, node,
               "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name

    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    test = constants.NV_NODELIST not in nresult
    _ErrorIf(test, constants.CV_ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          _ErrorIf(True, constants.CV_ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    _ErrorIf(test, constants.CV_ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, constants.CV_ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    _ErrorIf(test, constants.CV_ENODENET, node,
             "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if node == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        _ErrorIf(True, constants.CV_ENODENET, node, msg)

  def _VerifyInstance(self, instance, inst_config, node_image,
                      diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the nodes are in the correct
    state.

    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
    pnode = inst_config.primary_node
    pnode_img = node_image[pnode]
    groupinfo = self.cfg.GetAllNodeGroupsInfo()

    node_vol_should = {}
    inst_config.MapLVsByNode(node_vol_should)

    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            self.group_info)
    err = ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
    _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err),
             code=self.ETYPE_WARNING)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if inst_config.admin_state == constants.ADMINST_UP:
      test = instance not in pnode_img.instances and not pnode_img.offline
      _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               pnode)
      _ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
               "instance is marked as running and lives on offline node %s",
               pnode)

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # known node object
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      _ErrorIf(inst_config.disks_active and
               not success and not bad_snode,
               constants.CV_EINSTANCEFAULTYDISK, instance,
               "couldn't retrieve status for disk/%s on %s: %s",
               idx, nname, bdev_status)
      _ErrorIf((inst_config.disks_active and
                success and bdev_status.ldisk_status == constants.LDS_FAULTY),
               constants.CV_EINSTANCEFAULTYDISK, instance,
               "disk/%s on %s is faulty", idx, nname)

    _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
             constants.CV_ENODERPC, pnode, "instance %s, connection to"
             " primary node failed", instance)

    _ErrorIf(len(inst_config.secondary_nodes) > 1,
             constants.CV_EINSTANCELAYOUT,
             instance, "instance has multiple secondary nodes: %s",
             utils.CommaJoin(inst_config.secondary_nodes),
             code=self.ETYPE_WARNING)

    if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
      # Disk template not compatible with exclusive_storage: no instance
      # node should have the flag set
      es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
                                                     inst_config.all_nodes)
      es_nodes = [n for (n, es) in es_flags.items()
                  if es]
      _ErrorIf(es_nodes, constants.CV_EINSTANCEUNSUITABLENODE, instance,
               "instance has template %s, which is not supported on nodes"
               " that have exclusive storage set: %s",
               inst_config.disk_template, utils.CommaJoin(es_nodes))

    if inst_config.disk_template in constants.DTS_INT_MIRROR:
      instance_nodes = utils.NiceSort(inst_config.all_nodes)
      instance_groups = {}

      for node in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node].group,
                                   []).append(node)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode in nodes,
                                   reverse=True)]

      self._ErrorIf(len(instance_groups) > 1,
                    constants.CV_EINSTANCESPLITGROUPS,
                    instance, "instance has primary and secondary nodes in"
                    " different groups: %s", utils.CommaJoin(pretty_list),
                    code=self.ETYPE_WARNING)

    inst_nodes_offline = []
    for snode in inst_config.secondary_nodes:
      s_img = node_image[snode]
      _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
               snode, "instance %s, connection to secondary node failed",
               instance)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
             "instance has offline secondary node(s) %s",
             utils.CommaJoin(inst_nodes_offline))
    # ... or ghost/non-vm_capable nodes
    for node in inst_config.all_nodes:
      _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
               instance, "instance lives on ghost node %s", node)
      _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
               instance, "instance lives on non-vm_capable node %s", node)

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node, n_img in node_image.items():
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
          self.all_node_info[node].group != self.group_uuid):
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node not in node_vol_should or
                 volume not in node_vol_should[node]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline or self.all_node_info[node].group != self.group_uuid:
        # we're skipping nodes marked offline and nodes in other groups from
        # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      #TODO(dynmem): also consider ballooning out other instances
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = cluster_info.FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MINMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1, node,
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      prinode, needed_mem, n_img.mfree)
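
  # A worked example of the N+1 arithmetic above (figures invented for
  # illustration): if this node is secondary for two auto-balanced
  # instances whose primary is node A, with BE_MINMEM of 2048 and
  # 4096 MiB, then needed_mem for prinode A is 6144 MiB. If the node
  # reports mfree of 4096 MiB, CV_ENODEN1 is raised, because it could not
  # absorb A's instances should A fail.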

  @classmethod
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param errorif: Callback for reporting errors
    @param nodeinfo: List of L{objects.Node} objects
    @param master_node: Name of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.name == master_node)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodeinfo
      else:
        filenodes = filter(fn, nodeinfo)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("name"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodeinfo:
      if node.offline:
        ignore_nodes.add(node.name)
        continue

      nresult = all_nvinfo[node.name]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())

      test = not (node_files and isinstance(node_files, dict))
      errorif(test, constants.CV_ENODEFILECHECK, node.name,
              "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.name)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.name)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_name
                            for nodes in fileinfo[filename].values()
                            for node_name in nodes) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        errorif(missing_file and missing_file != expected_nodes,
                constants.CV_ECLUSTERFILECHECK, None,
                "File %s is optional, but it must exist on all or no"
                " nodes (not found on %s)",
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
      else:
        errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                "File %s is missing from node(s) %s", filename,
                utils.CommaJoin(utils.NiceSort(missing_file)))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        errorif(unexpected,
                constants.CV_ECLUSTERFILECHECK, None,
                "File %s should not exist on node(s) %s",
                filename, utils.CommaJoin(utils.NiceSort(unexpected)))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
                    for (idx, (checksum, nodes)) in
                    enumerate(sorted(checksums.items()))]
      else:
        variants = []

      errorif(test, constants.CV_ECLUSTERFILECHECK, None,
              "File %s found with %s different checksums (%s)",
              filename, len(checksums), "; ".join(variants))
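
  # Illustrative sketch, not part of the original module: after the first
  # loop above, "fileinfo" maps each tracked file to the checksums seen
  # and the nodes reporting each one, e.g. (names and hashes invented):
  #
  #   fileinfo = {
  #     "/var/lib/ganeti/config.data": {
  #       "0123456789abcdef...": set(["node1", "node2"]),
  #       "fedcba9876543210...": set(["node3"]),
  #       },
  #     }
  #
  # Such an entry would be reported as one file found with two different
  # checksum variants.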

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
               "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
                 "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
                 "wrong drbd usermode helper: %s", payload)

    # compute the DRBD minors
    node_drbd = {}
    for minor, instance in drbd_map[node].items():
      test = instance not in instanceinfo
      _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
               "ghost instance '%s' in temporary DRBD map", instance)
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
      if test:
        node_drbd[minor] = (instance, False)
      else:
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.disks_active)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    _ErrorIf(test, constants.CV_ENODEDRBD, node,
             "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (iname, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
               "drbd minor %d of instance %s is not active", minor, iname)
    for minor in used_minors:
      test = minor not in node_drbd
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
               "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    _ErrorIf(test, constants.CV_ENODEOS, node,
             "node hasn't returned valid OS data")

    nimg.os_fail = test
    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict
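
  # Illustrative sketch, not part of the original module: the resulting
  # "nimg.oslist" maps each OS name to a list of tuples, one per location
  # where the OS definition was found, e.g. (path and values invented):
  #
  #   nimg.oslist = {
  #     "debootstrap": [("/srv/ganeti/os/debootstrap", True, "",
  #                      set(["default"]), set(), set([20]))],
  #     }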

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      _ErrorIf(not f_status, constants.CV_ENODEOS, node,
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
      _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
               "OS '%s' has multiple entries (first one shadows the rest): %s",
               os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      _ErrorIf(test, constants.CV_ENODEOS, node,
               "Extra OS %s not present on reference node (%s)",
               os_name, base.name)
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        _ErrorIf(a != b, constants.CV_ENODEOS, node,
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
                 kind, os_name, base.name,
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    _ErrorIf(missing, constants.CV_ENODEOS, node,
             "OSes present on reference node %s but missing on this node: %s",
             base.name, utils.CommaJoin(missing))

  def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    node = ninfo.name

    if (is_master and
        (constants.ENABLE_FILE_STORAGE or
         constants.ENABLE_SHARED_FILE_STORAGE)):
      try:
        fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, node,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
               utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      _ErrorIf(True, constants.CV_ENODELVM, node,
               "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False
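
  # Illustrative sketch, not part of the original module: on success the
  # NV_LVLIST payload is a dictionary keyed by "vg/lv_name", while a
  # string payload carries an LVM error message instead; this is why the
  # types are checked above before nimg.volumes is updated. E.g. (names
  # invented):
  #
  #   lvdata = {"xenvg/disk0_data": ..., "xenvg/disk0_meta": ...}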

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = idata

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    _ErrorIf(test, constants.CV_ENODEHV, node,
             "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        _ErrorIf(True, constants.CV_ENODERPC, node,
                 "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      _ErrorIf(test, constants.CV_ENODELVM, node,
               "node didn't return data for the volume group '%s'"
               " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          _ErrorIf(True, constants.CV_ENODERPC, node,
                   "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type nodelist: list of strings
    @param nodelist: Node names
    @type node_image: dict of (name, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (name, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    node_disks = {}
    node_disks_devonly = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nname in nodelist:
      node_instances = list(itertools.chain(node_image[nname].pinst,
                                            node_image[nname].sinst))
      diskless_instances.update(inst for inst in node_instances
                                if instanceinfo[inst].disk_template == diskless)
      disks = [(inst, disk)
               for inst in node_instances
               for disk in instanceinfo[inst].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nname] = disks

      # AnnotateDiskParams already makes copies of the disks
      devonly = []
      for (inst, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
        self.cfg.SetDiskID(anno_disk, nname)
        devonly.append(anno_disk)

      node_disks_devonly[nname] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nname, nres) in result.items():
      disks = node_disks[nname]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        _ErrorIf(msg, constants.CV_ENODERPC, nname,
                 "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              nname, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst, _), status) in zip(disks, data):
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)

    # Add empty entries for diskless instances.
    for inst in diskless_instances:
      assert inst not in instdisk
      instdisk[inst] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nnames in instdisk.items()
                      for nname, statuses in nnames.items())

    instdisk_keys = set(instdisk)
    instanceinfo_keys = set(instanceinfo)
    assert instdisk_keys == instanceinfo_keys, \
      ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
       (instdisk_keys, instanceinfo_keys))

    return instdisk
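
  # Illustrative sketch, not part of the original module: a minimal
  # instdisk result for one DRBD instance with a single disk, in the shape
  # consumed by _VerifyInstance (names invented):
  #
  #   instdisk = {
  #     "inst1.example.com": {
  #       "node1.example.com": [(True, <block device status>)],
  #       "node2.example.com": [(True, <block device status>)],
  #       },
  #     }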

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))
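
  # Illustrative sketch, not part of the original module: with two other
  # groups, each online node of this group gets paired with one node from
  # every other group, and the per-group cycles advance between nodes so
  # the SSH checks spread out, e.g. (names invented):
  #
  #   (["node1", "node2"],
  #    {"node1": ["groupA-n1", "groupB-n1"],
  #     "node2": ["groupA-n2", "groupB-n2"]})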

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], self.my_node_names)

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_names:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (master_node, master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if drbd_helper:
      node_verify_param[constants.NV_DRBDLIST] = None
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
      # Load file storage paths only from master node
      node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node
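
    # Illustrative note, not part of the original module: each NV_* key in
    # node_verify_param requests one class of data from the node verify
    # RPC; e.g. NV_LVLIST with a VG name asks for the logical volumes of
    # that VG, while keys mapped to None (NV_VERSION, NV_TIME, ...)
    # request checks that need no parameters.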

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
                                                 name=node.name,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for instance in self.my_inst_names:
      inst_config = self.my_inst_info[instance]
      if inst_config.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nname in inst_config.all_nodes:
        if nname not in node_image:
          gnode = self.NodeImage(name=nname)
          gnode.ghost = (nname not in self.all_node_info)
          node_image[nname] = gnode

      inst_config.MapLVsByNode(node_vol_should)

      pnode = inst_config.primary_node
      node_image[pnode].pinst.append(instance)

      for snode in inst_config.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance)

    es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg, self.my_node_names)
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least a node, we act as if it were set for all the nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True
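
    # Illustrative sketch, not part of the original module: after the
    # instance loop above, each NodeImage's "sbp" maps primary-node names
    # to the instances this node is secondary for, e.g. (names invented):
    #
    #   node_image["node2"].sbp = {"node1": ["inst1", "inst2"]}
    #
    # _VerifyNPlusOneMemory iterates over exactly this mapping.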

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
                                           node_verify_param,
                                           self.cfg.GetClusterName())
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName())
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_names))
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
                                     self.my_inst_info)
2664 feedback_fn("* Verifying configuration file consistency")
2666 # If not all nodes are being checked, we need to make sure the master node
2667 # and a non-checked vm_capable node are in the list.
2668 absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2670 vf_nvinfo = all_nvinfo.copy()
2671 vf_node_info = list(self.my_node_info.values())
2672 additional_nodes = []
2673 if master_node not in self.my_node_info:
2674 additional_nodes.append(master_node)
2675 vf_node_info.append(self.all_node_info[master_node])
2676 # Add the first vm_capable node we find which is not included,
2677 # excluding the master node (which we already have)
2678 for node in absent_nodes:
2679 nodeinfo = self.all_node_info[node]
2680 if (nodeinfo.vm_capable and not nodeinfo.offline and
2681 node != master_node):
2682 additional_nodes.append(node)
2683 vf_node_info.append(self.all_node_info[node])
2685 key = constants.NV_FILELIST
2686 vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2687 {key: node_verify_param[key]},
2688 self.cfg.GetClusterName()))
2690 vf_nvinfo = all_nvinfo
2691 vf_node_info = self.my_node_info.values()
2693 self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2695 feedback_fn("* Verifying node status")
2699 for node_i in node_data_list:
2701 nimg = node_image[node]
2705 feedback_fn("* Skipping offline node %s" % (node,))
2709 if node == master_node:
2711 elif node_i.master_candidate:
2712 ntype = "master candidate"
2713 elif node_i.drained:
2719 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2721 msg = all_nvinfo[node].fail_msg
2722 _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
2725 nimg.rpc_fail = True
2728 nresult = all_nvinfo[node].payload
2730 nimg.call_ok = self._VerifyNode(node_i, nresult)
2731 self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2732 self._VerifyNodeNetwork(node_i, nresult)
2733 self._VerifyNodeUserScripts(node_i, nresult)
2734 self._VerifyOob(node_i, nresult)
2735 self._VerifyFileStoragePaths(node_i, nresult,
2736 node == master_node)
2739 self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
2740 self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2743 self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2744 self._UpdateNodeInstances(node_i, nresult, nimg)
2745 self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2746 self._UpdateNodeOS(node_i, nresult, nimg)
2748 if not nimg.os_fail:
2749 if refos_img is None:
2751 self._VerifyNodeOS(node_i, nimg, refos_img)
2752 self._VerifyNodeBridges(node_i, nresult, bridges)
2754 # Check whether all running instancies are primary for the node. (This
2755 # can no longer be done from _VerifyInstance below, since some of the
2756 # wrong instances could be from other node groups.)
2757 non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2759 for inst in non_primary_inst:
2760 test = inst in self.all_inst_info
2761 _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
2762 "instance should not run on node %s", node_i.name)
2763 _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
2764 "node is running unknown instance %s", inst)

    self._VerifyGroupLVM(node_image, vg_name)

    for node, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
                              node_image[node], vg_name)

    feedback_fn("* Verifying instance status")
    for instance in self.my_inst_names:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.my_inst_info[instance]
      self._VerifyInstance(instance, inst_config, node_image,
                           instdisk[instance])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if inst_config.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for inst in self.all_inst_info.values():
      for secondary in inst.secondary_nodes:
        if (secondary in self.my_node_info
            and inst.name not in self.my_inst_info):
          inst.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_names:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])
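
# Illustrative note, not part of the original module: returning
# ResultWithJobs hands the listed opcode lists to the master daemon to be
# submitted as separate jobs, one job per inner list; here that means one
# OpGroupVerifyDisks job per node group, each verifying the disks of its
# own group.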