4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Logical units dealing with the cluster."""
34 from ganeti import compat
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import masterd
40 from ganeti import netutils
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 from ganeti import query
45 from ganeti import rpc
46 from ganeti import runtime
47 from ganeti import ssh
48 from ganeti import uidpool
49 from ganeti import utils
50 from ganeti import vcluster
52 from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
54 from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55 ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56 GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57 GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58 CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59 ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob
61 import ganeti.masterd.instance
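# Each LUCluster* class below implements a single cluster-level opcode
# (e.g. LUClusterActivateMasterIp corresponds to OpClusterActivateMasterIp)
# and is dispatched by the master daemon's opcode processor.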
64 class LUClusterActivateMasterIp(NoHooksLU):
65 """Activate the master IP on the master node.
68 def Exec(self, feedback_fn):
69 """Activate the master IP.
72 master_params = self.cfg.GetMasterNetworkParameters()
73 ems = self.cfg.GetUseExternalMipScript()
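# 'ems' tells the node daemon whether the user-provided external master-IP
# setup script should be used instead of the built-in IP handling; the flag
# is simply forwarded with the RPC call below.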
result = self.rpc.call_node_activate_master_ip(master_params.name, master_params, ems)
76 result.Raise("Could not activate the master IP")
79 class LUClusterDeactivateMasterIp(NoHooksLU):
80 """Deactivate the master IP on the master node.
83 def Exec(self, feedback_fn):
84 """Deactivate the master IP.
87 master_params = self.cfg.GetMasterNetworkParameters()
88 ems = self.cfg.GetUseExternalMipScript()
result = self.rpc.call_node_deactivate_master_ip(master_params.name, master_params, ems)
91 result.Raise("Could not deactivate the master IP")
94 class LUClusterConfigQuery(NoHooksLU):
95 """Return configuration values.
100 def CheckArguments(self):
101 self.cq = ClusterQuery(None, self.op.output_fields, False)
103 def ExpandNames(self):
104 self.cq.ExpandNames(self)
106 def DeclareLocks(self, level):
107 self.cq.DeclareLocks(self, level)
109 def Exec(self, feedback_fn):
110 result = self.cq.OldStyleQuery(self)
112 assert len(result) == 1
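# Cluster queries always produce exactly one row (there is only one cluster).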
117 class LUClusterDestroy(LogicalUnit):
118 """Logical unit for destroying the cluster.
121 HPATH = "cluster-destroy"
122 HTYPE = constants.HTYPE_CLUSTER
124 def BuildHooksEnv(self):
129 "OP_TARGET": self.cfg.GetClusterName(),
132 def BuildHooksNodes(self):
133 """Build hooks nodes.
138 def CheckPrereq(self):
139 """Check prerequisites.
141 This checks whether the cluster is empty.
143 Any errors are signaled by raising errors.OpPrereqError.
146 master = self.cfg.GetMasterNode()
148 nodelist = self.cfg.GetNodeList()
149 if len(nodelist) != 1 or nodelist[0] != master:
150 raise errors.OpPrereqError("There are still %d node(s) in"
151 " this cluster." % (len(nodelist) - 1),
153 instancelist = self.cfg.GetInstanceList()
155 raise errors.OpPrereqError("There are still %d instance(s) in"
156 " this cluster." % len(instancelist),
159 def Exec(self, feedback_fn):
160 """Destroys the cluster.
163 master_params = self.cfg.GetMasterNetworkParameters()
165 # Run post hooks on master node before it's removed
166 RunPostHook(self, master_params.name)
168 ems = self.cfg.GetUseExternalMipScript()
169 result = self.rpc.call_node_deactivate_master_ip(master_params.name,
172 self.LogWarning("Error disabling the master IP address: %s",
175 return master_params.name
178 class LUClusterPostInit(LogicalUnit):
179 """Logical unit for running hooks after cluster initialization.
182 HPATH = "cluster-init"
183 HTYPE = constants.HTYPE_CLUSTER
185 def BuildHooksEnv(self):
190 "OP_TARGET": self.cfg.GetClusterName(),
193 def BuildHooksNodes(self):
194 """Build hooks nodes.
197 return ([], [self.cfg.GetMasterNode()])
199 def Exec(self, feedback_fn):
206 class ClusterQuery(QueryBase):
207 FIELDS = query.CLUSTER_FIELDS
209 #: Do not sort (there is only one item)
212 def ExpandNames(self, lu):
215 # The following variables interact with _QueryBase._GetNames
216 self.wanted = locking.ALL_SET
217 self.do_locking = self.use_locking
220 raise errors.OpPrereqError("Can not use locking for cluster queries",
223 def DeclareLocks(self, lu, level):
226 def _GetQueryData(self, lu):
227 """Computes the list of nodes and their attributes.
230 # Locking is not used
231 assert not (compat.any(lu.glm.is_owned(level)
232 for level in locking.LEVELS
233 if level != locking.LEVEL_CLUSTER) or
234 self.do_locking or self.use_locking)
236 if query.CQ_CONFIG in self.requested_data:
237 cluster = lu.cfg.GetClusterInfo()
239 cluster = NotImplemented
241 if query.CQ_QUEUE_DRAINED in self.requested_data:
242 drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
244 drain_flag = NotImplemented
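# Data that was not requested is passed on as NotImplemented, so the query
# layer can distinguish "not collected" from an empty or false value.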
246 if query.CQ_WATCHER_PAUSE in self.requested_data:
247 master_name = lu.cfg.GetMasterNode()
249 result = lu.rpc.call_get_watcher_pause(master_name)
250 result.Raise("Can't retrieve watcher pause from master node '%s'" %
253 watcher_pause = result.payload
255 watcher_pause = NotImplemented
257 return query.ClusterQueryData(cluster, drain_flag, watcher_pause)
260 class LUClusterQuery(NoHooksLU):
261 """Query cluster configuration.
266 def ExpandNames(self):
267 self.needed_locks = {}
269 def Exec(self, feedback_fn):
270 """Return cluster config.
273 cluster = self.cfg.GetClusterInfo()
276 # Filter just for enabled hypervisors
277 for os_name, hv_dict in cluster.os_hvp.items():
279 for hv_name, hv_params in hv_dict.items():
280 if hv_name in cluster.enabled_hypervisors:
281 os_hvp[os_name][hv_name] = hv_params
283 # Convert ip_family to ip_version
284 primary_ip_version = constants.IP4_VERSION
285 if cluster.primary_ip_family == netutils.IP6Address.family:
286 primary_ip_version = constants.IP6_VERSION
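# The mapping below is the opcode result; it is what clients such as
# "gnt-cluster info" display.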
289 "software_version": constants.RELEASE_VERSION,
290 "protocol_version": constants.PROTOCOL_VERSION,
291 "config_version": constants.CONFIG_VERSION,
292 "os_api_version": max(constants.OS_API_VERSIONS),
293 "export_version": constants.EXPORT_VERSION,
294 "architecture": runtime.GetArchInfo(),
295 "name": cluster.cluster_name,
296 "master": cluster.master_node,
297 "default_hypervisor": cluster.primary_hypervisor,
298 "enabled_hypervisors": cluster.enabled_hypervisors,
299 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
300 for hypervisor_name in cluster.enabled_hypervisors]),
302 "beparams": cluster.beparams,
303 "osparams": cluster.osparams,
304 "ipolicy": cluster.ipolicy,
305 "nicparams": cluster.nicparams,
306 "ndparams": cluster.ndparams,
307 "diskparams": cluster.diskparams,
308 "candidate_pool_size": cluster.candidate_pool_size,
309 "master_netdev": cluster.master_netdev,
310 "master_netmask": cluster.master_netmask,
311 "use_external_mip_script": cluster.use_external_mip_script,
312 "volume_group_name": cluster.volume_group_name,
313 "drbd_usermode_helper": cluster.drbd_usermode_helper,
314 "file_storage_dir": cluster.file_storage_dir,
315 "shared_file_storage_dir": cluster.shared_file_storage_dir,
316 "maintain_node_health": cluster.maintain_node_health,
317 "ctime": cluster.ctime,
318 "mtime": cluster.mtime,
319 "uuid": cluster.uuid,
320 "tags": list(cluster.GetTags()),
321 "uid_pool": cluster.uid_pool,
322 "default_iallocator": cluster.default_iallocator,
323 "reserved_lvs": cluster.reserved_lvs,
324 "primary_ip_version": primary_ip_version,
325 "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
326 "hidden_os": cluster.hidden_os,
327 "blacklisted_os": cluster.blacklisted_os,
328 "enabled_disk_templates": cluster.enabled_disk_templates,
334 class LUClusterRedistConf(NoHooksLU):
335 """Force the redistribution of cluster configuration.
337 This is a very simple LU.
342 def ExpandNames(self):
343 self.needed_locks = {
344 locking.LEVEL_NODE: locking.ALL_SET,
345 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
347 self.share_locks = ShareAll()
349 def Exec(self, feedback_fn):
350 """Redistribute the configuration.
353 self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
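# Updating the otherwise unchanged cluster object forces the configuration
# to be rewritten and pushed out; the helper below then re-uploads the
# ancillary files (ssconf values, known_hosts, etc.) to the online nodes.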
354 RedistributeAncillaryFiles(self)
357 class LUClusterRename(LogicalUnit):
358 """Rename the cluster.
361 HPATH = "cluster-rename"
362 HTYPE = constants.HTYPE_CLUSTER
364 def BuildHooksEnv(self):
369 "OP_TARGET": self.cfg.GetClusterName(),
370 "NEW_NAME": self.op.name,
373 def BuildHooksNodes(self):
374 """Build hooks nodes.
377 return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
379 def CheckPrereq(self):
380 """Verify that the passed name is a valid one.
383 hostname = netutils.GetHostname(name=self.op.name,
384 family=self.cfg.GetPrimaryIPFamily())
386 new_name = hostname.name
387 self.ip = new_ip = hostname.ip
388 old_name = self.cfg.GetClusterName()
389 old_ip = self.cfg.GetMasterIP()
390 if new_name == old_name and new_ip == old_ip:
391 raise errors.OpPrereqError("Neither the name nor the IP address of the"
392 " cluster has changed",
395 if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
396 raise errors.OpPrereqError("The given cluster IP address (%s) is"
397 " reachable on the network" %
398 new_ip, errors.ECODE_NOTUNIQUE)
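# An address that already answers on the node daemon port is assumed to be
# in use by another machine, hence the rejection above.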
400 self.op.name = new_name
402 def Exec(self, feedback_fn):
403 """Rename the cluster.
406 clustername = self.op.name
409 # shutdown the master IP
410 master_params = self.cfg.GetMasterNetworkParameters()
411 ems = self.cfg.GetUseExternalMipScript()
412 result = self.rpc.call_node_deactivate_master_ip(master_params.name,
414 result.Raise("Could not disable the master role")
417 cluster = self.cfg.GetClusterInfo()
418 cluster.cluster_name = clustername
419 cluster.master_ip = new_ip
420 self.cfg.Update(cluster, feedback_fn)
422 # update the known hosts file
423 ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
424 node_list = self.cfg.GetOnlineNodeList()
426 node_list.remove(master_params.name)
429 UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
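# known_hosts was rewritten above with the new cluster name; distribute it
# to every online node except the master, which already holds the new copy.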
431 master_params.ip = new_ip
432 result = self.rpc.call_node_activate_master_ip(master_params.name,
434 msg = result.fail_msg
436 self.LogWarning("Could not re-enable the master role on"
437 " the master, please restart manually: %s", msg)
442 class LUClusterRepairDiskSizes(NoHooksLU):
443 """Verifies the cluster disks sizes.
448 def ExpandNames(self):
449 if self.op.instances:
450 self.wanted_names = GetWantedInstances(self, self.op.instances)
451 # Not getting the node allocation lock as only a specific set of
452 # instances (and their nodes) is going to be acquired
453 self.needed_locks = {
454 locking.LEVEL_NODE_RES: [],
455 locking.LEVEL_INSTANCE: self.wanted_names,
457 self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
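# With an explicit instance list the node-resource locks start out empty
# and are computed in DeclareLocks, once the instance locks are held.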
459 self.wanted_names = None
460 self.needed_locks = {
461 locking.LEVEL_NODE_RES: locking.ALL_SET,
462 locking.LEVEL_INSTANCE: locking.ALL_SET,
# This opcode acquires the node locks for all instances
465 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
469 locking.LEVEL_NODE_RES: 1,
470 locking.LEVEL_INSTANCE: 0,
471 locking.LEVEL_NODE_ALLOC: 1,
474 def DeclareLocks(self, level):
475 if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
476 self._LockInstancesNodes(primary_only=True, level=level)
478 def CheckPrereq(self):
479 """Check prerequisites.
481 This only checks the optional instance list against the existing names.
484 if self.wanted_names is None:
485 self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
487 self.wanted_instances = \
488 map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
490 def _EnsureChildSizes(self, disk):
491 """Ensure children of the disk have the needed disk size.
493 This is valid mainly for DRBD8 and fixes an issue where the
children have a smaller disk size.
496 @param disk: an L{ganeti.objects.Disk} object
499 if disk.dev_type == constants.LD_DRBD8:
500 assert disk.children, "Empty children for DRBD8?"
501 fchild = disk.children[0]
502 mismatch = fchild.size < disk.size
504 self.LogInfo("Child disk has size %d, parent %d, fixing",
505 fchild.size, disk.size)
506 fchild.size = disk.size
508 # and we recurse on this child only, not on the metadev
509 return self._EnsureChildSizes(fchild) or mismatch
513 def Exec(self, feedback_fn):
514 """Verify the size of cluster disks.
517 # TODO: check child disks too
518 # TODO: check differences in size between primary/secondary nodes
520 for instance in self.wanted_instances:
521 pnode = instance.primary_node
522 if pnode not in per_node_disks:
523 per_node_disks[pnode] = []
524 for idx, disk in enumerate(instance.disks):
525 per_node_disks[pnode].append((instance, idx, disk))
527 assert not (frozenset(per_node_disks.keys()) -
528 self.owned_locks(locking.LEVEL_NODE_RES)), \
529 "Not owning correct locks"
530 assert not self.owned_locks(locking.LEVEL_NODE)
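# Disks were grouped by primary node above, so a single blockdev_getsize
# RPC per node can report the actual sizes.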
533 for node, dskl in per_node_disks.items():
534 newl = [v[2].Copy() for v in dskl]
for dsk in newl:
  self.cfg.SetDiskID(dsk, node)
537 result = self.rpc.call_blockdev_getsize(node, newl)
539 self.LogWarning("Failure in blockdev_getsize call to node"
540 " %s, ignoring", node)
542 if len(result.payload) != len(dskl):
logging.warning("Invalid result from node %s: len(dskl)=%d,"
544 " result.payload=%s", node, len(dskl), result.payload)
545 self.LogWarning("Invalid result from node %s, ignoring node results",
548 for ((instance, idx, disk), size) in zip(dskl, result.payload):
550 self.LogWarning("Disk %d of instance %s did not return size"
551 " information, ignoring", idx, instance.name)
553 if not isinstance(size, (int, long)):
554 self.LogWarning("Disk %d of instance %s did not return valid"
555 " size information, ignoring", idx, instance.name)
558 if size != disk.size:
559 self.LogInfo("Disk %d of instance %s has mismatched size,"
560 " correcting: recorded %d, actual %d", idx,
561 instance.name, disk.size, size)
563 self.cfg.Update(instance, feedback_fn)
564 changed.append((instance.name, idx, size))
565 if self._EnsureChildSizes(disk):
566 self.cfg.Update(instance, feedback_fn)
567 changed.append((instance.name, idx, disk.size))
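# 'changed' collects (instance name, disk index, new size) tuples for every
# correction that was written back to the configuration.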
571 def _ValidateNetmask(cfg, netmask):
572 """Checks if a netmask is valid.
574 @type cfg: L{config.ConfigWriter}
575 @param cfg: The cluster configuration
577 @param netmask: the netmask to be verified
578 @raise errors.OpPrereqError: if the validation fails
581 ip_family = cfg.GetPrimaryIPFamily()
583 ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
584 except errors.ProgrammerError:
585 raise errors.OpPrereqError("Invalid primary ip family: %s." %
586 ip_family, errors.ECODE_INVAL)
587 if not ipcls.ValidateNetmask(netmask):
588 raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
589 (netmask), errors.ECODE_INVAL)
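# Illustrative only: on an IPv4 cluster a prefix length such as 24 is
# accepted, while an out-of-range value like 99 fails ValidateNetmask and
# raises OpPrereqError.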
592 class LUClusterSetParams(LogicalUnit):
593 """Change the parameters of the cluster.
596 HPATH = "cluster-modify"
597 HTYPE = constants.HTYPE_CLUSTER
600 def CheckArguments(self):
605 uidpool.CheckUidPool(self.op.uid_pool)
608 uidpool.CheckUidPool(self.op.add_uids)
610 if self.op.remove_uids:
611 uidpool.CheckUidPool(self.op.remove_uids)
613 if self.op.master_netmask is not None:
614 _ValidateNetmask(self.cfg, self.op.master_netmask)
616 if self.op.diskparams:
617 for dt_params in self.op.diskparams.values():
618 utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
620 utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
621 except errors.OpPrereqError, err:
raise errors.OpPrereqError("While verifying diskparams options: %s" % err,
625 def ExpandNames(self):
626 # FIXME: in the future maybe other cluster params won't require checking on
627 # all nodes to be modified.
628 # FIXME: This opcode changes cluster-wide settings. Is acquiring all
629 # resource locks the right thing, shouldn't it be the BGL instead?
630 self.needed_locks = {
631 locking.LEVEL_NODE: locking.ALL_SET,
632 locking.LEVEL_INSTANCE: locking.ALL_SET,
633 locking.LEVEL_NODEGROUP: locking.ALL_SET,
634 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
636 self.share_locks = ShareAll()
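# All locks are taken in shared mode: this LU reads node/instance data and
# only modifies the cluster-level configuration.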
638 def BuildHooksEnv(self):
643 "OP_TARGET": self.cfg.GetClusterName(),
644 "NEW_VG_NAME": self.op.vg_name,
647 def BuildHooksNodes(self):
648 """Build hooks nodes.
651 mn = self.cfg.GetMasterNode()
654 def CheckPrereq(self):
655 """Check prerequisites.
657 This checks whether the given params don't conflict and
658 if the given volume group is valid.
661 if self.op.vg_name is not None and not self.op.vg_name:
662 if self.cfg.HasAnyDiskOfType(constants.LD_LV):
663 raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
664 " instances exist", errors.ECODE_INVAL)
666 if self.op.drbd_helper is not None and not self.op.drbd_helper:
667 if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
668 raise errors.OpPrereqError("Cannot disable drbd helper while"
669 " drbd-based instances exist",
672 node_list = self.owned_locks(locking.LEVEL_NODE)
674 vm_capable_nodes = [node.name
675 for node in self.cfg.GetAllNodesInfo().values()
676 if node.name in node_list and node.vm_capable]
678 # if vg_name not None, checks given volume group on all nodes
680 vglist = self.rpc.call_vg_list(vm_capable_nodes)
681 for node in vm_capable_nodes:
682 msg = vglist[node].fail_msg
685 self.LogWarning("Error while gathering data on node %s"
686 " (ignoring node): %s", node, msg)
688 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
690 constants.MIN_VG_SIZE)
692 raise errors.OpPrereqError("Error on node '%s': %s" %
693 (node, vgstatus), errors.ECODE_ENVIRON)
695 if self.op.drbd_helper:
696 # checks given drbd helper on all nodes
697 helpers = self.rpc.call_drbd_helper(node_list)
698 for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
700 self.LogInfo("Not checking drbd helper on offline node %s", node)
702 msg = helpers[node].fail_msg
704 raise errors.OpPrereqError("Error checking drbd helper on node"
705 " '%s': %s" % (node, msg),
706 errors.ECODE_ENVIRON)
707 node_helper = helpers[node].payload
708 if node_helper != self.op.drbd_helper:
709 raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
710 (node, node_helper), errors.ECODE_ENVIRON)
712 self.cluster = cluster = self.cfg.GetClusterInfo()
713 # validate params changes
715 objects.UpgradeBeParams(self.op.beparams)
716 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
717 self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
720 utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
721 self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
723 # TODO: we need a more general way to handle resetting
724 # cluster-level parameters to default values
725 if self.new_ndparams["oob_program"] == "":
726 self.new_ndparams["oob_program"] = \
727 constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
730 new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
731 self.cluster.hv_state_static)
732 self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
733 for hv, values in new_hv_state.items())
735 if self.op.disk_state:
736 new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
737 self.cluster.disk_state_static)
738 self.new_disk_state = \
739 dict((storage, dict((name, cluster.SimpleFillDiskState(values))
740 for name, values in svalues.items()))
741 for storage, svalues in new_disk_state.items())
744 self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
747 all_instances = self.cfg.GetAllInstancesInfo().values()
749 for group in self.cfg.GetAllNodeGroupsInfo().values():
750 instances = frozenset([inst for inst in all_instances
751 if compat.any(node in group.members
752 for node in inst.all_nodes)])
753 new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
754 ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
755 new = ComputeNewInstanceViolations(ipol,
756 new_ipolicy, instances, self.cfg)
758 violations.update(new)
761 self.LogWarning("After the ipolicy change the following instances"
763 utils.CommaJoin(utils.NiceSort(violations)))
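# Instances violating the new policy only trigger a warning here; the
# policy change itself is still applied in Exec.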
765 if self.op.nicparams:
766 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
767 self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
768 objects.NIC.CheckParameterSyntax(self.new_nicparams)
771 # check all instances for consistency
772 for instance in self.cfg.GetAllInstancesInfo().values():
773 for nic_idx, nic in enumerate(instance.nics):
774 params_copy = copy.deepcopy(nic.nicparams)
775 params_filled = objects.FillDict(self.new_nicparams, params_copy)
777 # check parameter syntax
779 objects.NIC.CheckParameterSyntax(params_filled)
780 except errors.ConfigurationError, err:
781 nic_errors.append("Instance %s, nic/%d: %s" %
782 (instance.name, nic_idx, err))
784 # if we're moving instances to routed, check that they have an ip
785 target_mode = params_filled[constants.NIC_MODE]
786 if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
787 nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
788 " address" % (instance.name, nic_idx))
790 raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
791 "\n".join(nic_errors), errors.ECODE_INVAL)
793 # hypervisor list/parameters
794 self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
796 for hv_name, hv_dict in self.op.hvparams.items():
797 if hv_name not in self.new_hvparams:
798 self.new_hvparams[hv_name] = hv_dict
800 self.new_hvparams[hv_name].update(hv_dict)
802 # disk template parameters
803 self.new_diskparams = objects.FillDict(cluster.diskparams, {})
804 if self.op.diskparams:
805 for dt_name, dt_params in self.op.diskparams.items():
if dt_name not in self.new_diskparams:
807 self.new_diskparams[dt_name] = dt_params
809 self.new_diskparams[dt_name].update(dt_params)
811 # os hypervisor parameters
812 self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
814 for os_name, hvs in self.op.os_hvp.items():
815 if os_name not in self.new_os_hvp:
816 self.new_os_hvp[os_name] = hvs
818 for hv_name, hv_dict in hvs.items():
820 # Delete if it exists
821 self.new_os_hvp[os_name].pop(hv_name, None)
822 elif hv_name not in self.new_os_hvp[os_name]:
823 self.new_os_hvp[os_name][hv_name] = hv_dict
825 self.new_os_hvp[os_name][hv_name].update(hv_dict)
828 self.new_osp = objects.FillDict(cluster.osparams, {})
830 for os_name, osp in self.op.osparams.items():
831 if os_name not in self.new_osp:
832 self.new_osp[os_name] = {}
834 self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
837 if not self.new_osp[os_name]:
838 # we removed all parameters
839 del self.new_osp[os_name]
841 # check the parameter validity (remote check)
842 CheckOSParams(self, False, [self.cfg.GetMasterNode()],
843 os_name, self.new_osp[os_name])
845 # changes to the hypervisor list
846 if self.op.enabled_hypervisors is not None:
847 self.hv_list = self.op.enabled_hypervisors
848 for hv in self.hv_list:
849 # if the hypervisor doesn't already exist in the cluster
850 # hvparams, we initialize it to empty, and then (in both
851 # cases) we make sure to fill the defaults, as we might not
852 # have a complete defaults list if the hypervisor wasn't
854 if hv not in new_hvp:
856 new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
857 utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
859 self.hv_list = cluster.enabled_hypervisors
861 if self.op.hvparams or self.op.enabled_hypervisors is not None:
862 # either the enabled list has changed, or the parameters have, validate
863 for hv_name, hv_params in self.new_hvparams.items():
864 if ((self.op.hvparams and hv_name in self.op.hvparams) or
865 (self.op.enabled_hypervisors and
866 hv_name in self.op.enabled_hypervisors)):
867 # either this is a new hypervisor, or its parameters have changed
868 hv_class = hypervisor.GetHypervisorClass(hv_name)
869 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
870 hv_class.CheckParameterSyntax(hv_params)
871 CheckHVParams(self, node_list, hv_name, hv_params)
873 self._CheckDiskTemplateConsistency()
876 # no need to check any newly-enabled hypervisors, since the
877 # defaults have already been checked in the above code-block
878 for os_name, os_hvp in self.new_os_hvp.items():
879 for hv_name, hv_params in os_hvp.items():
880 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
881 # we need to fill in the new os_hvp on top of the actual hv_p
882 cluster_defaults = self.new_hvparams.get(hv_name, {})
883 new_osp = objects.FillDict(cluster_defaults, hv_params)
884 hv_class = hypervisor.GetHypervisorClass(hv_name)
885 hv_class.CheckParameterSyntax(new_osp)
886 CheckHVParams(self, node_list, hv_name, new_osp)
888 if self.op.default_iallocator:
889 alloc_script = utils.FindFile(self.op.default_iallocator,
890 constants.IALLOCATOR_SEARCH_PATH,
892 if alloc_script is None:
893 raise errors.OpPrereqError("Invalid default iallocator script '%s'"
894 " specified" % self.op.default_iallocator,
897 def _CheckDiskTemplateConsistency(self):
898 """Check whether the disk templates that are going to be disabled
899 are still in use by some instances.
902 if self.op.enabled_disk_templates:
903 cluster = self.cfg.GetClusterInfo()
904 instances = self.cfg.GetAllInstancesInfo()
906 disk_templates_to_remove = set(cluster.enabled_disk_templates) \
907 - set(self.op.enabled_disk_templates)
908 for instance in instances.itervalues():
909 if instance.disk_template in disk_templates_to_remove:
910 raise errors.OpPrereqError("Cannot disable disk template '%s',"
911 " because instance '%s' is using it." %
912 (instance.disk_template, instance.name))
914 def Exec(self, feedback_fn):
915 """Change the parameters of the cluster.
918 if self.op.vg_name is not None:
919 new_volume = self.op.vg_name
922 if new_volume != self.cfg.GetVGName():
923 self.cfg.SetVGName(new_volume)
925 feedback_fn("Cluster LVM configuration already in desired"
926 " state, not changing")
927 if self.op.drbd_helper is not None:
928 new_helper = self.op.drbd_helper
931 if new_helper != self.cfg.GetDRBDHelper():
932 self.cfg.SetDRBDHelper(new_helper)
934 feedback_fn("Cluster DRBD helper already in desired state,"
937 self.cluster.hvparams = self.new_hvparams
939 self.cluster.os_hvp = self.new_os_hvp
940 if self.op.enabled_hypervisors is not None:
941 self.cluster.hvparams = self.new_hvparams
942 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
943 if self.op.enabled_disk_templates:
944 self.cluster.enabled_disk_templates = \
945 list(set(self.op.enabled_disk_templates))
947 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
948 if self.op.nicparams:
949 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
951 self.cluster.ipolicy = self.new_ipolicy
953 self.cluster.osparams = self.new_osp
955 self.cluster.ndparams = self.new_ndparams
956 if self.op.diskparams:
957 self.cluster.diskparams = self.new_diskparams
959 self.cluster.hv_state_static = self.new_hv_state
960 if self.op.disk_state:
961 self.cluster.disk_state_static = self.new_disk_state
963 if self.op.candidate_pool_size is not None:
964 self.cluster.candidate_pool_size = self.op.candidate_pool_size
965 # we need to update the pool size here, otherwise the save will fail
966 AdjustCandidatePool(self, [])
968 if self.op.maintain_node_health is not None:
969 if self.op.maintain_node_health and not constants.ENABLE_CONFD:
970 feedback_fn("Note: CONFD was disabled at build time, node health"
971 " maintenance is not useful (still enabling it)")
972 self.cluster.maintain_node_health = self.op.maintain_node_health
974 if self.op.modify_etc_hosts is not None:
975 self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
977 if self.op.prealloc_wipe_disks is not None:
978 self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
980 if self.op.add_uids is not None:
981 uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
983 if self.op.remove_uids is not None:
984 uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
986 if self.op.uid_pool is not None:
987 self.cluster.uid_pool = self.op.uid_pool
989 if self.op.default_iallocator is not None:
990 self.cluster.default_iallocator = self.op.default_iallocator
992 if self.op.reserved_lvs is not None:
993 self.cluster.reserved_lvs = self.op.reserved_lvs
995 if self.op.use_external_mip_script is not None:
996 self.cluster.use_external_mip_script = self.op.use_external_mip_script
998 def helper_os(aname, mods, desc):
1000 lst = getattr(self.cluster, aname)
1001 for key, val in mods:
1002 if key == constants.DDM_ADD:
1004 feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1007 elif key == constants.DDM_REMOVE:
1011 feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1013 raise errors.ProgrammerError("Invalid modification '%s'" % key)
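# helper_os applies DDM_ADD/DDM_REMOVE modifications to an OS status list
# (hidden_os or blacklisted_os), warning instead of failing when an OS is
# already present or not found.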
1015 if self.op.hidden_os:
1016 helper_os("hidden_os", self.op.hidden_os, "hidden")
1018 if self.op.blacklisted_os:
1019 helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1021 if self.op.master_netdev:
1022 master_params = self.cfg.GetMasterNetworkParameters()
1023 ems = self.cfg.GetUseExternalMipScript()
1024 feedback_fn("Shutting down master ip on the current netdev (%s)" %
1025 self.cluster.master_netdev)
1026 result = self.rpc.call_node_deactivate_master_ip(master_params.name,
1028 if not self.op.force:
1029 result.Raise("Could not disable the master ip")
1032 msg = ("Could not disable the master ip (continuing anyway): %s" %
1035 feedback_fn("Changing master_netdev from %s to %s" %
1036 (master_params.netdev, self.op.master_netdev))
1037 self.cluster.master_netdev = self.op.master_netdev
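# The master IP was taken down on the old netdev above; it is re-activated
# on the new device further below, after the configuration update.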
1039 if self.op.master_netmask:
1040 master_params = self.cfg.GetMasterNetworkParameters()
1041 feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1042 result = self.rpc.call_node_change_master_netmask(master_params.name,
1043 master_params.netmask,
1044 self.op.master_netmask,
1046 master_params.netdev)
1048 msg = "Could not change the master IP netmask: %s" % result.fail_msg
1051 self.cluster.master_netmask = self.op.master_netmask
1053 self.cfg.Update(self.cluster, feedback_fn)
1055 if self.op.master_netdev:
1056 master_params = self.cfg.GetMasterNetworkParameters()
1057 feedback_fn("Starting the master ip on the new master netdev (%s)" %
1058 self.op.master_netdev)
1059 ems = self.cfg.GetUseExternalMipScript()
1060 result = self.rpc.call_node_activate_master_ip(master_params.name,
1063 self.LogWarning("Could not re-enable the master ip on"
1064 " the master, please restart manually: %s",
1068 class LUClusterVerify(NoHooksLU):
1069 """Submits all jobs necessary to verify the cluster.
1074 def ExpandNames(self):
1075 self.needed_locks = {}
1077 def Exec(self, feedback_fn):
1080 if self.op.group_name:
1081 groups = [self.op.group_name]
1082 depends_fn = lambda: None
1084 groups = self.cfg.GetNodeGroupList()
1086 # Verify global configuration
1088 opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1091 # Always depend on global verification
1092 depends_fn = lambda: [(-len(jobs), [])]
1095 [opcodes.OpClusterVerifyGroup(group_name=group,
1096 ignore_errors=self.op.ignore_errors,
1097 depends=depends_fn())]
1098 for group in groups)
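# 'jobs' now holds one config-verification job plus one group-verification
# job per node group; the latter depend on the former via depends_fn.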
1100 # Fix up all parameters
1101 for op in itertools.chain(*jobs): # pylint: disable=W0142
1102 op.debug_simulate_errors = self.op.debug_simulate_errors
1103 op.verbose = self.op.verbose
1104 op.error_codes = self.op.error_codes
1106 op.skip_checks = self.op.skip_checks
1107 except AttributeError:
1108 assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1110 return ResultWithJobs(jobs)
1113 class _VerifyErrors(object):
1114 """Mix-in for cluster/group verify LUs.
1116 It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1117 self.op and self._feedback_fn to be available.)
1121 ETYPE_FIELD = "code"
1122 ETYPE_ERROR = "ERROR"
1123 ETYPE_WARNING = "WARNING"
1125 def _Error(self, ecode, item, msg, *args, **kwargs):
1126 """Format an error message.
1128 Based on the opcode's error_codes parameter, either format a
1129 parseable error code, or a simpler error string.
1131 This must be called only from Exec and functions called from Exec.
1134 ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1135 itype, etxt, _ = ecode
# If the error code is in the list of ignored errors, demote the error to a warning
1138 if etxt in self.op.ignore_errors: # pylint: disable=E1101
1139 ltype = self.ETYPE_WARNING
1140 # first complete the msg
1143 # then format the whole message
1144 if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1145 msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1151 msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1152 # and finally report it via the feedback_fn
1153 self._feedback_fn(" - %s" % msg) # Mix-in. pylint: disable=E1101
1154 # do not mark the operation as failed for WARN cases only
1155 if ltype == self.ETYPE_ERROR:
1158 def _ErrorIf(self, cond, *args, **kwargs):
1159 """Log an error message if the passed condition is True.
1163 or self.op.debug_simulate_errors): # pylint: disable=E1101
1164 self._Error(*args, **kwargs)
1167 def _VerifyCertificate(filename):
1168 """Verifies a certificate for L{LUClusterVerifyConfig}.
1170 @type filename: string
1171 @param filename: Path to PEM file
1175 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1176 utils.ReadFile(filename))
1177 except Exception, err: # pylint: disable=W0703
1178 return (LUClusterVerifyConfig.ETYPE_ERROR,
1179 "Failed to load X509 certificate %s: %s" % (filename, err))
1182 utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1183 constants.SSL_CERT_EXPIRATION_ERROR)
1186 fnamemsg = "While verifying %s: %s" % (filename, msg)
1191 return (None, fnamemsg)
1192 elif errcode == utils.CERT_WARNING:
1193 return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1194 elif errcode == utils.CERT_ERROR:
1195 return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1197 raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1200 def _GetAllHypervisorParameters(cluster, instances):
1201 """Compute the set of all hypervisor parameters.
1203 @type cluster: L{objects.Cluster}
1204 @param cluster: the cluster object
@type instances: list of L{objects.Instance}
1206 @param instances: additional instances from which to obtain parameters
1207 @rtype: list of (origin, hypervisor, parameters)
1208 @return: a list with all parameters found, indicating the hypervisor they
1209 apply to, and the origin (can be "cluster", "os X", or "instance Y")
1214 for hv_name in cluster.enabled_hypervisors:
1215 hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1217 for os_name, os_hvp in cluster.os_hvp.items():
1218 for hv_name, hv_params in os_hvp.items():
1220 full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1221 hvp_data.append(("os %s" % os_name, hv_name, full_params))
1223 # TODO: collapse identical parameter values in a single one
1224 for instance in instances:
1225 if instance.hvparams:
1226 hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1227 cluster.FillHV(instance)))
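# hvp_data now lists (origin, hypervisor, parameters) triples covering the
# cluster defaults, the per-OS overrides and the per-instance settings, and
# is returned as described in the docstring above.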
1232 class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1233 """Verifies the cluster config.
1238 def _VerifyHVP(self, hvp_data):
1239 """Verifies locally the syntax of the hypervisor parameters.
1242 for item, hv_name, hv_params in hvp_data:
1243 msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1246 hv_class = hypervisor.GetHypervisorClass(hv_name)
1247 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1248 hv_class.CheckParameterSyntax(hv_params)
1249 except errors.GenericError, err:
1250 self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1252 def ExpandNames(self):
1253 self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1254 self.share_locks = ShareAll()
1256 def CheckPrereq(self):
1257 """Check prerequisites.
1260 # Retrieve all information
1261 self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1262 self.all_node_info = self.cfg.GetAllNodesInfo()
1263 self.all_inst_info = self.cfg.GetAllInstancesInfo()
1265 def Exec(self, feedback_fn):
1266 """Verify integrity of cluster, performing various test on nodes.
1270 self._feedback_fn = feedback_fn
1272 feedback_fn("* Verifying cluster config")
1274 for msg in self.cfg.VerifyConfig():
1275 self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1277 feedback_fn("* Verifying cluster certificate files")
1279 for cert_filename in pathutils.ALL_CERT_FILES:
1280 (errcode, msg) = _VerifyCertificate(cert_filename)
1281 self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1283 self._ErrorIf(not utils.CanRead(constants.CONFD_USER,
1284 pathutils.NODED_CERT_FILE),
1285 constants.CV_ECLUSTERCERT,
1287 pathutils.NODED_CERT_FILE + " must be accessible by the " +
1288 constants.CONFD_USER + " user")
1290 feedback_fn("* Verifying hypervisor parameters")
1292 self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1293 self.all_inst_info.values()))
1295 feedback_fn("* Verifying all nodes belong to an existing group")
1297 # We do this verification here because, should this bogus circumstance
1298 # occur, it would never be caught by VerifyGroup, which only acts on
1299 # nodes/instances reachable from existing node groups.
1301 dangling_nodes = set(node.name for node in self.all_node_info.values()
1302 if node.group not in self.all_group_info)
1304 dangling_instances = {}
1305 no_node_instances = []
1307 for inst in self.all_inst_info.values():
1308 if inst.primary_node in dangling_nodes:
1309 dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1310 elif inst.primary_node not in self.all_node_info:
1311 no_node_instances.append(inst.name)
1316 utils.CommaJoin(dangling_instances.get(node.name,
1318 for node in dangling_nodes]
1320 self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1322 "the following nodes (and their instances) belong to a non"
1323 " existing group: %s", utils.CommaJoin(pretty_dangling))
1325 self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1327 "the following instances have a non-existing primary-node:"
1328 " %s", utils.CommaJoin(no_node_instances))
1333 class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1334 """Verifies the status of a node group.
1337 HPATH = "cluster-verify"
1338 HTYPE = constants.HTYPE_CLUSTER
1341 _HOOKS_INDENT_RE = re.compile("^", re.M)
1343 class NodeImage(object):
1344 """A class representing the logical and physical status of a node.
1347 @ivar name: the node name to which this object refers
1348 @ivar volumes: a structure as returned from
1349 L{ganeti.backend.GetVolumeList} (runtime)
1350 @ivar instances: a list of running instances (runtime)
1351 @ivar pinst: list of configured primary instances (config)
1352 @ivar sinst: list of configured secondary instances (config)
1353 @ivar sbp: dictionary of {primary-node: list of instances} for all
1354 instances for which this node is secondary (config)
1355 @ivar mfree: free memory, as reported by hypervisor (runtime)
1356 @ivar dfree: free disk, as reported by the node (runtime)
1357 @ivar offline: the offline status (config)
1358 @type rpc_fail: boolean
@ivar rpc_fail: whether the RPC verify call was successful (overall,
1360 not whether the individual keys were correct) (runtime)
1361 @type lvm_fail: boolean
1362 @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1363 @type hyp_fail: boolean
1364 @ivar hyp_fail: whether the RPC call didn't return the instance list
1365 @type ghost: boolean
1366 @ivar ghost: whether this is a known node or not (config)
1367 @type os_fail: boolean
1368 @ivar os_fail: whether the RPC call didn't return valid OS data
1370 @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1371 @type vm_capable: boolean
1372 @ivar vm_capable: whether the node can host instances
1374 @ivar pv_min: size in MiB of the smallest PVs
1376 @ivar pv_max: size in MiB of the biggest PVs
1379 def __init__(self, offline=False, name=None, vm_capable=True):
1388 self.offline = offline
1389 self.vm_capable = vm_capable
1390 self.rpc_fail = False
1391 self.lvm_fail = False
1392 self.hyp_fail = False
1394 self.os_fail = False
1399 def ExpandNames(self):
1400 # This raises errors.OpPrereqError on its own:
1401 self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1403 # Get instances in node group; this is unsafe and needs verification later
1405 self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1407 self.needed_locks = {
1408 locking.LEVEL_INSTANCE: inst_names,
1409 locking.LEVEL_NODEGROUP: [self.group_uuid],
1410 locking.LEVEL_NODE: [],
1412 # This opcode is run by watcher every five minutes and acquires all nodes
1413 # for a group. It doesn't run for a long time, so it's better to acquire
1414 # the node allocation lock as well.
1415 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1418 self.share_locks = ShareAll()
1420 def DeclareLocks(self, level):
1421 if level == locking.LEVEL_NODE:
1422 # Get members of node group; this is unsafe and needs verification later
1423 nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1425 all_inst_info = self.cfg.GetAllInstancesInfo()
1427 # In Exec(), we warn about mirrored instances that have primary and
1428 # secondary living in separate node groups. To fully verify that
1429 # volumes for these instances are healthy, we will need to do an
1430 # extra call to their secondaries. We ensure here those nodes will
1432 for inst in self.owned_locks(locking.LEVEL_INSTANCE):
1433 # Important: access only the instances whose lock is owned
1434 if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
1435 nodes.update(all_inst_info[inst].secondary_nodes)
1437 self.needed_locks[locking.LEVEL_NODE] = nodes
1439 def CheckPrereq(self):
1440 assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1441 self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1443 group_nodes = set(self.group_info.members)
1445 self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1448 group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1450 unlocked_instances = \
1451 group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))
1454 raise errors.OpPrereqError("Missing lock for nodes: %s" %
1455 utils.CommaJoin(unlocked_nodes),
1458 if unlocked_instances:
1459 raise errors.OpPrereqError("Missing lock for instances: %s" %
1460 utils.CommaJoin(unlocked_instances),
1463 self.all_node_info = self.cfg.GetAllNodesInfo()
1464 self.all_inst_info = self.cfg.GetAllInstancesInfo()
1466 self.my_node_names = utils.NiceSort(group_nodes)
1467 self.my_inst_names = utils.NiceSort(group_instances)
1469 self.my_node_info = dict((name, self.all_node_info[name])
1470 for name in self.my_node_names)
1472 self.my_inst_info = dict((name, self.all_inst_info[name])
1473 for name in self.my_inst_names)
1475 # We detect here the nodes that will need the extra RPC calls for verifying
1476 # split LV volumes; they should be locked.
1477 extra_lv_nodes = set()
1479 for inst in self.my_inst_info.values():
1480 if inst.disk_template in constants.DTS_INT_MIRROR:
1481 for nname in inst.all_nodes:
1482 if self.all_node_info[nname].group != self.group_uuid:
1483 extra_lv_nodes.add(nname)
1485 unlocked_lv_nodes = \
1486 extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1488 if unlocked_lv_nodes:
1489 raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1490 utils.CommaJoin(unlocked_lv_nodes),
1492 self.extra_lv_nodes = list(extra_lv_nodes)
1494 def _VerifyNode(self, ninfo, nresult):
1495 """Perform some basic validation on data returned from a node.
1497 - check the result data structure is well formed and has all the
1499 - check ganeti version
1501 @type ninfo: L{objects.Node}
1502 @param ninfo: the node to check
1503 @param nresult: the results from the node
1505 @return: whether overall this call was successful (and we can expect
reasonable values in the response)
1510 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1512 # main result, nresult should be a non-empty dict
1513 test = not nresult or not isinstance(nresult, dict)
1514 _ErrorIf(test, constants.CV_ENODERPC, node,
1515 "unable to verify node: no data returned")
1519 # compares ganeti version
1520 local_version = constants.PROTOCOL_VERSION
1521 remote_version = nresult.get("version", None)
1522 test = not (remote_version and
1523 isinstance(remote_version, (list, tuple)) and
1524 len(remote_version) == 2)
1525 _ErrorIf(test, constants.CV_ENODERPC, node,
1526 "connection to node returned invalid data")
1530 test = local_version != remote_version[0]
1531 _ErrorIf(test, constants.CV_ENODEVERSION, node,
1532 "incompatible protocol versions: master %s,"
1533 " node %s", local_version, remote_version[0])
1537 # node seems compatible, we can actually try to look into its results
1539 # full package version
1540 self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1541 constants.CV_ENODEVERSION, node,
1542 "software version mismatch: master %s, node %s",
1543 constants.RELEASE_VERSION, remote_version[1],
1544 code=self.ETYPE_WARNING)
1546 hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1547 if ninfo.vm_capable and isinstance(hyp_result, dict):
1548 for hv_name, hv_result in hyp_result.iteritems():
1549 test = hv_result is not None
1550 _ErrorIf(test, constants.CV_ENODEHV, node,
1551 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1553 hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1554 if ninfo.vm_capable and isinstance(hvp_result, list):
1555 for item, hv_name, hv_result in hvp_result:
1556 _ErrorIf(True, constants.CV_ENODEHV, node,
1557 "hypervisor %s parameter verify failure (source %s): %s",
1558 hv_name, item, hv_result)
1560 test = nresult.get(constants.NV_NODESETUP,
1561 ["Missing NODESETUP results"])
1562 _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
1567 def _VerifyNodeTime(self, ninfo, nresult,
1568 nvinfo_starttime, nvinfo_endtime):
1569 """Check the node time.
1571 @type ninfo: L{objects.Node}
1572 @param ninfo: the node to check
1573 @param nresult: the remote results for the node
1574 @param nvinfo_starttime: the start time of the RPC call
1575 @param nvinfo_endtime: the end time of the RPC call
1579 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1581 ntime = nresult.get(constants.NV_TIME, None)
1583 ntime_merged = utils.MergeTime(ntime)
1584 except (ValueError, TypeError):
1585 _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
1588 if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1589 ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1590 elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1591 ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1595 _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
1596 "Node time diverges by at least %s from master node time",
1599 def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1600 """Check the node LVM results and update info for cross-node checks.
1602 @type ninfo: L{objects.Node}
1603 @param ninfo: the node to check
1604 @param nresult: the remote results for the node
1605 @param vg_name: the configured VG name
1606 @type nimg: L{NodeImage}
1607 @param nimg: node image
1614 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1616 # checks vg existence and size > 20G
1617 vglist = nresult.get(constants.NV_VGLIST, None)
1619 _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
1621 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1622 constants.MIN_VG_SIZE)
1623 _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)
1626 (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1628 self._Error(constants.CV_ENODELVM, node, em)
1629 if pvminmax is not None:
1630 (nimg.pv_min, nimg.pv_max) = pvminmax
1632 def _VerifyGroupLVM(self, node_image, vg_name):
1633 """Check cross-node consistency in LVM.
1635 @type node_image: dict
1636 @param node_image: info about nodes, mapping from node to names to
1637 L{NodeImage} objects
1638 @param vg_name: the configured VG name
# Only exclusive storage needs this kind of check
1645 if not self._exclusive_storage:
1648 # exclusive_storage wants all PVs to have the same size (approximately),
1649 # if the smallest and the biggest ones are okay, everything is fine.
1650 # pv_min is None iff pv_max is None
1651 vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1654 (pvmin, minnode) = min((ni.pv_min, ni.name) for ni in vals)
1655 (pvmax, maxnode) = max((ni.pv_max, ni.name) for ni in vals)
1656 bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1657 self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1658 "PV sizes differ too much in the group; smallest (%s MB) is"
1659 " on %s, biggest (%s MB) is on %s",
1660 pvmin, minnode, pvmax, maxnode)
1662 def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1663 """Check the node bridges.
1665 @type ninfo: L{objects.Node}
1666 @param ninfo: the node to check
1667 @param nresult: the remote results for the node
1668 @param bridges: the expected list of bridges
1675 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1677 missing = nresult.get(constants.NV_BRIDGES, None)
1678 test = not isinstance(missing, list)
1679 _ErrorIf(test, constants.CV_ENODENET, node,
1680 "did not return valid bridge information")
1682 _ErrorIf(bool(missing), constants.CV_ENODENET, node,
1683 "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1685 def _VerifyNodeUserScripts(self, ninfo, nresult):
1686 """Check the results of user scripts presence and executability on the node
1688 @type ninfo: L{objects.Node}
1689 @param ninfo: the node to check
1690 @param nresult: the remote results for the node
test = constants.NV_USERSCRIPTS not in nresult
1696 self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
1697 "did not return user scripts information")
1699 broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1701 self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
1702 "user scripts not present or not executable: %s" %
1703 utils.CommaJoin(sorted(broken_scripts)))
1705 def _VerifyNodeNetwork(self, ninfo, nresult):
1706 """Check the node network connectivity results.
1708 @type ninfo: L{objects.Node}
1709 @param ninfo: the node to check
1710 @param nresult: the remote results for the node
1714 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1716 test = constants.NV_NODELIST not in nresult
1717 _ErrorIf(test, constants.CV_ENODESSH, node,
1718 "node hasn't returned node ssh connectivity data")
1720 if nresult[constants.NV_NODELIST]:
1721 for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1722 _ErrorIf(True, constants.CV_ENODESSH, node,
1723 "ssh communication with node '%s': %s", a_node, a_msg)
1725 test = constants.NV_NODENETTEST not in nresult
1726 _ErrorIf(test, constants.CV_ENODENET, node,
1727 "node hasn't returned node tcp connectivity data")
1729 if nresult[constants.NV_NODENETTEST]:
1730 nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1732 _ErrorIf(True, constants.CV_ENODENET, node,
1733 "tcp communication with node '%s': %s",
1734 anode, nresult[constants.NV_NODENETTEST][anode])
1736 test = constants.NV_MASTERIP not in nresult
1737 _ErrorIf(test, constants.CV_ENODENET, node,
1738 "node hasn't returned node master IP reachability data")
1740 if not nresult[constants.NV_MASTERIP]:
1741 if node == self.master_node:
1742 msg = "the master node cannot reach the master IP (not configured?)"
1744 msg = "cannot reach the master IP"
1745 _ErrorIf(True, constants.CV_ENODENET, node, msg)
1747 def _VerifyInstance(self, instance, inst_config, node_image,
1749 """Verify an instance.
1751 This function checks to see if the required block devices are
1752 available on the instance's node, and that the nodes are in the correct
1756 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1757 pnode = inst_config.primary_node
1758 pnode_img = node_image[pnode]
1759 groupinfo = self.cfg.GetAllNodeGroupsInfo()
1761 node_vol_should = {}
1762 inst_config.MapLVsByNode(node_vol_should)
1764 cluster = self.cfg.GetClusterInfo()
1765 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1767 err = ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
1768 _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err),
1769 code=self.ETYPE_WARNING)
1771 for node in node_vol_should:
1772 n_img = node_image[node]
1773 if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1774 # ignore missing volumes on offline or broken nodes
1776 for volume in node_vol_should[node]:
1777 test = volume not in n_img.volumes
1778 _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
1779 "volume %s missing on node %s", volume, node)
1781 if inst_config.admin_state == constants.ADMINST_UP:
1782 test = instance not in pnode_img.instances and not pnode_img.offline
1783 _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
1784 "instance not running on its primary node %s",
1786 _ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
1787 "instance is marked as running and lives on offline node %s",
1790 diskdata = [(nname, success, status, idx)
1791 for (nname, disks) in diskstatus.items()
1792 for idx, (success, status) in enumerate(disks)]
1794 for nname, success, bdev_status, idx in diskdata:
1795 # the 'ghost node' construction in Exec() ensures that we have a
1797 snode = node_image[nname]
1798 bad_snode = snode.ghost or snode.offline
1799 _ErrorIf(inst_config.disks_active and
1800 not success and not bad_snode,
1801 constants.CV_EINSTANCEFAULTYDISK, instance,
1802 "couldn't retrieve status for disk/%s on %s: %s",
1803 idx, nname, bdev_status)
1804 _ErrorIf((inst_config.disks_active and
1805 success and bdev_status.ldisk_status == constants.LDS_FAULTY),
1806 constants.CV_EINSTANCEFAULTYDISK, instance,
1807 "disk/%s on %s is faulty", idx, nname)
1809 _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1810 constants.CV_ENODERPC, pnode, "instance %s, connection to"
1811 " primary node failed", instance)
1813 _ErrorIf(len(inst_config.secondary_nodes) > 1,
1814 constants.CV_EINSTANCELAYOUT,
1815 instance, "instance has multiple secondary nodes: %s",
1816 utils.CommaJoin(inst_config.secondary_nodes),
1817 code=self.ETYPE_WARNING)
1819 if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
1820 # Disk template not compatible with exclusive_storage: no instance
1821 # node should have the flag set
1822 es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
1823 inst_config.all_nodes)
1824 es_nodes = [n for (n, es) in es_flags.items()
1826 _ErrorIf(es_nodes, constants.CV_EINSTANCEUNSUITABLENODE, instance,
1827 "instance has template %s, which is not supported on nodes"
1828 " that have exclusive storage set: %s",
1829 inst_config.disk_template, utils.CommaJoin(es_nodes))
1831 if inst_config.disk_template in constants.DTS_INT_MIRROR:
1832 instance_nodes = utils.NiceSort(inst_config.all_nodes)
1833 instance_groups = {}
1835 for node in instance_nodes:
1836 instance_groups.setdefault(self.all_node_info[node].group,
1840 "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
1841 # Sort so that we always list the primary node first.
1842 for group, nodes in sorted(instance_groups.items(),
1843 key=lambda (_, nodes): pnode in nodes,
1846 self._ErrorIf(len(instance_groups) > 1,
1847 constants.CV_EINSTANCESPLITGROUPS,
1848 instance, "instance has primary and secondary nodes in"
1849 " different groups: %s", utils.CommaJoin(pretty_list),
1850 code=self.ETYPE_WARNING)
1852 inst_nodes_offline = []
1853 for snode in inst_config.secondary_nodes:
1854 s_img = node_image[snode]
1855 _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
1856 snode, "instance %s, connection to secondary node failed",
1860 inst_nodes_offline.append(snode)
1862 # warn that the instance lives on offline nodes
1863 _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
1864 "instance has offline secondary node(s) %s",
1865 utils.CommaJoin(inst_nodes_offline))
1866 # ... or ghost/non-vm_capable nodes
1867 for node in inst_config.all_nodes:
1868 _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
1869 instance, "instance lives on ghost node %s", node)
1870 _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
1871 instance, "instance lives on non-vm_capable node %s", node)
1873 def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1874 """Verify if there are any unknown volumes in the cluster.
1876 The .os, .swap and backup volumes are ignored. All other volumes are
1877 reported as unknown.
1879 @type reserved: L{ganeti.utils.FieldSet}
1880 @param reserved: a FieldSet of reserved volume names
1883 for node, n_img in node_image.items():
1884 if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
1885 self.all_node_info[node].group != self.group_uuid):
1886 # skip non-healthy nodes
1887 continue
1888 for volume in n_img.volumes:
1889 test = ((node not in node_vol_should or
1890 volume not in node_vol_should[node]) and
1891 not reserved.Matches(volume))
1892 self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
1893 "volume %s is unknown", volume)
1895 def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1896 """Verify N+1 Memory Resilience.
1898 Check that if one single node dies we can still start all the
1899 instances it was primary for.
1902 cluster_info = self.cfg.GetClusterInfo()
1903 for node, n_img in node_image.items():
1904 # This code checks that every node which is now listed as
1905 # secondary has enough memory to host all instances it is
1906 # supposed to should a single other node in the cluster fail.
1907 # FIXME: not ready for failover to an arbitrary node
1908 # FIXME: does not support file-backed instances
1909 # WARNING: we currently take into account down instances as well
1910 # as up ones, considering that even if they're down someone
1911 # might want to start them even in the event of a node failure.
1912 if n_img.offline or self.all_node_info[node].group != self.group_uuid:
1913 # we're skipping nodes marked offline and nodes in other groups from
1914 # the N+1 warning, since most likely we don't have good memory
1915 # information from them; we already list instances living on such
1916 # nodes, and that's enough warning
1917 continue
1918 # TODO(dynmem): also consider ballooning out other instances
1919 for prinode, instances in n_img.sbp.items():
1920 needed_mem = 0
1921 for instance in instances:
1922 bep = cluster_info.FillBE(instance_cfg[instance])
1923 if bep[constants.BE_AUTO_BALANCE]:
1924 needed_mem += bep[constants.BE_MINMEM]
1925 test = n_img.mfree < needed_mem
1926 self._ErrorIf(test, constants.CV_ENODEN1, node,
1927 "not enough memory to accomodate instance failovers"
1928 " should node %s fail (%dMiB needed, %dMiB available)",
1929 prinode, needed_mem, n_img.mfree)
1931 @classmethod
1932 def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
1933 (files_all, files_opt, files_mc, files_vm)):
1934 """Verifies file checksums collected from all nodes.
1936 @param errorif: Callback for reporting errors
1937 @param nodeinfo: List of L{objects.Node} objects
1938 @param master_node: Name of master node
1939 @param all_nvinfo: RPC results
1942 # Define functions determining which nodes to consider for a file
1943 files2nodefn = [
1944 (files_all, None),
1945 (files_mc, lambda node: (node.master_candidate or
1946 node.name == master_node)),
1947 (files_vm, lambda node: node.vm_capable),
1948 ]
1950 # Build mapping from filename to list of nodes which should have the file
1951 nodefiles = {}
1952 for (files, fn) in files2nodefn:
1953 if fn is None:
1954 filenodes = nodeinfo
1955 else:
1956 filenodes = filter(fn, nodeinfo)
1957 nodefiles.update((filename,
1958 frozenset(map(operator.attrgetter("name"), filenodes)))
1959 for filename in files)
1961 assert set(nodefiles) == (files_all | files_mc | files_vm)
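# fileinfo maps each filename to a dict of checksum -> set of node names
# reporting that checksum; nodes that could not be queried are ignored.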
1963 fileinfo = dict((filename, {}) for filename in nodefiles)
1964 ignore_nodes = set()
1966 for node in nodeinfo:
1967 if node.offline:
1968 ignore_nodes.add(node.name)
1969 continue
1971 nresult = all_nvinfo[node.name]
1973 if nresult.fail_msg or not nresult.payload:
1974 node_files = None
1975 else:
1976 fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
1977 node_files = dict((vcluster.LocalizeVirtualPath(key), value)
1978 for (key, value) in fingerprints.items())
1981 test = not (node_files and isinstance(node_files, dict))
1982 errorif(test, constants.CV_ENODEFILECHECK, node.name,
1983 "Node did not return file checksum data")
1985 ignore_nodes.add(node.name)
1988 # Build per-checksum mapping from filename to nodes having it
1989 for (filename, checksum) in node_files.items():
1990 assert filename in nodefiles
1991 fileinfo[filename].setdefault(checksum, set()).add(node.name)
1993 for (filename, checksums) in fileinfo.items():
1994 assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
1996 # Nodes having the file
1997 with_file = frozenset(node_name
1998 for nodes in fileinfo[filename].values()
1999 for node_name in nodes) - ignore_nodes
2001 expected_nodes = nodefiles[filename] - ignore_nodes
2003 # Nodes missing file
2004 missing_file = expected_nodes - with_file
2006 if filename in files_opt:
2008 errorif(missing_file and missing_file != expected_nodes,
2009 constants.CV_ECLUSTERFILECHECK, None,
2010 "File %s is optional, but it must exist on all or no"
2011 " nodes (not found on %s)",
2012 filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2013 else:
2014 errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2015 "File %s is missing from node(s) %s", filename,
2016 utils.CommaJoin(utils.NiceSort(missing_file)))
2018 # Warn if a node has a file it shouldn't
2019 unexpected = with_file - expected_nodes
2020 errorif(unexpected,
2021 constants.CV_ECLUSTERFILECHECK, None,
2022 "File %s should not exist on node(s) %s",
2023 filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2025 # See if there are multiple versions of the file
2026 test = len(checksums) > 1
2027 if test:
2028 variants = ["variant %s on %s" %
2029 (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2030 for (idx, (checksum, nodes)) in
2031 enumerate(sorted(checksums.items()))]
2032 else:
2033 variants = []
2035 errorif(test, constants.CV_ECLUSTERFILECHECK, None,
2036 "File %s found with %s different checksums (%s)",
2037 filename, len(checksums), "; ".join(variants))
2039 def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2040 drbd_map):
2041 """Verifies the node DRBD status.
2043 @type ninfo: L{objects.Node}
2044 @param ninfo: the node to check
2045 @param nresult: the remote results for the node
2046 @param instanceinfo: the dict of instances
2047 @param drbd_helper: the configured DRBD usermode helper
2048 @param drbd_map: the DRBD map as returned by
2049 L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2051 """
2052 node = ninfo.name
2053 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2055 if drbd_helper:
2056 helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2057 test = (helper_result is None)
2058 _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2059 "no drbd usermode helper returned")
2061 status, payload = helper_result
2063 _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2064 "drbd usermode helper check unsuccessful: %s", payload)
2065 test = status and (payload != drbd_helper)
2066 _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2067 "wrong drbd usermode helper: %s", payload)
2069 # compute the DRBD minors
2070 node_drbd = {}
2071 for minor, instance in drbd_map[node].items():
2072 test = instance not in instanceinfo
2073 _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2074 "ghost instance '%s' in temporary DRBD map", instance)
2075 # ghost instance should not be running, but otherwise we
2076 # don't give double warnings (both ghost instance and
2077 # unallocated minor in use)
2078 if test:
2079 node_drbd[minor] = (instance, False)
2080 else:
2081 instance = instanceinfo[instance]
2082 node_drbd[minor] = (instance.name, instance.disks_active)
2084 # and now check them
2085 used_minors = nresult.get(constants.NV_DRBDLIST, [])
2086 test = not isinstance(used_minors, (tuple, list))
2087 _ErrorIf(test, constants.CV_ENODEDRBD, node,
2088 "cannot parse drbd status file: %s", str(used_minors))
2090 # we cannot check drbd status
2093 for minor, (iname, must_exist) in node_drbd.items():
2094 test = minor not in used_minors and must_exist
2095 _ErrorIf(test, constants.CV_ENODEDRBD, node,
2096 "drbd minor %d of instance %s is not active", minor, iname)
2097 for minor in used_minors:
2098 test = minor not in node_drbd
2099 _ErrorIf(test, constants.CV_ENODEDRBD, node,
2100 "unallocated drbd minor %d is in use", minor)
2102 def _UpdateNodeOS(self, ninfo, nresult, nimg):
2103 """Builds the node OS structures.
2105 @type ninfo: L{objects.Node}
2106 @param ninfo: the node to check
2107 @param nresult: the remote results for the node
2108 @param nimg: the node image object
2112 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2114 remote_os = nresult.get(constants.NV_OSLIST, None)
2115 test = (not isinstance(remote_os, list) or
2116 not compat.all(isinstance(v, list) and len(v) == 7
2117 for v in remote_os))
2119 _ErrorIf(test, constants.CV_ENODEOS, node,
2120 "node hasn't returned valid OS data")
2129 for (name, os_path, status, diagnose,
2130 variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2132 if name not in os_dict:
2133 os_dict[name] = []
2135 # parameters is a list of lists instead of list of tuples due to
2136 # JSON lacking a real tuple type, fix it:
2137 parameters = [tuple(v) for v in parameters]
2138 os_dict[name].append((os_path, status, diagnose,
2139 set(variants), set(parameters), set(api_ver)))
2141 nimg.oslist = os_dict
2143 def _VerifyNodeOS(self, ninfo, nimg, base):
2144 """Verifies the node OS list.
2146 @type ninfo: L{objects.Node}
2147 @param ninfo: the node to check
2148 @param nimg: the node image object
2149 @param base: the 'template' node we match against (e.g. from the master)
2153 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2155 assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2157 beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
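# Only the first entry recorded for each OS name is compared against the
# reference node; any additional entries are reported as shadowed below.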
2158 for os_name, os_data in nimg.oslist.items():
2159 assert os_data, "Empty OS status for OS %s?!" % os_name
2160 f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2161 _ErrorIf(not f_status, constants.CV_ENODEOS, node,
2162 "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2163 _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
2164 "OS '%s' has multiple entries (first one shadows the rest): %s",
2165 os_name, utils.CommaJoin([v[0] for v in os_data]))
2166 # comparisons with the 'base' image
2167 test = os_name not in base.oslist
2168 _ErrorIf(test, constants.CV_ENODEOS, node,
2169 "Extra OS %s not present on reference node (%s)",
2173 assert base.oslist[os_name], "Base node has empty OS status?"
2174 _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2176 # base OS is invalid, skipping
2178 for kind, a, b in [("API version", f_api, b_api),
2179 ("variants list", f_var, b_var),
2180 ("parameters", beautify_params(f_param),
2181 beautify_params(b_param))]:
2182 _ErrorIf(a != b, constants.CV_ENODEOS, node,
2183 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2184 kind, os_name, base.name,
2185 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2187 # check any missing OSes
2188 missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2189 _ErrorIf(missing, constants.CV_ENODEOS, node,
2190 "OSes present on reference node %s but missing on this node: %s",
2191 base.name, utils.CommaJoin(missing))
2193 def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
2194 """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2196 @type ninfo: L{objects.Node}
2197 @param ninfo: the node to check
2198 @param nresult: the remote results for the node
2199 @type is_master: bool
2200 @param is_master: Whether node is the master node
2202 """
2203 node = ninfo.name
2205 if (is_master and
2206 (constants.ENABLE_FILE_STORAGE or
2207 constants.ENABLE_SHARED_FILE_STORAGE)):
2208 try:
2209 fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
2210 except KeyError:
2211 # This should never happen
2212 self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
2213 "Node did not return forbidden file storage paths")
2214 else:
2215 self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
2216 "Found forbidden file storage paths: %s",
2217 utils.CommaJoin(fspaths))
2218 else:
2219 self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
2220 constants.CV_ENODEFILESTORAGEPATHS, node,
2221 "Node should not have returned forbidden file storage"
2222 " paths")
2224 def _VerifyOob(self, ninfo, nresult):
2225 """Verifies out of band functionality of a node.
2227 @type ninfo: L{objects.Node}
2228 @param ninfo: the node to check
2229 @param nresult: the remote results for the node
2233 # We just have to verify the paths on master and/or master candidates
2234 # as the oob helper is invoked on the master
2235 if ((ninfo.master_candidate or ninfo.master_capable) and
2236 constants.NV_OOB_PATHS in nresult):
2237 for path_result in nresult[constants.NV_OOB_PATHS]:
2238 self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2240 def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2241 """Verifies and updates the node volume data.
2243 This function will update a L{NodeImage}'s internal structures
2244 with data from the remote call.
2246 @type ninfo: L{objects.Node}
2247 @param ninfo: the node to check
2248 @param nresult: the remote results for the node
2249 @param nimg: the node image object
2250 @param vg_name: the configured VG name
2254 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2256 nimg.lvm_fail = True
2257 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2258 if vg_name is None:
2259 pass
2260 elif isinstance(lvdata, basestring):
2261 _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
2262 utils.SafeEncode(lvdata))
2263 elif not isinstance(lvdata, dict):
2264 _ErrorIf(True, constants.CV_ENODELVM, node,
2265 "rpc call to node failed (lvlist)")
2267 nimg.volumes = lvdata
2268 nimg.lvm_fail = False
2270 def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2271 """Verifies and updates the node instance list.
2273 If the listing was successful, then updates this node's instance
2274 list. Otherwise, it marks the RPC call as failed for the instance
2275 list key.
2277 @type ninfo: L{objects.Node}
2278 @param ninfo: the node to check
2279 @param nresult: the remote results for the node
2280 @param nimg: the node image object
2283 idata = nresult.get(constants.NV_INSTANCELIST, None)
2284 test = not isinstance(idata, list)
2285 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2286 "rpc call to node failed (instancelist): %s",
2287 utils.SafeEncode(str(idata)))
2288 if test:
2289 nimg.hyp_fail = True
2290 else:
2291 nimg.instances = idata
2293 def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2294 """Verifies and computes a node information map
2296 @type ninfo: L{objects.Node}
2297 @param ninfo: the node to check
2298 @param nresult: the remote results for the node
2299 @param nimg: the node image object
2300 @param vg_name: the configured VG name
2304 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2306 # try to read free memory (from the hypervisor)
2307 hv_info = nresult.get(constants.NV_HVINFO, None)
2308 test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2309 _ErrorIf(test, constants.CV_ENODEHV, node,
2310 "rpc call to node failed (hvinfo)")
2313 nimg.mfree = int(hv_info["memory_free"])
2314 except (ValueError, TypeError):
2315 _ErrorIf(True, constants.CV_ENODERPC, node,
2316 "node returned invalid nodeinfo, check hypervisor")
2318 # FIXME: devise a free space model for file based instances as well
2319 if vg_name is not None:
2320 test = (constants.NV_VGLIST not in nresult or
2321 vg_name not in nresult[constants.NV_VGLIST])
2322 _ErrorIf(test, constants.CV_ENODELVM, node,
2323 "node didn't return data for the volume group '%s'"
2324 " - it is either missing or broken", vg_name)
2327 nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2328 except (ValueError, TypeError):
2329 _ErrorIf(True, constants.CV_ENODERPC, node,
2330 "node returned invalid LVM info, check LVM status")
2332 def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2333 """Gets per-disk status information for all instances.
2335 @type nodelist: list of strings
2336 @param nodelist: Node names
2337 @type node_image: dict of (name, L{objects.Node})
2338 @param node_image: Node objects
2339 @type instanceinfo: dict of (name, L{objects.Instance})
2340 @param instanceinfo: Instance objects
2341 @rtype: {instance: {node: [(success, payload)]}}
2342 @return: a dictionary of per-instance dictionaries with nodes as
2343 keys and disk information as values; the disk information is a
2344 list of tuples (success, payload)
2347 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2349 node_disks = {}
2350 node_disks_devonly = {}
2351 diskless_instances = set()
2352 diskless = constants.DT_DISKLESS
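# Build, per node, the list of (instance, disk) pairs to query; diskless
# instances are remembered separately so they still get an (empty) entry in
# the result.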
2354 for nname in nodelist:
2355 node_instances = list(itertools.chain(node_image[nname].pinst,
2356 node_image[nname].sinst))
2357 diskless_instances.update(inst for inst in node_instances
2358 if instanceinfo[inst].disk_template == diskless)
2359 disks = [(inst, disk)
2360 for inst in node_instances
2361 for disk in instanceinfo[inst].disks]
2363 if not disks:
2364 # No need to collect data
2365 continue
2367 node_disks[nname] = disks
2369 # _AnnotateDiskParams already makes copies of the disks
2370 devonly = []
2371 for (inst, dev) in disks:
2372 (anno_disk,) = AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
2373 self.cfg.SetDiskID(anno_disk, nname)
2374 devonly.append(anno_disk)
2376 node_disks_devonly[nname] = devonly
2378 assert len(node_disks) == len(node_disks_devonly)
2380 # Collect data from all nodes with disks
2381 result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2382 node_disks_devonly)
2384 assert len(result) == len(node_disks)
2386 instdisk = {}
2388 for (nname, nres) in result.items():
2389 disks = node_disks[nname]
2391 if nres.offline:
2392 # No data from this node
2393 data = len(disks) * [(False, "node offline")]
2394 else:
2395 msg = nres.fail_msg
2396 _ErrorIf(msg, constants.CV_ENODERPC, nname,
2397 "while getting disk information: %s", msg)
2398 if msg:
2399 # No data from this node
2400 data = len(disks) * [(False, msg)]
2401 else:
2402 data = []
2403 for idx, i in enumerate(nres.payload):
2404 if isinstance(i, (tuple, list)) and len(i) == 2:
2405 data.append(i)
2406 else:
2407 logging.warning("Invalid result from node %s, entry %d: %s",
2408 nname, idx, i)
2409 data.append((False, "Invalid result from the remote node"))
2411 for ((inst, _), status) in zip(disks, data):
2412 instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2414 # Add empty entries for diskless instances.
2415 for inst in diskless_instances:
2416 assert inst not in instdisk
2417 instdisk[inst] = {}
2419 assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2420 len(nnames) <= len(instanceinfo[inst].all_nodes) and
2421 compat.all(isinstance(s, (tuple, list)) and
2422 len(s) == 2 for s in statuses)
2423 for inst, nnames in instdisk.items()
2424 for nname, statuses in nnames.items())
2426 instdisk_keys = set(instdisk)
2427 instanceinfo_keys = set(instanceinfo)
2428 assert instdisk_keys == instanceinfo_keys, \
2429 ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2430 (instdisk_keys, instanceinfo_keys))
2432 return instdisk
2434 @staticmethod
2435 def _SshNodeSelector(group_uuid, all_nodes):
2436 """Create endless iterators for all potential SSH check hosts.
2439 nodes = [node for node in all_nodes
2440 if (node.group != group_uuid and
2441 not node.offline)]
2442 keyfunc = operator.attrgetter("group")
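# Group the candidate nodes by node group and return one endless iterator of
# sorted node names per group; callers draw one name from each iterator.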
2444 return map(itertools.cycle,
2445 [sorted(map(operator.attrgetter("name"), names))
2446 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2447 keyfunc)])
2449 @classmethod
2450 def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2451 """Choose which nodes should talk to which other nodes.
2453 We will make nodes contact all nodes in their group, and one node from
2454 every other group.
2456 @warning: This algorithm has a known issue if one node group is much
2457 smaller than others (e.g. just one node). In such a case all other
2458 nodes will talk to the single node.
2461 online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2462 sel = cls._SshNodeSelector(group_uuid, all_nodes)
2464 return (online_nodes,
2465 dict((name, sorted([i.next() for i in sel]))
2466 for name in online_nodes))
2468 def BuildHooksEnv(self):
2470 """Build hooks env.
2471 Cluster-Verify hooks are run in the post phase only; their failure makes
2472 their output be logged in the verify output and the verification fail.
2474 """
2475 env = {
2476 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2477 }
2479 env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2480 for node in self.my_node_info.values())
2482 return env
2484 def BuildHooksNodes(self):
2485 """Build hooks nodes.
2488 return ([], self.my_node_names)
2490 def Exec(self, feedback_fn):
2491 """Verify integrity of the node group, performing various test on nodes.
2494 # This method has too many local variables. pylint: disable=R0914
2495 feedback_fn("* Verifying group '%s'" % self.group_info.name)
2497 if not self.my_node_names:
2499 feedback_fn("* Empty node group, skipping verification")
2503 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2504 verbose = self.op.verbose
2505 self._feedback_fn = feedback_fn
2507 vg_name = self.cfg.GetVGName()
2508 drbd_helper = self.cfg.GetDRBDHelper()
2509 cluster = self.cfg.GetClusterInfo()
2510 hypervisors = cluster.enabled_hypervisors
2511 node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2513 i_non_redundant = [] # Non redundant instances
2514 i_non_a_balanced = [] # Non auto-balanced instances
2515 i_offline = 0 # Count of offline instances
2516 n_offline = 0 # Count of offline nodes
2517 n_drained = 0 # Count of nodes being drained
2518 node_vol_should = {}
2520 # FIXME: verify OS list
2523 filemap = ComputeAncillaryFiles(cluster, False)
2525 # do local checksums
2526 master_node = self.master_node = self.cfg.GetMasterNode()
2527 master_ip = self.cfg.GetMasterIP()
2529 feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2531 user_scripts = []
2532 if self.cfg.GetUseExternalMipScript():
2533 user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
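# node_verify_param describes every check the node verification RPC should
# run on each node and the data the node has to report back.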
2535 node_verify_param = {
2536 constants.NV_FILELIST:
2537 map(vcluster.MakeVirtualPath,
2538 utils.UniqueSequence(filename
2539 for files in filemap
2540 for filename in files)),
2541 constants.NV_NODELIST:
2542 self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2543 self.all_node_info.values()),
2544 constants.NV_HYPERVISOR: hypervisors,
2545 constants.NV_HVPARAMS:
2546 _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2547 constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2548 for node in node_data_list
2549 if not node.offline],
2550 constants.NV_INSTANCELIST: hypervisors,
2551 constants.NV_VERSION: None,
2552 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2553 constants.NV_NODESETUP: None,
2554 constants.NV_TIME: None,
2555 constants.NV_MASTERIP: (master_node, master_ip),
2556 constants.NV_OSLIST: None,
2557 constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2558 constants.NV_USERSCRIPTS: user_scripts,
2559 }
2561 if vg_name is not None:
2562 node_verify_param[constants.NV_VGLIST] = None
2563 node_verify_param[constants.NV_LVLIST] = vg_name
2564 node_verify_param[constants.NV_PVLIST] = [vg_name]
2566 if drbd_helper:
2567 node_verify_param[constants.NV_DRBDLIST] = None
2568 node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2570 if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
2571 # Load file storage paths only from master node
2572 node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node
2575 # FIXME: this needs to be changed per node-group, not cluster-wide
2576 bridges = set()
2577 default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2578 if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2579 bridges.add(default_nicpp[constants.NIC_LINK])
2580 for instance in self.my_inst_info.values():
2581 for nic in instance.nics:
2582 full_nic = cluster.SimpleFillNIC(nic.nicparams)
2583 if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2584 bridges.add(full_nic[constants.NIC_LINK])
2586 if bridges:
2587 node_verify_param[constants.NV_BRIDGES] = list(bridges)
2589 # Build our expected cluster state
2590 node_image = dict((node.name, self.NodeImage(offline=node.offline,
2591 name=node.name,
2592 vm_capable=node.vm_capable))
2593 for node in node_data_list)
2596 oob_paths = []
2597 for node in self.all_node_info.values():
2598 path = SupportsOob(self.cfg, node)
2599 if path and path not in oob_paths:
2600 oob_paths.append(path)
2602 if oob_paths:
2603 node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2605 for instance in self.my_inst_names:
2606 inst_config = self.my_inst_info[instance]
2607 if inst_config.admin_state == constants.ADMINST_OFFLINE:
2608 i_offline += 1
2610 for nname in inst_config.all_nodes:
2611 if nname not in node_image:
2612 gnode = self.NodeImage(name=nname)
2613 gnode.ghost = (nname not in self.all_node_info)
2614 node_image[nname] = gnode
2616 inst_config.MapLVsByNode(node_vol_should)
2618 pnode = inst_config.primary_node
2619 node_image[pnode].pinst.append(instance)
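# For each secondary node, record which instances it would have to host per
# primary node (the "sbp" map); this feeds the N+1 memory check.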
2621 for snode in inst_config.secondary_nodes:
2622 nimg = node_image[snode]
2623 nimg.sinst.append(instance)
2624 if pnode not in nimg.sbp:
2625 nimg.sbp[pnode] = []
2626 nimg.sbp[pnode].append(instance)
2628 es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg, self.my_node_names)
2629 # The value of exclusive_storage should be the same across the group, so if
2630 # it's True for at least a node, we act as if it were set for all the nodes
2631 self._exclusive_storage = compat.any(es_flags.values())
2632 if self._exclusive_storage:
2633 node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2635 # At this point, we have the in-memory data structures complete,
2636 # except for the runtime information, which we'll gather next
2638 # Due to the way our RPC system works, exact response times cannot be
2639 # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2640 # time before and after executing the request, we can at least have a time
2642 nvinfo_starttime = time.time()
2643 all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2644 node_verify_param,
2645 self.cfg.GetClusterName())
2646 nvinfo_endtime = time.time()
2648 if self.extra_lv_nodes and vg_name is not None:
2649 extra_lv_nvinfo = \
2650 self.rpc.call_node_verify(self.extra_lv_nodes,
2651 {constants.NV_LVLIST: vg_name},
2652 self.cfg.GetClusterName())
2653 else:
2654 extra_lv_nvinfo = {}
2656 all_drbd_map = self.cfg.ComputeDRBDMap()
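# all_drbd_map: per-node mapping of DRBD minors to the owning instance, as
# recorded in the configuration; compared later against the minors actually
# in use on each node.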
2658 feedback_fn("* Gathering disk information (%s nodes)" %
2659 len(self.my_node_names))
2660 instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2661 self.my_inst_info)
2663 feedback_fn("* Verifying configuration file consistency")
2665 # If not all nodes are being checked, we need to make sure the master node
2666 # and a non-checked vm_capable node are in the list.
2667 absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2668 if absent_nodes:
2669 vf_nvinfo = all_nvinfo.copy()
2670 vf_node_info = list(self.my_node_info.values())
2671 additional_nodes = []
2672 if master_node not in self.my_node_info:
2673 additional_nodes.append(master_node)
2674 vf_node_info.append(self.all_node_info[master_node])
2675 # Add the first vm_capable node we find which is not included,
2676 # excluding the master node (which we already have)
2677 for node in absent_nodes:
2678 nodeinfo = self.all_node_info[node]
2679 if (nodeinfo.vm_capable and not nodeinfo.offline and
2680 node != master_node):
2681 additional_nodes.append(node)
2682 vf_node_info.append(self.all_node_info[node])
2683 break
2684 key = constants.NV_FILELIST
2685 vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2686 {key: node_verify_param[key]},
2687 self.cfg.GetClusterName()))
2688 else:
2689 vf_nvinfo = all_nvinfo
2690 vf_node_info = self.my_node_info.values()
2692 self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2694 feedback_fn("* Verifying node status")
2698 for node_i in node_data_list:
2700 nimg = node_image[node]
2704 feedback_fn("* Skipping offline node %s" % (node,))
2708 if node == master_node:
2710 elif node_i.master_candidate:
2711 ntype = "master candidate"
2712 elif node_i.drained:
2718 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2720 msg = all_nvinfo[node].fail_msg
2721 _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
2722 msg)
2723 if msg:
2724 nimg.rpc_fail = True
2725 continue
2727 nresult = all_nvinfo[node].payload
2729 nimg.call_ok = self._VerifyNode(node_i, nresult)
2730 self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2731 self._VerifyNodeNetwork(node_i, nresult)
2732 self._VerifyNodeUserScripts(node_i, nresult)
2733 self._VerifyOob(node_i, nresult)
2734 self._VerifyFileStoragePaths(node_i, nresult,
2735 node == master_node)
2737 if nimg.vm_capable:
2738 self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
2739 self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2740 all_drbd_map)
2742 self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2743 self._UpdateNodeInstances(node_i, nresult, nimg)
2744 self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2745 self._UpdateNodeOS(node_i, nresult, nimg)
2747 if not nimg.os_fail:
2748 if refos_img is None:
2749 refos_img = nimg
2750 self._VerifyNodeOS(node_i, nimg, refos_img)
2751 self._VerifyNodeBridges(node_i, nresult, bridges)
2753 # Check whether all running instances are primary for the node. (This
2754 # can no longer be done from _VerifyInstance below, since some of the
2755 # wrong instances could be from other node groups.)
2756 non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2758 for inst in non_primary_inst:
2759 test = inst in self.all_inst_info
2760 _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
2761 "instance should not run on node %s", node_i.name)
2762 _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
2763 "node is running unknown instance %s", inst)
2765 self._VerifyGroupLVM(node_image, vg_name)
2767 for node, result in extra_lv_nvinfo.items():
2768 self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
2769 node_image[node], vg_name)
2771 feedback_fn("* Verifying instance status")
2772 for instance in self.my_inst_names:
2774 feedback_fn("* Verifying instance %s" % instance)
2775 inst_config = self.my_inst_info[instance]
2776 self._VerifyInstance(instance, inst_config, node_image,
2777 instdisk[instance])
2779 # If the instance is non-redundant we cannot survive losing its primary
2780 # node, so we are not N+1 compliant.
2781 if inst_config.disk_template not in constants.DTS_MIRRORED:
2782 i_non_redundant.append(instance)
2784 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2785 i_non_a_balanced.append(instance)
2787 feedback_fn("* Verifying orphan volumes")
2788 reserved = utils.FieldSet(*cluster.reserved_lvs)
2790 # We will get spurious "unknown volume" warnings if any node of this group
2791 # is secondary for an instance whose primary is in another group. To avoid
2792 # them, we find these instances and add their volumes to node_vol_should.
2793 for inst in self.all_inst_info.values():
2794 for secondary in inst.secondary_nodes:
2795 if (secondary in self.my_node_info
2796 and inst.name not in self.my_inst_info):
2797 inst.MapLVsByNode(node_vol_should)
2800 self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2802 if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2803 feedback_fn("* Verifying N+1 Memory redundancy")
2804 self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2806 feedback_fn("* Other Notes")
2808 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
2809 % len(i_non_redundant))
2811 if i_non_a_balanced:
2812 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
2813 % len(i_non_a_balanced))
2816 feedback_fn(" - NOTICE: %d offline instance(s) found." % i_offline)
2819 feedback_fn(" - NOTICE: %d offline node(s) found." % n_offline)
2822 feedback_fn(" - NOTICE: %d drained node(s) found." % n_drained)
2826 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2827 """Analyze the post-hooks' result
2829 This method analyses the hook result, handles it, and sends some
2830 nicely-formatted feedback back to the user.
2832 @param phase: one of L{constants.HOOKS_PHASE_POST} or
2833 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2834 @param hooks_results: the results of the multi-node hooks rpc call
2835 @param feedback_fn: function used to send feedback back to the caller
2836 @param lu_result: previous Exec result
2837 @return: the new Exec result, based on the previous result
2841 # We only really run POST phase hooks, only for non-empty groups,
2842 # and are only interested in their results
2843 if not self.my_node_names:
2844 # empty node group
2845 pass
2846 elif phase == constants.HOOKS_PHASE_POST:
2847 # Used to change hooks' output to proper indentation
2848 feedback_fn("* Hooks Results")
2849 assert hooks_results, "invalid result from hooks"
2851 for node_name in hooks_results:
2852 res = hooks_results[node_name]
2853 msg = res.fail_msg
2854 test = msg and not res.offline
2855 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2856 "Communication failure in hooks execution: %s", msg)
2857 if res.offline or msg:
2858 # No need to investigate payload if node is offline or gave
2859 # an error.
2860 continue
2861 for script, hkr, output in res.payload:
2862 test = hkr == constants.HKR_FAIL
2863 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2864 "Script %s failed, output:", script)
2865 if test:
2866 output = self._HOOKS_INDENT_RE.sub(" ", output)
2867 feedback_fn("%s" % output)
2868 lu_result = False
2870 return lu_result
2873 class LUClusterVerifyDisks(NoHooksLU):
2874 """Verifies the cluster disks status.
2879 def ExpandNames(self):
2880 self.share_locks = ShareAll()
2881 self.needed_locks = {
2882 locking.LEVEL_NODEGROUP: locking.ALL_SET,
2883 }
2885 def Exec(self, feedback_fn):
2886 group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
2888 # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
2889 return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
2890 for group in group_names])