4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Logical units dealing with the cluster."""
34 from ganeti import compat
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import masterd
40 from ganeti import netutils
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 from ganeti import query
45 from ganeti import rpc
46 from ganeti import runtime
47 from ganeti import ssh
48 from ganeti import uidpool
49 from ganeti import utils
50 from ganeti import vcluster
52 from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
54 from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55 ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56 GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57 GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58 CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59 ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob
61 import ganeti.masterd.instance
# NOTE(review): this chunk shows elided source (embedded original line numbers
# are non-contiguous); comments below are hedged where lines are missing.
64 class LUClusterActivateMasterIp(NoHooksLU):
65   """Activate the master IP on the master node.
# Exec asks the master node, via RPC, to bring up the cluster master IP.
68   def Exec(self, feedback_fn):
69     """Activate the master IP.
72     master_params = self.cfg.GetMasterNetworkParameters()
73     ems = self.cfg.GetUseExternalMipScript()
# presumably `ems` (use-external-MIP-script flag) and the rest of
# master_params are passed on the elided continuation line -- TODO confirm
74     result = self.rpc.call_node_activate_master_ip(master_params.name,
# Raise converts an RPC failure into an OpExecError with this message.
76     result.Raise("Could not activate the master IP")
# Mirror image of LUClusterActivateMasterIp: tears the master IP down.
79 class LUClusterDeactivateMasterIp(NoHooksLU):
80   """Deactivate the master IP on the master node.
83   def Exec(self, feedback_fn):
84     """Deactivate the master IP.
87     master_params = self.cfg.GetMasterNetworkParameters()
88     ems = self.cfg.GetUseExternalMipScript()
# NOTE(review): continuation argument line(s) of this RPC call are elided
# in this view -- presumably master_params/ems follow; confirm in full source.
89     result = self.rpc.call_node_deactivate_master_ip(master_params.name,
91     result.Raise("Could not deactivate the master IP")
# Thin LU wrapper that delegates all work to a ClusterQuery helper instance.
94 class LUClusterConfigQuery(NoHooksLU):
95   """Return configuration values.
100   def CheckArguments(self):
# ClusterQuery(filter, fields, use_locking) -- no filter, no locking here.
101     self.cq = ClusterQuery(None, self.op.output_fields, False)
103   def ExpandNames(self):
104     self.cq.ExpandNames(self)
106   def DeclareLocks(self, level):
107     self.cq.DeclareLocks(self, level)
109   def Exec(self, feedback_fn):
110     result = self.cq.OldStyleQuery(self)
# A cluster query always yields exactly one row (there is one cluster).
112     assert len(result) == 1
117 class LUClusterDestroy(LogicalUnit):
118   """Logical unit for destroying the cluster.
121   HPATH = "cluster-destroy"
122   HTYPE = constants.HTYPE_CLUSTER
124   def BuildHooksEnv(self):
129       "OP_TARGET": self.cfg.GetClusterName(),
132   def BuildHooksNodes(self):
133     """Build hooks nodes.
138   def CheckPrereq(self):
139     """Check prerequisites.
141     This checks whether the cluster is empty.
143     Any errors are signaled by raising errors.OpPrereqError.
146     master = self.cfg.GetMasterNode()
# The cluster is "empty" only when the master is the sole remaining node.
148     nodelist = self.cfg.GetNodeList()
149     if len(nodelist) != 1 or nodelist[0] != master:
150       raise errors.OpPrereqError("There are still %d node(s) in"
151                                  " this cluster." % (len(nodelist) - 1),
# ... and no instances exist (condition line elided in this view).
153     instancelist = self.cfg.GetInstanceList()
155       raise errors.OpPrereqError("There are still %d instance(s) in"
156                                  " this cluster." % len(instancelist),
159   def Exec(self, feedback_fn):
160     """Destroys the cluster.
163     master_params = self.cfg.GetMasterNetworkParameters()
165     # Run post hooks on master node before it's removed
166     RunPostHook(self, master_params.name)
168     ems = self.cfg.GetUseExternalMipScript()
# Best-effort IP shutdown: failure is only logged, not fatal, since the
# cluster is being torn down anyway.
169     result = self.rpc.call_node_deactivate_master_ip(master_params.name,
172       self.LogWarning("Error disabling the master IP address: %s",
# The master node name is returned so the caller can finish the teardown.
175     return master_params.name
# Exists only so the "cluster-init" post-hooks fire; Exec body is elided
# here and presumably trivial -- TODO confirm against the full source.
178 class LUClusterPostInit(LogicalUnit):
179   """Logical unit for running hooks after cluster initialization.
182   HPATH = "cluster-init"
183   HTYPE = constants.HTYPE_CLUSTER
185   def BuildHooksEnv(self):
190       "OP_TARGET": self.cfg.GetClusterName(),
193   def BuildHooksNodes(self):
194     """Build hooks nodes.
# Hooks run on the master node only (empty pre-list, master in post-list).
197     return ([], [self.cfg.GetMasterNode()])
199   def Exec(self, feedback_fn):
# Query implementation backing LUClusterConfigQuery; gathers the three
# optional data pieces (config, queue-drain flag, watcher pause) on demand.
206 class ClusterQuery(QueryBase):
207   FIELDS = query.CLUSTER_FIELDS
209   #: Do not sort (there is only one item)
212   def ExpandNames(self, lu):
215     # The following variables interact with _QueryBase._GetNames
216     self.wanted = locking.ALL_SET
217     self.do_locking = self.use_locking
# Locking is rejected for cluster queries (guard condition elided in view).
220       raise errors.OpPrereqError("Can not use locking for cluster queries",
223   def DeclareLocks(self, lu, level):
226   def _GetQueryData(self, lu):
227     """Computes the list of nodes and their attributes.
230     # Locking is not used
231     assert not (compat.any(lu.glm.is_owned(level)
232                            for level in locking.LEVELS
233                            if level != locking.LEVEL_CLUSTER) or
234                 self.do_locking or self.use_locking)
# Each datum defaults to NotImplemented when not requested, so the query
# layer can distinguish "not gathered" from a real value.
236     if query.CQ_CONFIG in self.requested_data:
237       cluster = lu.cfg.GetClusterInfo()
239       cluster = NotImplemented
241     if query.CQ_QUEUE_DRAINED in self.requested_data:
242       drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
244       drain_flag = NotImplemented
246     if query.CQ_WATCHER_PAUSE in self.requested_data:
247       master_name = lu.cfg.GetMasterNode()
# Watcher pause state lives on the master node, hence the RPC.
249       result = lu.rpc.call_get_watcher_pause(master_name)
250       result.Raise("Can't retrieve watcher pause from master node '%s'" %
253       watcher_pause = result.payload
255       watcher_pause = NotImplemented
257     return query.ClusterQueryData(cluster, drain_flag, watcher_pause)
# Read-only LU: builds the big "gnt-cluster info"-style result dictionary.
# No locks are needed since it only reads the in-memory config.
260 class LUClusterQuery(NoHooksLU):
261   """Query cluster configuration.
266   def ExpandNames(self):
267     self.needed_locks = {}
269   def Exec(self, feedback_fn):
270     """Return cluster config.
273     cluster = self.cfg.GetClusterInfo()
276     # Filter just for enabled hypervisors
# NOTE(review): the os_hvp accumulator initialisation line is elided in
# this view -- presumably `os_hvp = {}` plus per-OS dict setup.
277     for os_name, hv_dict in cluster.os_hvp.items():
279       for hv_name, hv_params in hv_dict.items():
280         if hv_name in cluster.enabled_hypervisors:
281           os_hvp[os_name][hv_name] = hv_params
283     # Convert ip_family to ip_version
284     primary_ip_version = constants.IP4_VERSION
285     if cluster.primary_ip_family == netutils.IP6Address.family:
286       primary_ip_version = constants.IP6_VERSION
# Result dict (opening/closing lines elided in this view): static build
# constants first, then live cluster parameters.
289       "software_version": constants.RELEASE_VERSION,
290       "protocol_version": constants.PROTOCOL_VERSION,
291       "config_version": constants.CONFIG_VERSION,
292       "os_api_version": max(constants.OS_API_VERSIONS),
293       "export_version": constants.EXPORT_VERSION,
294       "architecture": runtime.GetArchInfo(),
295       "name": cluster.cluster_name,
296       "master": cluster.master_node,
297       "default_hypervisor": cluster.primary_hypervisor,
298       "enabled_hypervisors": cluster.enabled_hypervisors,
# Only enabled hypervisors' parameters are exposed.
299       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
300                         for hypervisor_name in cluster.enabled_hypervisors]),
302       "beparams": cluster.beparams,
303       "osparams": cluster.osparams,
304       "ipolicy": cluster.ipolicy,
305       "nicparams": cluster.nicparams,
306       "ndparams": cluster.ndparams,
307       "diskparams": cluster.diskparams,
308       "candidate_pool_size": cluster.candidate_pool_size,
309       "master_netdev": cluster.master_netdev,
310       "master_netmask": cluster.master_netmask,
311       "use_external_mip_script": cluster.use_external_mip_script,
312       "volume_group_name": cluster.volume_group_name,
313       "drbd_usermode_helper": cluster.drbd_usermode_helper,
314       "file_storage_dir": cluster.file_storage_dir,
315       "shared_file_storage_dir": cluster.shared_file_storage_dir,
316       "maintain_node_health": cluster.maintain_node_health,
317       "ctime": cluster.ctime,
318       "mtime": cluster.mtime,
319       "uuid": cluster.uuid,
320       "tags": list(cluster.GetTags()),
321       "uid_pool": cluster.uid_pool,
322       "default_iallocator": cluster.default_iallocator,
323       "reserved_lvs": cluster.reserved_lvs,
324       "primary_ip_version": primary_ip_version,
325       "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
326       "hidden_os": cluster.hidden_os,
327       "blacklisted_os": cluster.blacklisted_os,
328       "enabled_disk_templates": cluster.enabled_disk_templates,
334 class LUClusterRedistConf(NoHooksLU):
335   """Force the redistribution of cluster configuration.
337   This is a very simple LU.
# Shared locks on all nodes: we only read and push files, never modify nodes.
342   def ExpandNames(self):
343     self.needed_locks = {
344       locking.LEVEL_NODE: locking.ALL_SET,
345       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
347     self.share_locks = ShareAll()
349   def Exec(self, feedback_fn):
350     """Redistribute the configuration.
# A no-op Update bumps the config serial and triggers distribution; then
# ancillary files (ssconf, known_hosts, ...) are pushed too.
353     self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
354     RedistributeAncillaryFiles(self)
357 class LUClusterRename(LogicalUnit):
358   """Rename the cluster.
361   HPATH = "cluster-rename"
362   HTYPE = constants.HTYPE_CLUSTER
364   def BuildHooksEnv(self):
369       "OP_TARGET": self.cfg.GetClusterName(),
370       "NEW_NAME": self.op.name,
373   def BuildHooksNodes(self):
374     """Build hooks nodes.
# Pre-hooks on the master only; post-hooks on every node.
377     return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
379   def CheckPrereq(self):
380     """Verify that the passed name is a valid one.
# Resolve the new name with the cluster's primary IP family so the
# resulting IP matches the cluster's addressing scheme.
383     hostname = netutils.GetHostname(name=self.op.name,
384                                     family=self.cfg.GetPrimaryIPFamily())
386     new_name = hostname.name
387     self.ip = new_ip = hostname.ip
388     old_name = self.cfg.GetClusterName()
389     old_ip = self.cfg.GetMasterIP()
390     if new_name == old_name and new_ip == old_ip:
391       raise errors.OpPrereqError("Neither the name nor the IP address of the"
392                                  " cluster has changed",
# The new IP must not already be in use anywhere on the network.
395     if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
396       raise errors.OpPrereqError("The given cluster IP address (%s) is"
397                                  " reachable on the network" %
398                                  new_ip, errors.ECODE_NOTUNIQUE)
# Store the fully-qualified name back for Exec to use.
400     self.op.name = new_name
402   def Exec(self, feedback_fn):
403     """Rename the cluster.
406     clustername = self.op.name
409     # shutdown the master IP
410     master_params = self.cfg.GetMasterNetworkParameters()
411     ems = self.cfg.GetUseExternalMipScript()
412     result = self.rpc.call_node_deactivate_master_ip(master_params.name,
414     result.Raise("Could not disable the master role")
417     cluster = self.cfg.GetClusterInfo()
418     cluster.cluster_name = clustername
419     cluster.master_ip = new_ip
420     self.cfg.Update(cluster, feedback_fn)
422     # update the known hosts file
423     ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
424     node_list = self.cfg.GetOnlineNodeList()
# The master already has the fresh file locally; push to the others only.
426       node_list.remove(master_params.name)
429     UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
# Re-activate the master IP under the new address; failure here is only a
# warning since the rename itself has already been committed.
431       master_params.ip = new_ip
432       result = self.rpc.call_node_activate_master_ip(master_params.name,
434       msg = result.fail_msg
436         self.LogWarning("Could not re-enable the master role on"
437                         " the master, please restart manually: %s", msg)
442 class LUClusterRepairDiskSizes(NoHooksLU):
443   """Verifies the cluster disks sizes.
448   def ExpandNames(self):
# Two locking strategies: targeted locks for an explicit instance list,
# otherwise ALL_SET over everything.
449     if self.op.instances:
450       self.wanted_names = GetWantedInstances(self, self.op.instances)
451       # Not getting the node allocation lock as only a specific set of
452       # instances (and their nodes) is going to be acquired
453       self.needed_locks = {
454         locking.LEVEL_NODE_RES: [],
455         locking.LEVEL_INSTANCE: self.wanted_names,
457       self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
459       self.wanted_names = None
460       self.needed_locks = {
461         locking.LEVEL_NODE_RES: locking.ALL_SET,
462         locking.LEVEL_INSTANCE: locking.ALL_SET,
464         # This opcode is acquires the node locks for all instances
465         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
# Shared (1) node-level locks, exclusive (0) instance locks -- presumably
# assigned into self.share_locks on an elided line; confirm in full source.
469       locking.LEVEL_NODE_RES: 1,
470       locking.LEVEL_INSTANCE: 0,
471       locking.LEVEL_NODE_ALLOC: 1,
474   def DeclareLocks(self, level):
475     if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
# Only primary nodes matter: disk sizes are queried on the primary.
476       self._LockInstancesNodes(primary_only=True, level=level)
478   def CheckPrereq(self):
479     """Check prerequisites.
481     This only checks the optional instance list against the existing names.
484     if self.wanted_names is None:
485       self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
487     self.wanted_instances = \
488       map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
490   def _EnsureChildSizes(self, disk):
491     """Ensure children of the disk have the needed disk size.
493     This is valid mainly for DRBD8 and fixes an issue where the
494     children have smaller disk size.
496     @param disk: an L{ganeti.objects.Disk} object
# Returns True when any (grand)child size was adjusted, so the caller
# knows to write the config back.
499     if disk.dev_type == constants.LD_DRBD8:
500       assert disk.children, "Empty children for DRBD8?"
501       fchild = disk.children[0]
502       mismatch = fchild.size < disk.size
504         self.LogInfo("Child disk has size %d, parent %d, fixing",
505                      fchild.size, disk.size)
506         fchild.size = disk.size
508       # and we recurse on this child only, not on the metadev
509       return self._EnsureChildSizes(fchild) or mismatch
513   def Exec(self, feedback_fn):
514     """Verify the size of cluster disks.
517     # TODO: check child disks too
518     # TODO: check differences in size between primary/secondary nodes
# Group each instance's disks by its primary node so one RPC per node
# can query all of them at once.
520     for instance in self.wanted_instances:
521       pnode = instance.primary_node
522       if pnode not in per_node_disks:
523         per_node_disks[pnode] = []
524       for idx, disk in enumerate(instance.disks):
525         per_node_disks[pnode].append((instance, idx, disk))
527     assert not (frozenset(per_node_disks.keys()) -
528                 self.owned_locks(locking.LEVEL_NODE_RES)), \
529       "Not owning correct locks"
530     assert not self.owned_locks(locking.LEVEL_NODE)
533     for node, dskl in per_node_disks.items():
# Work on copies so failed queries never corrupt the config objects.
534       newl = [v[2].Copy() for v in dskl]
536         self.cfg.SetDiskID(dsk, node)
537       result = self.rpc.call_blockdev_getsize(node, newl)
# Per-node failures are logged and skipped, never fatal.
539         self.LogWarning("Failure in blockdev_getsize call to node"
540                         " %s, ignoring", node)
542       if len(result.payload) != len(dskl):
543         logging.warning("Invalid result from node %s: len(dksl)=%d,"
544                         " result.payload=%s", node, len(dskl), result.payload)
545         self.LogWarning("Invalid result from node %s, ignoring node results",
548       for ((instance, idx, disk), size) in zip(dskl, result.payload):
550           self.LogWarning("Disk %d of instance %s did not return size"
551                           " information, ignoring", idx, instance.name)
553         if not isinstance(size, (int, long)):
554           self.LogWarning("Disk %d of instance %s did not return valid"
555                           " size information, ignoring", idx, instance.name)
# When recorded and actual sizes differ, trust the node and fix the config.
558         if size != disk.size:
559           self.LogInfo("Disk %d of instance %s has mismatched size,"
560                        " correcting: recorded %d, actual %d", idx,
561                        instance.name, disk.size, size)
563           self.cfg.Update(instance, feedback_fn)
564           changed.append((instance.name, idx, size))
565         if self._EnsureChildSizes(disk):
566           self.cfg.Update(instance, feedback_fn)
567           changed.append((instance.name, idx, disk.size))
571 def _ValidateNetmask(cfg, netmask):
572   """Checks if a netmask is valid.
574   @type cfg: L{config.ConfigWriter}
575   @param cfg: The cluster configuration
577   @param netmask: the netmask to be verified
578   @raise errors.OpPrereqError: if the validation fails
# Validation is IP-family aware: resolve the address class for the
# cluster's primary family, then ask it to check the CIDR prefix length.
581   ip_family = cfg.GetPrimaryIPFamily()
# NOTE(review): the `try:` line guarding this lookup is elided in this view.
583     ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
584   except errors.ProgrammerError:
585     raise errors.OpPrereqError("Invalid primary ip family: %s." %
586                                ip_family, errors.ECODE_INVAL)
587   if not ipcls.ValidateNetmask(netmask):
588     raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
589                                (netmask), errors.ECODE_INVAL)
# Large LU: validates (CheckArguments/CheckPrereq) and then applies (Exec)
# every modifiable cluster-level parameter. NOTE(review): many lines are
# elided in this view; comments are hedged accordingly.
592 class LUClusterSetParams(LogicalUnit):
593   """Change the parameters of the cluster.
596   HPATH = "cluster-modify"
597   HTYPE = constants.HTYPE_CLUSTER
600   def CheckArguments(self):
# Cheap, local-only syntax checks; anything needing the config or RPC
# belongs in CheckPrereq below.
605       uidpool.CheckUidPool(self.op.uid_pool)
608       uidpool.CheckUidPool(self.op.add_uids)
610     if self.op.remove_uids:
611       uidpool.CheckUidPool(self.op.remove_uids)
613     if self.op.master_netmask is not None:
614       _ValidateNetmask(self.cfg, self.op.master_netmask)
616     if self.op.diskparams:
617       for dt_params in self.op.diskparams.values():
618         utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
620         utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
621       except errors.OpPrereqError, err:
622         raise errors.OpPrereqError("While verify diskparams options: %s" % err,
625   def ExpandNames(self):
626     # FIXME: in the future maybe other cluster params won't require checking on
627     # all nodes to be modified.
628     # FIXME: This opcode changes cluster-wide settings. Is acquiring all
629     # resource locks the right thing, shouldn't it be the BGL instead?
630     self.needed_locks = {
631       locking.LEVEL_NODE: locking.ALL_SET,
632       locking.LEVEL_INSTANCE: locking.ALL_SET,
633       locking.LEVEL_NODEGROUP: locking.ALL_SET,
634       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
636     self.share_locks = ShareAll()
638   def BuildHooksEnv(self):
643       "OP_TARGET": self.cfg.GetClusterName(),
644       "NEW_VG_NAME": self.op.vg_name,
647   def BuildHooksNodes(self):
648     """Build hooks nodes.
651     mn = self.cfg.GetMasterNode()
654   def CheckPrereq(self):
655     """Check prerequisites.
657     This checks whether the given params don't conflict and
658     if the given volume group is valid.
# Empty-string vg_name/drbd_helper means "disable"; refuse when disks of
# the corresponding type still exist.
661     if self.op.vg_name is not None and not self.op.vg_name:
662       if self.cfg.HasAnyDiskOfType(constants.LD_LV):
663         raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
664                                    " instances exist", errors.ECODE_INVAL)
666     if self.op.drbd_helper is not None and not self.op.drbd_helper:
667       if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
668         raise errors.OpPrereqError("Cannot disable drbd helper while"
669                                    " drbd-based instances exist",
672     node_list = self.owned_locks(locking.LEVEL_NODE)
674     vm_capable_nodes = [node.name
675                         for node in self.cfg.GetAllNodesInfo().values()
676                         if node.name in node_list and node.vm_capable]
678     # if vg_name not None, checks given volume group on all nodes
680       vglist = self.rpc.call_vg_list(vm_capable_nodes)
681       for node in vm_capable_nodes:
682         msg = vglist[node].fail_msg
# Unreachable nodes are skipped with a warning, not treated as fatal.
685           self.LogWarning("Error while gathering data on node %s"
686                           " (ignoring node): %s", node, msg)
688         vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
690                                               constants.MIN_VG_SIZE)
692           raise errors.OpPrereqError("Error on node '%s': %s" %
693                                      (node, vgstatus), errors.ECODE_ENVIRON)
695     if self.op.drbd_helper:
696       # checks given drbd helper on all nodes
697       helpers = self.rpc.call_drbd_helper(node_list)
698       for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
700           self.LogInfo("Not checking drbd helper on offline node %s", node)
702         msg = helpers[node].fail_msg
704           raise errors.OpPrereqError("Error checking drbd helper on node"
705                                      " '%s': %s" % (node, msg),
706                                      errors.ECODE_ENVIRON)
707         node_helper = helpers[node].payload
708         if node_helper != self.op.drbd_helper:
709           raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
710                                      (node, node_helper), errors.ECODE_ENVIRON)
712     self.cluster = cluster = self.cfg.GetClusterInfo()
713     # validate params changes
# Each parameter family builds a self.new_* staged copy here; Exec only
# commits those staged values, so a prereq failure changes nothing.
715       objects.UpgradeBeParams(self.op.beparams)
716       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
717       self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
720       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
721       self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
723       # TODO: we need a more general way to handle resetting
724       # cluster-level parameters to default values
725       if self.new_ndparams["oob_program"] == "":
726         self.new_ndparams["oob_program"] = \
727             constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
730       new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
731                                            self.cluster.hv_state_static)
732       self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
733                                for hv, values in new_hv_state.items())
735     if self.op.disk_state:
736       new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
737                                                self.cluster.disk_state_static)
738       self.new_disk_state = \
739         dict((storage, dict((name, cluster.SimpleFillDiskState(values))
740                             for name, values in svalues.items()))
741              for storage, svalues in new_disk_state.items())
744       self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
# Warn (not fail) about instances that would violate the new ipolicy,
# checked per node group with the group's own overrides applied.
747       all_instances = self.cfg.GetAllInstancesInfo().values()
749       for group in self.cfg.GetAllNodeGroupsInfo().values():
750         instances = frozenset([inst for inst in all_instances
751                                if compat.any(node in group.members
752                                              for node in inst.all_nodes)])
753         new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
754         ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
755         new = ComputeNewInstanceViolations(ipol,
756                                            new_ipolicy, instances, self.cfg)
758         violations.update(new)
761         self.LogWarning("After the ipolicy change the following instances"
763                         utils.CommaJoin(utils.NiceSort(violations)))
765     if self.op.nicparams:
766       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
767       self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
768       objects.NIC.CheckParameterSyntax(self.new_nicparams)
771       # check all instances for consistency
# All NIC problems are collected into nic_errors and reported at once,
# rather than failing on the first one.
772       for instance in self.cfg.GetAllInstancesInfo().values():
773         for nic_idx, nic in enumerate(instance.nics):
774           params_copy = copy.deepcopy(nic.nicparams)
775           params_filled = objects.FillDict(self.new_nicparams, params_copy)
777           # check parameter syntax
779             objects.NIC.CheckParameterSyntax(params_filled)
780           except errors.ConfigurationError, err:
781             nic_errors.append("Instance %s, nic/%d: %s" %
782                               (instance.name, nic_idx, err))
784           # if we're moving instances to routed, check that they have an ip
785           target_mode = params_filled[constants.NIC_MODE]
786           if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
787             nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
788                               " address" % (instance.name, nic_idx))
790       raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
791                                  "\n".join(nic_errors), errors.ECODE_INVAL)
793     # hypervisor list/parameters
794     self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
796       for hv_name, hv_dict in self.op.hvparams.items():
797         if hv_name not in self.new_hvparams:
798           self.new_hvparams[hv_name] = hv_dict
800           self.new_hvparams[hv_name].update(hv_dict)
802     # disk template parameters
803     self.new_diskparams = objects.FillDict(cluster.diskparams, {})
804     if self.op.diskparams:
805       for dt_name, dt_params in self.op.diskparams.items():
# NOTE(review): `if dt_name not in self.op.diskparams` can never be true
# inside this loop -- presumably it should test self.new_diskparams
# (mirroring the hvparams block above); confirm against upstream.
806         if dt_name not in self.op.diskparams:
807           self.new_diskparams[dt_name] = dt_params
809           self.new_diskparams[dt_name].update(dt_params)
811     # os hypervisor parameters
812     self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
814       for os_name, hvs in self.op.os_hvp.items():
815         if os_name not in self.new_os_hvp:
816           self.new_os_hvp[os_name] = hvs
818           for hv_name, hv_dict in hvs.items():
820               # Delete if it exists
821               self.new_os_hvp[os_name].pop(hv_name, None)
822             elif hv_name not in self.new_os_hvp[os_name]:
823               self.new_os_hvp[os_name][hv_name] = hv_dict
825               self.new_os_hvp[os_name][hv_name].update(hv_dict)
828     self.new_osp = objects.FillDict(cluster.osparams, {})
830       for os_name, osp in self.op.osparams.items():
831         if os_name not in self.new_osp:
832           self.new_osp[os_name] = {}
834         self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
837         if not self.new_osp[os_name]:
838           # we removed all parameters
839           del self.new_osp[os_name]
841           # check the parameter validity (remote check)
842           CheckOSParams(self, False, [self.cfg.GetMasterNode()],
843                         os_name, self.new_osp[os_name])
845     # changes to the hypervisor list
846     if self.op.enabled_hypervisors is not None:
847       self.hv_list = self.op.enabled_hypervisors
848       for hv in self.hv_list:
849         # if the hypervisor doesn't already exist in the cluster
850         # hvparams, we initialize it to empty, and then (in both
851         # cases) we make sure to fill the defaults, as we might not
852         # have a complete defaults list if the hypervisor wasn't
854         if hv not in new_hvp:
856         new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
857         utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
859       self.hv_list = cluster.enabled_hypervisors
861     if self.op.hvparams or self.op.enabled_hypervisors is not None:
862       # either the enabled list has changed, or the parameters have, validate
863       for hv_name, hv_params in self.new_hvparams.items():
864         if ((self.op.hvparams and hv_name in self.op.hvparams) or
865             (self.op.enabled_hypervisors and
866              hv_name in self.op.enabled_hypervisors)):
867           # either this is a new hypervisor, or its parameters have changed
868           hv_class = hypervisor.GetHypervisorClass(hv_name)
869           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
870           hv_class.CheckParameterSyntax(hv_params)
871           CheckHVParams(self, node_list, hv_name, hv_params)
873     self._CheckDiskTemplateConsistency()
876       # no need to check any newly-enabled hypervisors, since the
877       # defaults have already been checked in the above code-block
878       for os_name, os_hvp in self.new_os_hvp.items():
879         for hv_name, hv_params in os_hvp.items():
880           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
881           # we need to fill in the new os_hvp on top of the actual hv_p
882           cluster_defaults = self.new_hvparams.get(hv_name, {})
883           new_osp = objects.FillDict(cluster_defaults, hv_params)
884           hv_class = hypervisor.GetHypervisorClass(hv_name)
885           hv_class.CheckParameterSyntax(new_osp)
886           CheckHVParams(self, node_list, hv_name, new_osp)
888     if self.op.default_iallocator:
889       alloc_script = utils.FindFile(self.op.default_iallocator,
890                                     constants.IALLOCATOR_SEARCH_PATH,
892       if alloc_script is None:
893         raise errors.OpPrereqError("Invalid default iallocator script '%s'"
894                                    " specified" % self.op.default_iallocator,
897   def _CheckDiskTemplateConsistency(self):
898     """Check whether the disk templates that are going to be disabled
899     are still in use by some instances.
902     if self.op.enabled_disk_templates:
903       cluster = self.cfg.GetClusterInfo()
904       instances = self.cfg.GetAllInstancesInfo()
906       disk_templates_to_remove = set(cluster.enabled_disk_templates) \
907         - set(self.op.enabled_disk_templates)
908       for instance in instances.itervalues():
909         if instance.disk_template in disk_templates_to_remove:
910           raise errors.OpPrereqError("Cannot disable disk template '%s',"
911                                      " because instance '%s' is using it." %
912                                      (instance.disk_template, instance.name))
914   def Exec(self, feedback_fn):
915     """Change the parameters of the cluster.
# Commit phase: everything below writes the values staged in CheckPrereq.
918     if self.op.vg_name is not None:
919       new_volume = self.op.vg_name
922       if new_volume != self.cfg.GetVGName():
923         self.cfg.SetVGName(new_volume)
925         feedback_fn("Cluster LVM configuration already in desired"
926                     " state, not changing")
927     if self.op.drbd_helper is not None:
928       new_helper = self.op.drbd_helper
931       if new_helper != self.cfg.GetDRBDHelper():
932         self.cfg.SetDRBDHelper(new_helper)
934         feedback_fn("Cluster DRBD helper already in desired state,"
937       self.cluster.hvparams = self.new_hvparams
939       self.cluster.os_hvp = self.new_os_hvp
940     if self.op.enabled_hypervisors is not None:
941       self.cluster.hvparams = self.new_hvparams
942       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
943     if self.op.enabled_disk_templates:
# set() de-duplicates; NOTE(review): it also loses the caller's ordering.
944       self.cluster.enabled_disk_templates = \
945         list(set(self.op.enabled_disk_templates))
947       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
948     if self.op.nicparams:
949       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
951       self.cluster.ipolicy = self.new_ipolicy
953       self.cluster.osparams = self.new_osp
955       self.cluster.ndparams = self.new_ndparams
956     if self.op.diskparams:
957       self.cluster.diskparams = self.new_diskparams
959       self.cluster.hv_state_static = self.new_hv_state
960     if self.op.disk_state:
961       self.cluster.disk_state_static = self.new_disk_state
963     if self.op.candidate_pool_size is not None:
964       self.cluster.candidate_pool_size = self.op.candidate_pool_size
965       # we need to update the pool size here, otherwise the save will fail
966       AdjustCandidatePool(self, [])
968     if self.op.maintain_node_health is not None:
969       if self.op.maintain_node_health and not constants.ENABLE_CONFD:
970         feedback_fn("Note: CONFD was disabled at build time, node health"
971                     " maintenance is not useful (still enabling it)")
972       self.cluster.maintain_node_health = self.op.maintain_node_health
974     if self.op.prealloc_wipe_disks is not None:
975       self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
977     if self.op.add_uids is not None:
978       uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
980     if self.op.remove_uids is not None:
981       uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
983     if self.op.uid_pool is not None:
984       self.cluster.uid_pool = self.op.uid_pool
986     if self.op.default_iallocator is not None:
987       self.cluster.default_iallocator = self.op.default_iallocator
989     if self.op.reserved_lvs is not None:
990       self.cluster.reserved_lvs = self.op.reserved_lvs
992     if self.op.use_external_mip_script is not None:
993       self.cluster.use_external_mip_script = self.op.use_external_mip_script
# Applies DDM_ADD/DDM_REMOVE modifications to an OS-name list attribute
# of the cluster object (used for hidden_os and blacklisted_os below).
995     def helper_os(aname, mods, desc):
997       lst = getattr(self.cluster, aname)
998       for key, val in mods:
999         if key == constants.DDM_ADD:
1001             feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1004         elif key == constants.DDM_REMOVE:
1008             feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1010           raise errors.ProgrammerError("Invalid modification '%s'" % key)
1012     if self.op.hidden_os:
1013       helper_os("hidden_os", self.op.hidden_os, "hidden")
1015     if self.op.blacklisted_os:
1016       helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1018     if self.op.master_netdev:
1019       master_params = self.cfg.GetMasterNetworkParameters()
1020       ems = self.cfg.GetUseExternalMipScript()
1021       feedback_fn("Shutting down master ip on the current netdev (%s)" %
1022                   self.cluster.master_netdev)
1023       result = self.rpc.call_node_deactivate_master_ip(master_params.name,
# --force downgrades a failed IP shutdown from fatal to a warning.
1025       if not self.op.force:
1026         result.Raise("Could not disable the master ip")
1029         msg = ("Could not disable the master ip (continuing anyway): %s" %
1032       feedback_fn("Changing master_netdev from %s to %s" %
1033                   (master_params.netdev, self.op.master_netdev))
1034       self.cluster.master_netdev = self.op.master_netdev
1036     if self.op.master_netmask:
1037       master_params = self.cfg.GetMasterNetworkParameters()
1038       feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1039       result = self.rpc.call_node_change_master_netmask(master_params.name,
1040                                                         master_params.netmask,
1041                                                         self.op.master_netmask,
1043                                                         master_params.netdev)
1045         msg = "Could not change the master IP netmask: %s" % result.fail_msg
1048       self.cluster.master_netmask = self.op.master_netmask
# Single config write commits all staged changes at once.
1050     self.cfg.Update(self.cluster, feedback_fn)
1052     if self.op.master_netdev:
1053       master_params = self.cfg.GetMasterNetworkParameters()
1054       feedback_fn("Starting the master ip on the new master netdev (%s)" %
1055                   self.op.master_netdev)
1056       ems = self.cfg.GetUseExternalMipScript()
1057       result = self.rpc.call_node_activate_master_ip(master_params.name,
1060         self.LogWarning("Could not re-enable the master ip on"
1061                         " the master, please restart manually: %s",
# Fan-out LU: submits one config-verification job plus one job per node
# group, wiring the group jobs to depend on the global one.
1065 class LUClusterVerify(NoHooksLU):
1066   """Submits all jobs necessary to verify the cluster.
1071   def ExpandNames(self):
1072     self.needed_locks = {}
1074   def Exec(self, feedback_fn):
1077     if self.op.group_name:
# Single-group verification: no global config job, hence no dependency.
1078       groups = [self.op.group_name]
1079       depends_fn = lambda: None
1081       groups = self.cfg.GetNodeGroupList()
1083       # Verify global configuration
1085         opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1088       # Always depend on global verification
# Relative job dependency: -len(jobs) points back at the config job.
1089       depends_fn = lambda: [(-len(jobs), [])]
1092       [opcodes.OpClusterVerifyGroup(group_name=group,
1093                                     ignore_errors=self.op.ignore_errors,
1094                                     depends=depends_fn())]
1095       for group in groups)
1097     # Fix up all parameters
1098     for op in itertools.chain(*jobs): # pylint: disable=W0142
1099       op.debug_simulate_errors = self.op.debug_simulate_errors
1100       op.verbose = self.op.verbose
1101       op.error_codes = self.op.error_codes
# skip_checks only exists on group-verify opcodes; AttributeError means
# we hit the config-verify opcode, which is expected.
1103         op.skip_checks = self.op.skip_checks
1104       except AttributeError:
1105         assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1107     return ResultWithJobs(jobs)
# NOTE(review): lossy excerpt (embedded line numbers, gaps); code verbatim.
# Mix-in giving verify LUs a uniform way to format/report errors and to
# demote ignored error codes to warnings.
1110 class _VerifyErrors(object):
1111 """Mix-in for cluster/group verify LUs.
1113 It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1114 self.op and self._feedback_fn to be available.)
1118 ETYPE_FIELD = "code"
1119 ETYPE_ERROR = "ERROR"
1120 ETYPE_WARNING = "WARNING"
1122 def _Error(self, ecode, item, msg, *args, **kwargs):
1123 """Format an error message.
1125 Based on the opcode's error_codes parameter, either format a
1126 parseable error code, or a simpler error string.
1128 This must be called only from Exec and functions called from Exec.
# The severity may be overridden via the "code" kwarg (ETYPE_FIELD).
1131 ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1132 itype, etxt, _ = ecode
1133 # If the error code is in the list of ignored errors, demote the error to a
1135 if etxt in self.op.ignore_errors: # pylint: disable=E1101
1136 ltype = self.ETYPE_WARNING
1137 # first complete the msg
1140 # then format the whole message
# Machine-parseable colon-separated form vs. human-readable form.
1141 if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1142 msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1148 msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1149 # and finally report it via the feedback_fn
1150 self._feedback_fn(" - %s" % msg) # Mix-in. pylint: disable=E1101
1151 # do not mark the operation as failed for WARN cases only
1152 if ltype == self.ETYPE_ERROR:
1155 def _ErrorIf(self, cond, *args, **kwargs):
1156 """Log an error message if the passed condition is True.
# Errors are also forced when debug_simulate_errors is set (testing aid).
1160 or self.op.debug_simulate_errors): # pylint: disable=E1101
1161 self._Error(*args, **kwargs)
# NOTE(review): lossy excerpt (embedded line numbers, gaps); code verbatim.
# Returns a (severity, message) tuple; severity is None when the certificate
# is fine, or one of the LUClusterVerifyConfig ETYPE_* constants.
1164 def _VerifyCertificate(filename):
1165 """Verifies a certificate for L{LUClusterVerifyConfig}.
1167 @type filename: string
1168 @param filename: Path to PEM file
# Broad except is deliberate: any load/parse failure is reported as a
# verification error rather than aborting the whole verify run.
1172 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1173 utils.ReadFile(filename))
1174 except Exception, err: # pylint: disable=W0703
1175 return (LUClusterVerifyConfig.ETYPE_ERROR,
1176 "Failed to load X509 certificate %s: %s" % (filename, err))
# Expiration is checked against warn/error thresholds from constants.
1179 utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1180 constants.SSL_CERT_EXPIRATION_ERROR)
1183 fnamemsg = "While verifying %s: %s" % (filename, msg)
1188 return (None, fnamemsg)
1189 elif errcode == utils.CERT_WARNING:
1190 return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1191 elif errcode == utils.CERT_ERROR:
1192 return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1194 raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
# NOTE(review): lossy excerpt; the trailing "return hvp_data" line appears to
# be among the missing lines. Code below is kept verbatim.
1197 def _GetAllHypervisorParameters(cluster, instances):
1198 """Compute the set of all hypervisor parameters.
1200 @type cluster: L{objects.Cluster}
1201 @param cluster: the cluster object
1202 @param instances: list of L{objects.Instance}
1203 @param instances: additional instances from which to obtain parameters
1204 @rtype: list of (origin, hypervisor, parameters)
1205 @return: a list with all parameters found, indicating the hypervisor they
1206 apply to, and the origin (can be "cluster", "os X", or "instance Y")
# Cluster-level defaults for every enabled hypervisor.
1211 for hv_name in cluster.enabled_hypervisors:
1212 hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
# Per-OS overrides, filled up with the cluster defaults.
1214 for os_name, os_hvp in cluster.os_hvp.items():
1215 for hv_name, hv_params in os_hvp.items():
1217 full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1218 hvp_data.append(("os %s" % os_name, hv_name, full_params))
1220 # TODO: collapse identical parameter values in a single one
# Per-instance overrides, only for instances that actually set hvparams.
1221 for instance in instances:
1222 if instance.hvparams:
1223 hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1224 cluster.FillHV(instance)))
# NOTE(review): lossy excerpt (embedded line numbers, gaps); code verbatim.
# Verifies cluster-wide state that is independent of any single node group:
# config integrity, certificate files, hypervisor parameter syntax, and
# nodes/instances dangling outside any existing group.
1229 class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1230 """Verifies the cluster config.
1235 def _VerifyHVP(self, hvp_data):
1236 """Verifies locally the syntax of the hypervisor parameters.
1239 for item, hv_name, hv_params in hvp_data:
1240 msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1243 hv_class = hypervisor.GetHypervisorClass(hv_name)
1244 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1245 hv_class.CheckParameterSyntax(hv_params)
1246 except errors.GenericError, err:
1247 self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1249 def ExpandNames(self):
# Shared locks on everything: read-only, cluster-wide view needed.
1250 self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1251 self.share_locks = ShareAll()
1253 def CheckPrereq(self):
1254 """Check prerequisites.
1257 # Retrieve all information
1258 self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1259 self.all_node_info = self.cfg.GetAllNodesInfo()
1260 self.all_inst_info = self.cfg.GetAllInstancesInfo()
1262 def Exec(self, feedback_fn):
1263 """Verify integrity of cluster, performing various test on nodes.
# _feedback_fn is required by the _VerifyErrors mix-in.
1267 self._feedback_fn = feedback_fn
1269 feedback_fn("* Verifying cluster config")
1271 for msg in self.cfg.VerifyConfig():
1272 self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1274 feedback_fn("* Verifying cluster certificate files")
1276 for cert_filename in pathutils.ALL_CERT_FILES:
1277 (errcode, msg) = _VerifyCertificate(cert_filename)
1278 self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
# The confd user must be able to read the node daemon certificate.
1280 self._ErrorIf(not utils.CanRead(constants.CONFD_USER,
1281 pathutils.NODED_CERT_FILE),
1282 constants.CV_ECLUSTERCERT,
1284 pathutils.NODED_CERT_FILE + " must be accessible by the " +
1285 constants.CONFD_USER + " user")
1287 feedback_fn("* Verifying hypervisor parameters")
1289 self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1290 self.all_inst_info.values()))
1292 feedback_fn("* Verifying all nodes belong to an existing group")
1294 # We do this verification here because, should this bogus circumstance
1295 # occur, it would never be caught by VerifyGroup, which only acts on
1296 # nodes/instances reachable from existing node groups.
1298 dangling_nodes = set(node.name for node in self.all_node_info.values()
1299 if node.group not in self.all_group_info)
1301 dangling_instances = {}
1302 no_node_instances = []
1304 for inst in self.all_inst_info.values():
1305 if inst.primary_node in dangling_nodes:
1306 dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1307 elif inst.primary_node not in self.all_node_info:
1308 no_node_instances.append(inst.name)
1313 utils.CommaJoin(dangling_instances.get(node.name,
1315 for node in dangling_nodes]
1317 self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1319 "the following nodes (and their instances) belong to a non"
1320 " existing group: %s", utils.CommaJoin(pretty_dangling))
1322 self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1324 "the following instances have a non-existing primary-node:"
1325 " %s", utils.CommaJoin(no_node_instances))
1330 class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1331 """Verifies the status of a node group.
1334 HPATH = "cluster-verify"
1335 HTYPE = constants.HTYPE_CLUSTER
1338 _HOOKS_INDENT_RE = re.compile("^", re.M)
# NOTE(review): lossy excerpt (embedded line numbers, gaps); code verbatim.
# Plain data holder aggregating per-node config and runtime findings used
# throughout the group verification; most attribute initializers fall in the
# missing lines.
1340 class NodeImage(object):
1341 """A class representing the logical and physical status of a node.
1344 @ivar name: the node name to which this object refers
1345 @ivar volumes: a structure as returned from
1346 L{ganeti.backend.GetVolumeList} (runtime)
1347 @ivar instances: a list of running instances (runtime)
1348 @ivar pinst: list of configured primary instances (config)
1349 @ivar sinst: list of configured secondary instances (config)
1350 @ivar sbp: dictionary of {primary-node: list of instances} for all
1351 instances for which this node is secondary (config)
1352 @ivar mfree: free memory, as reported by hypervisor (runtime)
1353 @ivar dfree: free disk, as reported by the node (runtime)
1354 @ivar offline: the offline status (config)
1355 @type rpc_fail: boolean
1356 @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1357 not whether the individual keys were correct) (runtime)
1358 @type lvm_fail: boolean
1359 @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1360 @type hyp_fail: boolean
1361 @ivar hyp_fail: whether the RPC call didn't return the instance list
1362 @type ghost: boolean
1363 @ivar ghost: whether this is a known node or not (config)
1364 @type os_fail: boolean
1365 @ivar os_fail: whether the RPC call didn't return valid OS data
1367 @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1368 @type vm_capable: boolean
1369 @ivar vm_capable: whether the node can host instances
1371 @ivar pv_min: size in MiB of the smallest PVs
1373 @ivar pv_max: size in MiB of the biggest PVs
1376 def __init__(self, offline=False, name=None, vm_capable=True):
1385 self.offline = offline
1386 self.vm_capable = vm_capable
# All failure flags start optimistic and are flipped during verification.
1387 self.rpc_fail = False
1388 self.lvm_fail = False
1389 self.hyp_fail = False
1391 self.os_fail = False
# NOTE(review): lossy excerpt (embedded line numbers, gaps); code verbatim.
# Resolves the target group and declares shared locks on its instances,
# the group itself, its nodes (filled in DeclareLocks) and node allocation.
1396 def ExpandNames(self):
1397 # This raises errors.OpPrereqError on its own:
1398 self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1400 # Get instances in node group; this is unsafe and needs verification later
1402 self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1404 self.needed_locks = {
1405 locking.LEVEL_INSTANCE: inst_names,
1406 locking.LEVEL_NODEGROUP: [self.group_uuid],
# Node locks are computed lazily in DeclareLocks once instances are locked.
1407 locking.LEVEL_NODE: [],
1409 # This opcode is run by watcher every five minutes and acquires all nodes
1410 # for a group. It doesn't run for a long time, so it's better to acquire
1411 # the node allocation lock as well.
1412 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1415 self.share_locks = ShareAll()
# NOTE(review): lossy excerpt (embedded line numbers, gaps); code verbatim.
# Expands the node-level lock set to the group members plus the secondary
# nodes of mirrored instances living partly outside the group.
1417 def DeclareLocks(self, level):
1418 if level == locking.LEVEL_NODE:
1419 # Get members of node group; this is unsafe and needs verification later
1420 nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1422 all_inst_info = self.cfg.GetAllInstancesInfo()
1424 # In Exec(), we warn about mirrored instances that have primary and
1425 # secondary living in separate node groups. To fully verify that
1426 # volumes for these instances are healthy, we will need to do an
1427 # extra call to their secondaries. We ensure here those nodes will
1429 for inst in self.owned_locks(locking.LEVEL_INSTANCE):
1430 # Important: access only the instances whose lock is owned
1431 if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
1432 nodes.update(all_inst_info[inst].secondary_nodes)
1434 self.needed_locks[locking.LEVEL_NODE] = nodes
# NOTE(review): lossy excerpt (embedded line numbers, gaps); code verbatim.
# Re-validates (post-locking) that all group nodes/instances are locked,
# then caches sorted node/instance name lists and their config objects.
1436 def CheckPrereq(self):
1437 assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1438 self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1440 group_nodes = set(self.group_info.members)
1442 self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
# Anything in the group but not locked means the group changed between
# ExpandNames and now; abort rather than verify a stale view.
1445 group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1447 unlocked_instances = \
1448 group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))
1451 raise errors.OpPrereqError("Missing lock for nodes: %s" %
1452 utils.CommaJoin(unlocked_nodes),
1455 if unlocked_instances:
1456 raise errors.OpPrereqError("Missing lock for instances: %s" %
1457 utils.CommaJoin(unlocked_instances),
1460 self.all_node_info = self.cfg.GetAllNodesInfo()
1461 self.all_inst_info = self.cfg.GetAllInstancesInfo()
1463 self.my_node_names = utils.NiceSort(group_nodes)
1464 self.my_inst_names = utils.NiceSort(group_instances)
1466 self.my_node_info = dict((name, self.all_node_info[name])
1467 for name in self.my_node_names)
1469 self.my_inst_info = dict((name, self.all_inst_info[name])
1470 for name in self.my_inst_names)
1472 # We detect here the nodes that will need the extra RPC calls for verifying
1473 # split LV volumes; they should be locked.
1474 extra_lv_nodes = set()
1476 for inst in self.my_inst_info.values():
1477 if inst.disk_template in constants.DTS_INT_MIRROR:
1478 for nname in inst.all_nodes:
1479 if self.all_node_info[nname].group != self.group_uuid:
1480 extra_lv_nodes.add(nname)
1482 unlocked_lv_nodes = \
1483 extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1485 if unlocked_lv_nodes:
1486 raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1487 utils.CommaJoin(unlocked_lv_nodes),
1489 self.extra_lv_nodes = list(extra_lv_nodes)
# NOTE(review): lossy excerpt (embedded line numbers, gaps); code verbatim.
# Sanity-checks the raw RPC verify payload from one node: well-formed dict,
# compatible protocol/software versions, hypervisor verify results, and
# node setup errors.
1491 def _VerifyNode(self, ninfo, nresult):
1492 """Perform some basic validation on data returned from a node.
1494 - check the result data structure is well formed and has all the
1496 - check ganeti version
1498 @type ninfo: L{objects.Node}
1499 @param ninfo: the node to check
1500 @param nresult: the results from the node
1502 @return: whether overall this call was successful (and we can expect
1503 reasonable values in the respose)
1507 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1509 # main result, nresult should be a non-empty dict
1510 test = not nresult or not isinstance(nresult, dict)
1511 _ErrorIf(test, constants.CV_ENODERPC, node,
1512 "unable to verify node: no data returned")
1516 # compares ganeti version
1517 local_version = constants.PROTOCOL_VERSION
1518 remote_version = nresult.get("version", None)
# version must be a 2-element sequence: (protocol, release).
1519 test = not (remote_version and
1520 isinstance(remote_version, (list, tuple)) and
1521 len(remote_version) == 2)
1522 _ErrorIf(test, constants.CV_ENODERPC, node,
1523 "connection to node returned invalid data")
# Protocol mismatch is a hard error ...
1527 test = local_version != remote_version[0]
1528 _ErrorIf(test, constants.CV_ENODEVERSION, node,
1529 "incompatible protocol versions: master %s,"
1530 " node %s", local_version, remote_version[0])
1534 # node seems compatible, we can actually try to look into its results
1536 # full package version
# ... while a release-version mismatch is only a warning.
1537 self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1538 constants.CV_ENODEVERSION, node,
1539 "software version mismatch: master %s, node %s",
1540 constants.RELEASE_VERSION, remote_version[1],
1541 code=self.ETYPE_WARNING)
1543 hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1544 if ninfo.vm_capable and isinstance(hyp_result, dict):
1545 for hv_name, hv_result in hyp_result.iteritems():
# A non-None per-hypervisor result carries the failure message.
1546 test = hv_result is not None
1547 _ErrorIf(test, constants.CV_ENODEHV, node,
1548 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1550 hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1551 if ninfo.vm_capable and isinstance(hvp_result, list):
1552 for item, hv_name, hv_result in hvp_result:
1553 _ErrorIf(True, constants.CV_ENODEHV, node,
1554 "hypervisor %s parameter verify failure (source %s): %s",
1555 hv_name, item, hv_result)
1557 test = nresult.get(constants.NV_NODESETUP,
1558 ["Missing NODESETUP results"])
1559 _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
# NOTE(review): lossy excerpt (embedded line numbers, gaps); code verbatim.
# Flags nodes whose clock diverges from the master by more than
# NODE_MAX_CLOCK_SKEW, using the RPC start/end times as the tolerance window.
1564 def _VerifyNodeTime(self, ninfo, nresult,
1565 nvinfo_starttime, nvinfo_endtime):
1566 """Check the node time.
1568 @type ninfo: L{objects.Node}
1569 @param ninfo: the node to check
1570 @param nresult: the remote results for the node
1571 @param nvinfo_starttime: the start time of the RPC call
1572 @param nvinfo_endtime: the end time of the RPC call
1576 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1578 ntime = nresult.get(constants.NV_TIME, None)
1580 ntime_merged = utils.MergeTime(ntime)
1581 except (ValueError, TypeError):
1582 _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
# Compare against the RPC window extended by the allowed skew on each side.
1585 if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1586 ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1587 elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1588 ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1592 _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
1593 "Node time diverges by at least %s from master node time",
# NOTE(review): lossy excerpt (embedded line numbers, gaps); code verbatim.
# Per-node LVM checks: VG presence/size and PV sanity; also records the node's
# min/max PV sizes on nimg for the later cross-node comparison.
1596 def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1597 """Check the node LVM results and update info for cross-node checks.
1599 @type ninfo: L{objects.Node}
1600 @param ninfo: the node to check
1601 @param nresult: the remote results for the node
1602 @param vg_name: the configured VG name
1603 @type nimg: L{NodeImage}
1604 @param nimg: node image
1611 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1613 # checks vg existence and size > 20G
1614 vglist = nresult.get(constants.NV_VGLIST, None)
1616 _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
# CheckVolumeGroupSize returns an error string (truthy) on failure.
1618 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1619 constants.MIN_VG_SIZE)
1620 _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)
1623 (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1625 self._Error(constants.CV_ENODELVM, node, em)
# pvminmax feeds _VerifyGroupLVM's cross-node PV size comparison.
1626 if pvminmax is not None:
1627 (nimg.pv_min, nimg.pv_max) = pvminmax
# NOTE(review): lossy excerpt (embedded line numbers, gaps); code verbatim.
# With exclusive storage enabled, all PVs in the group should be roughly the
# same size; comparing group-wide min and max is sufficient.
1629 def _VerifyGroupLVM(self, node_image, vg_name):
1630 """Check cross-node consistency in LVM.
1632 @type node_image: dict
1633 @param node_image: info about nodes, mapping from node to names to
1634 L{NodeImage} objects
1635 @param vg_name: the configured VG name
1641 # Only exlcusive storage needs this kind of checks
1642 if not self._exclusive_storage:
1645 # exclusive_storage wants all PVs to have the same size (approximately),
1646 # if the smallest and the biggest ones are okay, everything is fine.
1647 # pv_min is None iff pv_max is None
# Skip nodes that reported no PV data (pv_min is None).
1648 vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1651 (pvmin, minnode) = min((ni.pv_min, ni.name) for ni in vals)
1652 (pvmax, maxnode) = max((ni.pv_max, ni.name) for ni in vals)
1653 bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1654 self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1655 "PV sizes differ too much in the group; smallest (%s MB) is"
1656 " on %s, biggest (%s MB) is on %s",
1657 pvmin, minnode, pvmax, maxnode)
# NOTE(review): lossy excerpt (embedded line numbers, gaps); code verbatim.
# The node reports the subset of expected bridges it is MISSING; a non-empty
# list is therefore an error.
1659 def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1660 """Check the node bridges.
1662 @type ninfo: L{objects.Node}
1663 @param ninfo: the node to check
1664 @param nresult: the remote results for the node
1665 @param bridges: the expected list of bridges
1672 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1674 missing = nresult.get(constants.NV_BRIDGES, None)
1675 test = not isinstance(missing, list)
1676 _ErrorIf(test, constants.CV_ENODENET, node,
1677 "did not return valid bridge information")
1679 _ErrorIf(bool(missing), constants.CV_ENODENET, node,
1680 "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
# NOTE(review): lossy excerpt (embedded line numbers, gaps); code verbatim.
# The node reports scripts that are absent or not executable; any entry in
# that list is an error.
1682 def _VerifyNodeUserScripts(self, ninfo, nresult):
1683 """Check the results of user scripts presence and executability on the node
1685 @type ninfo: L{objects.Node}
1686 @param ninfo: the node to check
1687 @param nresult: the remote results for the node
1692 test = not constants.NV_USERSCRIPTS in nresult
1693 self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
1694 "did not return user scripts information")
1696 broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1698 self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
1699 "user scripts not present or not executable: %s" %
1700 utils.CommaJoin(sorted(broken_scripts)))
# NOTE(review): lossy excerpt (embedded line numbers, gaps); code verbatim.
# Checks three connectivity results from the node: SSH to other nodes,
# TCP inter-node tests, and reachability of the master IP.
1702 def _VerifyNodeNetwork(self, ninfo, nresult):
1703 """Check the node network connectivity results.
1705 @type ninfo: L{objects.Node}
1706 @param ninfo: the node to check
1707 @param nresult: the remote results for the node
1711 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1713 test = constants.NV_NODELIST not in nresult
1714 _ErrorIf(test, constants.CV_ENODESSH, node,
1715 "node hasn't returned node ssh connectivity data")
# NV_NODELIST maps peer-node -> failure message; empty means all OK.
1717 if nresult[constants.NV_NODELIST]:
1718 for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1719 _ErrorIf(True, constants.CV_ENODESSH, node,
1720 "ssh communication with node '%s': %s", a_node, a_msg)
1722 test = constants.NV_NODENETTEST not in nresult
1723 _ErrorIf(test, constants.CV_ENODENET, node,
1724 "node hasn't returned node tcp connectivity data")
1726 if nresult[constants.NV_NODENETTEST]:
1727 nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1729 _ErrorIf(True, constants.CV_ENODENET, node,
1730 "tcp communication with node '%s': %s",
1731 anode, nresult[constants.NV_NODENETTEST][anode])
1733 test = constants.NV_MASTERIP not in nresult
1734 _ErrorIf(test, constants.CV_ENODENET, node,
1735 "node hasn't returned node master IP reachability data")
# On the master node itself an unreachable master IP usually means it was
# never configured; elsewhere it is a plain reachability failure.
1737 if not nresult[constants.NV_MASTERIP]:
1738 if node == self.master_node:
1739 msg = "the master node cannot reach the master IP (not configured?)"
1741 msg = "cannot reach the master IP"
1742 _ErrorIf(True, constants.CV_ENODENET, node, msg)
# NOTE(review): lossy excerpt (embedded line numbers, gaps); code verbatim.
# Per-instance verification: ipolicy compliance, presence of its volumes on
# its nodes, running state, per-disk status, node layout (secondary count,
# exclusive storage, split groups) and offline/ghost node placement.
1744 def _VerifyInstance(self, instance, inst_config, node_image,
1746 """Verify an instance.
1748 This function checks to see if the required block devices are
1749 available on the instance's node, and that the nodes are in the correct
1753 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1754 pnode = inst_config.primary_node
1755 pnode_img = node_image[pnode]
1756 groupinfo = self.cfg.GetAllNodeGroupsInfo()
1758 node_vol_should = {}
1759 inst_config.MapLVsByNode(node_vol_should)
# Instance policy violations are reported as warnings only.
1761 cluster = self.cfg.GetClusterInfo()
1762 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1764 err = ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
1765 _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err),
1766 code=self.ETYPE_WARNING)
1768 for node in node_vol_should:
1769 n_img = node_image[node]
1770 if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1771 # ignore missing volumes on offline or broken nodes
1773 for volume in node_vol_should[node]:
1774 test = volume not in n_img.volumes
1775 _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
1776 "volume %s missing on node %s", volume, node)
1778 if inst_config.admin_state == constants.ADMINST_UP:
1779 test = instance not in pnode_img.instances and not pnode_img.offline
1780 _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
1781 "instance not running on its primary node %s",
1783 _ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
1784 "instance is marked as running and lives on offline node %s",
# Flatten the per-node disk status lists into (node, ok, status, index).
1787 diskdata = [(nname, success, status, idx)
1788 for (nname, disks) in diskstatus.items()
1789 for idx, (success, status) in enumerate(disks)]
1791 for nname, success, bdev_status, idx in diskdata:
1792 # the 'ghost node' construction in Exec() ensures that we have a
1794 snode = node_image[nname]
1795 bad_snode = snode.ghost or snode.offline
1796 _ErrorIf(inst_config.disks_active and
1797 not success and not bad_snode,
1798 constants.CV_EINSTANCEFAULTYDISK, instance,
1799 "couldn't retrieve status for disk/%s on %s: %s",
1800 idx, nname, bdev_status)
1801 _ErrorIf((inst_config.disks_active and
1802 success and bdev_status.ldisk_status == constants.LDS_FAULTY),
1803 constants.CV_EINSTANCEFAULTYDISK, instance,
1804 "disk/%s on %s is faulty", idx, nname)
1806 _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1807 constants.CV_ENODERPC, pnode, "instance %s, connection to"
1808 " primary node failed", instance)
1810 _ErrorIf(len(inst_config.secondary_nodes) > 1,
1811 constants.CV_EINSTANCELAYOUT,
1812 instance, "instance has multiple secondary nodes: %s",
1813 utils.CommaJoin(inst_config.secondary_nodes),
1814 code=self.ETYPE_WARNING)
1816 if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
1817 # Disk template not compatible with exclusive_storage: no instance
1818 # node should have the flag set
1819 es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
1820 inst_config.all_nodes)
1821 es_nodes = [n for (n, es) in es_flags.items()
1823 _ErrorIf(es_nodes, constants.CV_EINSTANCEUNSUITABLENODE, instance,
1824 "instance has template %s, which is not supported on nodes"
1825 " that have exclusive storage set: %s",
1826 inst_config.disk_template, utils.CommaJoin(es_nodes))
1828 if inst_config.disk_template in constants.DTS_INT_MIRROR:
1829 instance_nodes = utils.NiceSort(inst_config.all_nodes)
1830 instance_groups = {}
1832 for node in instance_nodes:
1833 instance_groups.setdefault(self.all_node_info[node].group,
1837 "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
1838 # Sort so that we always list the primary node first.
1839 for group, nodes in sorted(instance_groups.items(),
1840 key=lambda (_, nodes): pnode in nodes,
1843 self._ErrorIf(len(instance_groups) > 1,
1844 constants.CV_EINSTANCESPLITGROUPS,
1845 instance, "instance has primary and secondary nodes in"
1846 " different groups: %s", utils.CommaJoin(pretty_list),
1847 code=self.ETYPE_WARNING)
1849 inst_nodes_offline = []
1850 for snode in inst_config.secondary_nodes:
1851 s_img = node_image[snode]
1852 _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
1853 snode, "instance %s, connection to secondary node failed",
1857 inst_nodes_offline.append(snode)
1859 # warn that the instance lives on offline nodes
1860 _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
1861 "instance has offline secondary node(s) %s",
1862 utils.CommaJoin(inst_nodes_offline))
1863 # ... or ghost/non-vm_capable nodes
1864 for node in inst_config.all_nodes:
1865 _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
1866 instance, "instance lives on ghost node %s", node)
1867 _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
1868 instance, "instance lives on non-vm_capable node %s", node)
# NOTE(review): lossy excerpt (embedded line numbers, gaps); code verbatim.
# Reports volumes present on a node that no instance in the configuration
# accounts for and that are not in the reserved-name set.
1870 def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1871 """Verify if there are any unknown volumes in the cluster.
1873 The .os, .swap and backup volumes are ignored. All other volumes are
1874 reported as unknown.
1876 @type reserved: L{ganeti.utils.FieldSet}
1877 @param reserved: a FieldSet of reserved volume names
1880 for node, n_img in node_image.items():
1881 if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
1882 self.all_node_info[node].group != self.group_uuid):
1883 # skip non-healthy nodes
1885 for volume in n_img.volumes:
# Orphan = not expected on this node AND not matching a reserved pattern.
1886 test = ((node not in node_vol_should or
1887 volume not in node_vol_should[node]) and
1888 not reserved.Matches(volume))
1889 self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
1890 "volume %s is unknown", volume)
# NOTE(review): lossy excerpt (embedded line numbers, gaps); code verbatim.
# For every secondary node, sums the MINMEM of auto-balanced instances it
# would have to host per failing primary, and warns when free memory is short.
1892 def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1893 """Verify N+1 Memory Resilience.
1895 Check that if one single node dies we can still start all the
1896 instances it was primary for.
1899 cluster_info = self.cfg.GetClusterInfo()
1900 for node, n_img in node_image.items():
1901 # This code checks that every node which is now listed as
1902 # secondary has enough memory to host all instances it is
1903 # supposed to should a single other node in the cluster fail.
1904 # FIXME: not ready for failover to an arbitrary node
1905 # FIXME: does not support file-backed instances
1906 # WARNING: we currently take into account down instances as well
1907 # as up ones, considering that even if they're down someone
1908 # might want to start them even in the event of a node failure.
1909 if n_img.offline or self.all_node_info[node].group != self.group_uuid:
1910 # we're skipping nodes marked offline and nodes in other groups from
1911 # the N+1 warning, since most likely we don't have good memory
1912 # infromation from them; we already list instances living on such
1913 # nodes, and that's enough warning
1915 #TODO(dynmem): also consider ballooning out other instances
# sbp maps primary-node -> instances secondary on this node; the check is
# done independently per failing primary.
1916 for prinode, instances in n_img.sbp.items():
1918 for instance in instances:
1919 bep = cluster_info.FillBE(instance_cfg[instance])
1920 if bep[constants.BE_AUTO_BALANCE]:
1921 needed_mem += bep[constants.BE_MINMEM]
1922 test = n_img.mfree < needed_mem
1923 self._ErrorIf(test, constants.CV_ENODEN1, node,
1924 "not enough memory to accomodate instance failovers"
1925 " should node %s fail (%dMiB needed, %dMiB available)",
1926 prinode, needed_mem, n_img.mfree)
# NOTE(review): lossy excerpt (embedded line numbers, gaps); code verbatim.
# Cross-node file consistency check: for each tracked file, computes which
# nodes should have it, collects per-node checksums, then reports files that
# are missing, unexpected, or have divergent contents across nodes.
1929 def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
1930 (files_all, files_opt, files_mc, files_vm)):
1931 """Verifies file checksums collected from all nodes.
1933 @param errorif: Callback for reporting errors
1934 @param nodeinfo: List of L{objects.Node} objects
1935 @param master_node: Name of master node
1936 @param all_nvinfo: RPC results
1939 # Define functions determining which nodes to consider for a file
1942 (files_mc, lambda node: (node.master_candidate or
1943 node.name == master_node)),
1944 (files_vm, lambda node: node.vm_capable),
1947 # Build mapping from filename to list of nodes which should have the file
1949 for (files, fn) in files2nodefn:
1951 filenodes = nodeinfo
1953 filenodes = filter(fn, nodeinfo)
1954 nodefiles.update((filename,
1955 frozenset(map(operator.attrgetter("name"), filenodes)))
1956 for filename in files)
1958 assert set(nodefiles) == (files_all | files_mc | files_vm)
1960 fileinfo = dict((filename, {}) for filename in nodefiles)
# Nodes we cannot collect data from are excluded from the comparison.
1961 ignore_nodes = set()
1963 for node in nodeinfo:
1965 ignore_nodes.add(node.name)
1968 nresult = all_nvinfo[node.name]
1970 if nresult.fail_msg or not nresult.payload:
# Payload paths are virtualized; convert back to local paths for matching.
1973 fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
1974 node_files = dict((vcluster.LocalizeVirtualPath(key), value)
1975 for (key, value) in fingerprints.items())
1978 test = not (node_files and isinstance(node_files, dict))
1979 errorif(test, constants.CV_ENODEFILECHECK, node.name,
1980 "Node did not return file checksum data")
1982 ignore_nodes.add(node.name)
1985 # Build per-checksum mapping from filename to nodes having it
1986 for (filename, checksum) in node_files.items():
1987 assert filename in nodefiles
1988 fileinfo[filename].setdefault(checksum, set()).add(node.name)
1990 for (filename, checksums) in fileinfo.items():
1991 assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
1993 # Nodes having the file
1994 with_file = frozenset(node_name
1995 for nodes in fileinfo[filename].values()
1996 for node_name in nodes) - ignore_nodes
1998 expected_nodes = nodefiles[filename] - ignore_nodes
2000 # Nodes missing file
2001 missing_file = expected_nodes - with_file
# Optional files must exist either everywhere or nowhere.
2003 if filename in files_opt:
2005 errorif(missing_file and missing_file != expected_nodes,
2006 constants.CV_ECLUSTERFILECHECK, None,
2007 "File %s is optional, but it must exist on all or no"
2008 " nodes (not found on %s)",
2009 filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2011 errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2012 "File %s is missing from node(s) %s", filename,
2013 utils.CommaJoin(utils.NiceSort(missing_file)))
2015 # Warn if a node has a file it shouldn't
2016 unexpected = with_file - expected_nodes
2018 constants.CV_ECLUSTERFILECHECK, None,
2019 "File %s should not exist on node(s) %s",
2020 filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2022 # See if there are multiple versions of the file
2023 test = len(checksums) > 1
2025 variants = ["variant %s on %s" %
2026 (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2027 for (idx, (checksum, nodes)) in
2028 enumerate(sorted(checksums.items()))]
2032 errorif(test, constants.CV_ECLUSTERFILECHECK, None,
2033 "File %s found with %s different checksums (%s)",
2034 filename, len(checksums), "; ".join(variants))
# NOTE(review): lossy excerpt (embedded line numbers, gaps); code verbatim.
# Verifies the node's DRBD usermode helper and reconciles the DRBD minors
# the configuration says this node should use against the minors actually
# in use on the node.
2036 def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2038 """Verifies and the node DRBD status.
2040 @type ninfo: L{objects.Node}
2041 @param ninfo: the node to check
2042 @param nresult: the remote results for the node
2043 @param instanceinfo: the dict of instances
2044 @param drbd_helper: the configured DRBD usermode helper
2045 @param drbd_map: the DRBD map as returned by
2046 L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2050 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2053 helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2054 test = (helper_result is None)
2055 _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2056 "no drbd usermode helper returned")
2058 status, payload = helper_result
2060 _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2061 "drbd usermode helper check unsuccessful: %s", payload)
2062 test = status and (payload != drbd_helper)
2063 _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2064 "wrong drbd usermode helper: %s", payload)
2066 # compute the DRBD minors
# Build minor -> (instance name, must_exist); ghost instances are tracked
# with must_exist=False so we don't double-report them below.
2068 for minor, instance in drbd_map[node].items():
2069 test = instance not in instanceinfo
2070 _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2071 "ghost instance '%s' in temporary DRBD map", instance)
2072 # ghost instance should not be running, but otherwise we
2073 # don't give double warnings (both ghost instance and
2074 # unallocated minor in use)
2076 node_drbd[minor] = (instance, False)
2078 instance = instanceinfo[instance]
2079 node_drbd[minor] = (instance.name, instance.disks_active)
2081 # and now check them
2082 used_minors = nresult.get(constants.NV_DRBDLIST, [])
2083 test = not isinstance(used_minors, (tuple, list))
2084 _ErrorIf(test, constants.CV_ENODEDRBD, node,
2085 "cannot parse drbd status file: %s", str(used_minors))
2087 # we cannot check drbd status
# Two-way reconciliation: expected-but-unused and used-but-unallocated.
2090 for minor, (iname, must_exist) in node_drbd.items():
2091 test = minor not in used_minors and must_exist
2092 _ErrorIf(test, constants.CV_ENODEDRBD, node,
2093 "drbd minor %d of instance %s is not active", minor, iname)
2094 for minor in used_minors:
2095 test = minor not in node_drbd
2096 _ErrorIf(test, constants.CV_ENODEDRBD, node,
2097 "unallocated drbd minor %d is in use", minor)
2099 def _UpdateNodeOS(self, ninfo, nresult, nimg):
2100 """Builds the node OS structures.
2102 @type ninfo: L{objects.Node}
2103 @param ninfo: the node to check
2104 @param nresult: the remote results for the node
2105 @param nimg: the node image object
2109 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2111 remote_os = nresult.get(constants.NV_OSLIST, None)
2112 test = (not isinstance(remote_os, list) or
2113 not compat.all(isinstance(v, list) and len(v) == 7
2114 for v in remote_os))
2116 _ErrorIf(test, constants.CV_ENODEOS, node,
2117 "node hasn't returned valid OS data")
2126 for (name, os_path, status, diagnose,
2127 variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2129 if name not in os_dict:
2132 # parameters is a list of lists instead of list of tuples due to
2133 # JSON lacking a real tuple type, fix it:
2134 parameters = [tuple(v) for v in parameters]
2135 os_dict[name].append((os_path, status, diagnose,
2136 set(variants), set(parameters), set(api_ver)))
2138 nimg.oslist = os_dict
2140 def _VerifyNodeOS(self, ninfo, nimg, base):
2141 """Verifies the node OS list.
2143 @type ninfo: L{objects.Node}
2144 @param ninfo: the node to check
2145 @param nimg: the node image object
2146 @param base: the 'template' node we match against (e.g. from the master)
2150 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2152 assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2154 beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2155 for os_name, os_data in nimg.oslist.items():
2156 assert os_data, "Empty OS status for OS %s?!" % os_name
2157 f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2158 _ErrorIf(not f_status, constants.CV_ENODEOS, node,
2159 "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2160 _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
2161 "OS '%s' has multiple entries (first one shadows the rest): %s",
2162 os_name, utils.CommaJoin([v[0] for v in os_data]))
2163 # comparisons with the 'base' image
2164 test = os_name not in base.oslist
2165 _ErrorIf(test, constants.CV_ENODEOS, node,
2166 "Extra OS %s not present on reference node (%s)",
2170 assert base.oslist[os_name], "Base node has empty OS status?"
2171 _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2173 # base OS is invalid, skipping
2175 for kind, a, b in [("API version", f_api, b_api),
2176 ("variants list", f_var, b_var),
2177 ("parameters", beautify_params(f_param),
2178 beautify_params(b_param))]:
2179 _ErrorIf(a != b, constants.CV_ENODEOS, node,
2180 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2181 kind, os_name, base.name,
2182 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2184 # check any missing OSes
2185 missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2186 _ErrorIf(missing, constants.CV_ENODEOS, node,
2187 "OSes present on reference node %s but missing on this node: %s",
2188 base.name, utils.CommaJoin(missing))
2190 def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
2191 """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2193 @type ninfo: L{objects.Node}
2194 @param ninfo: the node to check
2195 @param nresult: the remote results for the node
2196 @type is_master: bool
2197 @param is_master: Whether node is the master node
2203 (constants.ENABLE_FILE_STORAGE or
2204 constants.ENABLE_SHARED_FILE_STORAGE)):
2206 fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
2208 # This should never happen
2209 self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
2210 "Node did not return forbidden file storage paths")
2212 self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
2213 "Found forbidden file storage paths: %s",
2214 utils.CommaJoin(fspaths))
2216 self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
2217 constants.CV_ENODEFILESTORAGEPATHS, node,
2218 "Node should not have returned forbidden file storage"
2221 def _VerifyOob(self, ninfo, nresult):
2222 """Verifies out of band functionality of a node.
2224 @type ninfo: L{objects.Node}
2225 @param ninfo: the node to check
2226 @param nresult: the remote results for the node
2230 # We just have to verify the paths on master and/or master candidates
2231 # as the oob helper is invoked on the master
2232 if ((ninfo.master_candidate or ninfo.master_capable) and
2233 constants.NV_OOB_PATHS in nresult):
2234 for path_result in nresult[constants.NV_OOB_PATHS]:
2235 self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2237 def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2238 """Verifies and updates the node volume data.
2240 This function will update a L{NodeImage}'s internal structures
2241 with data from the remote call.
2243 @type ninfo: L{objects.Node}
2244 @param ninfo: the node to check
2245 @param nresult: the remote results for the node
2246 @param nimg: the node image object
2247 @param vg_name: the configured VG name
2251 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2253 nimg.lvm_fail = True
2254 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2257 elif isinstance(lvdata, basestring):
2258 _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
2259 utils.SafeEncode(lvdata))
2260 elif not isinstance(lvdata, dict):
2261 _ErrorIf(True, constants.CV_ENODELVM, node,
2262 "rpc call to node failed (lvlist)")
2264 nimg.volumes = lvdata
2265 nimg.lvm_fail = False
2267 def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2268 """Verifies and updates the node instance list.
2270 If the listing was successful, then updates this node's instance
2271 list. Otherwise, it marks the RPC call as failed for the instance
2274 @type ninfo: L{objects.Node}
2275 @param ninfo: the node to check
2276 @param nresult: the remote results for the node
2277 @param nimg: the node image object
2280 idata = nresult.get(constants.NV_INSTANCELIST, None)
2281 test = not isinstance(idata, list)
2282 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2283 "rpc call to node failed (instancelist): %s",
2284 utils.SafeEncode(str(idata)))
2286 nimg.hyp_fail = True
2288 nimg.instances = idata
2290 def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2291 """Verifies and computes a node information map
2293 @type ninfo: L{objects.Node}
2294 @param ninfo: the node to check
2295 @param nresult: the remote results for the node
2296 @param nimg: the node image object
2297 @param vg_name: the configured VG name
2301 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2303 # try to read free memory (from the hypervisor)
2304 hv_info = nresult.get(constants.NV_HVINFO, None)
2305 test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2306 _ErrorIf(test, constants.CV_ENODEHV, node,
2307 "rpc call to node failed (hvinfo)")
2310 nimg.mfree = int(hv_info["memory_free"])
2311 except (ValueError, TypeError):
2312 _ErrorIf(True, constants.CV_ENODERPC, node,
2313 "node returned invalid nodeinfo, check hypervisor")
2315 # FIXME: devise a free space model for file based instances as well
2316 if vg_name is not None:
2317 test = (constants.NV_VGLIST not in nresult or
2318 vg_name not in nresult[constants.NV_VGLIST])
2319 _ErrorIf(test, constants.CV_ENODELVM, node,
2320 "node didn't return data for the volume group '%s'"
2321 " - it is either missing or broken", vg_name)
2324 nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2325 except (ValueError, TypeError):
2326 _ErrorIf(True, constants.CV_ENODERPC, node,
2327 "node returned invalid LVM info, check LVM status")
2329 def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2330 """Gets per-disk status information for all instances.
2332 @type nodelist: list of strings
2333 @param nodelist: Node names
2334 @type node_image: dict of (name, L{objects.Node})
2335 @param node_image: Node objects
2336 @type instanceinfo: dict of (name, L{objects.Instance})
2337 @param instanceinfo: Instance objects
2338 @rtype: {instance: {node: [(succes, payload)]}}
2339 @return: a dictionary of per-instance dictionaries with nodes as
2340 keys and disk information as values; the disk information is a
2341 list of tuples (success, payload)
2344 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2347 node_disks_devonly = {}
2348 diskless_instances = set()
2349 diskless = constants.DT_DISKLESS
2351 for nname in nodelist:
2352 node_instances = list(itertools.chain(node_image[nname].pinst,
2353 node_image[nname].sinst))
2354 diskless_instances.update(inst for inst in node_instances
2355 if instanceinfo[inst].disk_template == diskless)
2356 disks = [(inst, disk)
2357 for inst in node_instances
2358 for disk in instanceinfo[inst].disks]
2361 # No need to collect data
2364 node_disks[nname] = disks
2366 # _AnnotateDiskParams makes already copies of the disks
2368 for (inst, dev) in disks:
2369 (anno_disk,) = AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
2370 self.cfg.SetDiskID(anno_disk, nname)
2371 devonly.append(anno_disk)
2373 node_disks_devonly[nname] = devonly
2375 assert len(node_disks) == len(node_disks_devonly)
2377 # Collect data from all nodes with disks
2378 result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2381 assert len(result) == len(node_disks)
2385 for (nname, nres) in result.items():
2386 disks = node_disks[nname]
2389 # No data from this node
2390 data = len(disks) * [(False, "node offline")]
2393 _ErrorIf(msg, constants.CV_ENODERPC, nname,
2394 "while getting disk information: %s", msg)
2396 # No data from this node
2397 data = len(disks) * [(False, msg)]
2400 for idx, i in enumerate(nres.payload):
2401 if isinstance(i, (tuple, list)) and len(i) == 2:
2404 logging.warning("Invalid result from node %s, entry %d: %s",
2406 data.append((False, "Invalid result from the remote node"))
2408 for ((inst, _), status) in zip(disks, data):
2409 instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2411 # Add empty entries for diskless instances.
2412 for inst in diskless_instances:
2413 assert inst not in instdisk
2416 assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2417 len(nnames) <= len(instanceinfo[inst].all_nodes) and
2418 compat.all(isinstance(s, (tuple, list)) and
2419 len(s) == 2 for s in statuses)
2420 for inst, nnames in instdisk.items()
2421 for nname, statuses in nnames.items())
2423 instdisk_keys = set(instdisk)
2424 instanceinfo_keys = set(instanceinfo)
2425 assert instdisk_keys == instanceinfo_keys, \
2426 ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2427 (instdisk_keys, instanceinfo_keys))
2432 def _SshNodeSelector(group_uuid, all_nodes):
2433 """Create endless iterators for all potential SSH check hosts.
2436 nodes = [node for node in all_nodes
2437 if (node.group != group_uuid and
2439 keyfunc = operator.attrgetter("group")
2441 return map(itertools.cycle,
2442 [sorted(map(operator.attrgetter("name"), names))
2443 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2447 def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2448 """Choose which nodes should talk to which other nodes.
2450 We will make nodes contact all nodes in their group, and one node from
2453 @warning: This algorithm has a known issue if one node group is much
2454 smaller than others (e.g. just one node). In such a case all other
2455 nodes will talk to the single node.
2458 online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2459 sel = cls._SshNodeSelector(group_uuid, all_nodes)
2461 return (online_nodes,
2462 dict((name, sorted([i.next() for i in sel]))
2463 for name in online_nodes))
2465 def BuildHooksEnv(self):
2468 Cluster-Verify hooks just ran in the post phase and their failure makes
2469 the output be logged in the verify output and the verification to fail.
2473 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2476 env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2477 for node in self.my_node_info.values())
2481 def BuildHooksNodes(self):
2482 """Build hooks nodes.
2485 return ([], self.my_node_names)
2487 def Exec(self, feedback_fn):
2488 """Verify integrity of the node group, performing various test on nodes.
2491 # This method has too many local variables. pylint: disable=R0914
2492 feedback_fn("* Verifying group '%s'" % self.group_info.name)
2494 if not self.my_node_names:
2496 feedback_fn("* Empty node group, skipping verification")
2500 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2501 verbose = self.op.verbose
2502 self._feedback_fn = feedback_fn
2504 vg_name = self.cfg.GetVGName()
2505 drbd_helper = self.cfg.GetDRBDHelper()
2506 cluster = self.cfg.GetClusterInfo()
2507 hypervisors = cluster.enabled_hypervisors
2508 node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2510 i_non_redundant = [] # Non redundant instances
2511 i_non_a_balanced = [] # Non auto-balanced instances
2512 i_offline = 0 # Count of offline instances
2513 n_offline = 0 # Count of offline nodes
2514 n_drained = 0 # Count of nodes being drained
2515 node_vol_should = {}
2517 # FIXME: verify OS list
2520 filemap = ComputeAncillaryFiles(cluster, False)
2522 # do local checksums
2523 master_node = self.master_node = self.cfg.GetMasterNode()
2524 master_ip = self.cfg.GetMasterIP()
2526 feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2529 if self.cfg.GetUseExternalMipScript():
2530 user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2532 node_verify_param = {
2533 constants.NV_FILELIST:
2534 map(vcluster.MakeVirtualPath,
2535 utils.UniqueSequence(filename
2536 for files in filemap
2537 for filename in files)),
2538 constants.NV_NODELIST:
2539 self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2540 self.all_node_info.values()),
2541 constants.NV_HYPERVISOR: hypervisors,
2542 constants.NV_HVPARAMS:
2543 _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2544 constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2545 for node in node_data_list
2546 if not node.offline],
2547 constants.NV_INSTANCELIST: hypervisors,
2548 constants.NV_VERSION: None,
2549 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2550 constants.NV_NODESETUP: None,
2551 constants.NV_TIME: None,
2552 constants.NV_MASTERIP: (master_node, master_ip),
2553 constants.NV_OSLIST: None,
2554 constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2555 constants.NV_USERSCRIPTS: user_scripts,
2558 if vg_name is not None:
2559 node_verify_param[constants.NV_VGLIST] = None
2560 node_verify_param[constants.NV_LVLIST] = vg_name
2561 node_verify_param[constants.NV_PVLIST] = [vg_name]
2564 node_verify_param[constants.NV_DRBDLIST] = None
2565 node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2567 if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
2568 # Load file storage paths only from master node
2569 node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node
2572 # FIXME: this needs to be changed per node-group, not cluster-wide
2574 default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2575 if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2576 bridges.add(default_nicpp[constants.NIC_LINK])
2577 for instance in self.my_inst_info.values():
2578 for nic in instance.nics:
2579 full_nic = cluster.SimpleFillNIC(nic.nicparams)
2580 if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2581 bridges.add(full_nic[constants.NIC_LINK])
2584 node_verify_param[constants.NV_BRIDGES] = list(bridges)
2586 # Build our expected cluster state
2587 node_image = dict((node.name, self.NodeImage(offline=node.offline,
2589 vm_capable=node.vm_capable))
2590 for node in node_data_list)
2594 for node in self.all_node_info.values():
2595 path = SupportsOob(self.cfg, node)
2596 if path and path not in oob_paths:
2597 oob_paths.append(path)
2600 node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2602 for instance in self.my_inst_names:
2603 inst_config = self.my_inst_info[instance]
2604 if inst_config.admin_state == constants.ADMINST_OFFLINE:
2607 for nname in inst_config.all_nodes:
2608 if nname not in node_image:
2609 gnode = self.NodeImage(name=nname)
2610 gnode.ghost = (nname not in self.all_node_info)
2611 node_image[nname] = gnode
2613 inst_config.MapLVsByNode(node_vol_should)
2615 pnode = inst_config.primary_node
2616 node_image[pnode].pinst.append(instance)
2618 for snode in inst_config.secondary_nodes:
2619 nimg = node_image[snode]
2620 nimg.sinst.append(instance)
2621 if pnode not in nimg.sbp:
2622 nimg.sbp[pnode] = []
2623 nimg.sbp[pnode].append(instance)
2625 es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg, self.my_node_names)
2626 # The value of exclusive_storage should be the same across the group, so if
2627 # it's True for at least a node, we act as if it were set for all the nodes
2628 self._exclusive_storage = compat.any(es_flags.values())
2629 if self._exclusive_storage:
2630 node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2632 # At this point, we have the in-memory data structures complete,
2633 # except for the runtime information, which we'll gather next
2635 # Due to the way our RPC system works, exact response times cannot be
2636 # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2637 # time before and after executing the request, we can at least have a time
2639 nvinfo_starttime = time.time()
2640 all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2642 self.cfg.GetClusterName())
2643 nvinfo_endtime = time.time()
2645 if self.extra_lv_nodes and vg_name is not None:
2647 self.rpc.call_node_verify(self.extra_lv_nodes,
2648 {constants.NV_LVLIST: vg_name},
2649 self.cfg.GetClusterName())
2651 extra_lv_nvinfo = {}
2653 all_drbd_map = self.cfg.ComputeDRBDMap()
2655 feedback_fn("* Gathering disk information (%s nodes)" %
2656 len(self.my_node_names))
2657 instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2660 feedback_fn("* Verifying configuration file consistency")
2662 # If not all nodes are being checked, we need to make sure the master node
2663 # and a non-checked vm_capable node are in the list.
2664 absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2666 vf_nvinfo = all_nvinfo.copy()
2667 vf_node_info = list(self.my_node_info.values())
2668 additional_nodes = []
2669 if master_node not in self.my_node_info:
2670 additional_nodes.append(master_node)
2671 vf_node_info.append(self.all_node_info[master_node])
2672 # Add the first vm_capable node we find which is not included,
2673 # excluding the master node (which we already have)
2674 for node in absent_nodes:
2675 nodeinfo = self.all_node_info[node]
2676 if (nodeinfo.vm_capable and not nodeinfo.offline and
2677 node != master_node):
2678 additional_nodes.append(node)
2679 vf_node_info.append(self.all_node_info[node])
2681 key = constants.NV_FILELIST
2682 vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2683 {key: node_verify_param[key]},
2684 self.cfg.GetClusterName()))
2686 vf_nvinfo = all_nvinfo
2687 vf_node_info = self.my_node_info.values()
2689 self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2691 feedback_fn("* Verifying node status")
2695 for node_i in node_data_list:
2697 nimg = node_image[node]
2701 feedback_fn("* Skipping offline node %s" % (node,))
2705 if node == master_node:
2707 elif node_i.master_candidate:
2708 ntype = "master candidate"
2709 elif node_i.drained:
2715 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2717 msg = all_nvinfo[node].fail_msg
2718 _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
2721 nimg.rpc_fail = True
2724 nresult = all_nvinfo[node].payload
2726 nimg.call_ok = self._VerifyNode(node_i, nresult)
2727 self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2728 self._VerifyNodeNetwork(node_i, nresult)
2729 self._VerifyNodeUserScripts(node_i, nresult)
2730 self._VerifyOob(node_i, nresult)
2731 self._VerifyFileStoragePaths(node_i, nresult,
2732 node == master_node)
2735 self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
2736 self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2739 self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2740 self._UpdateNodeInstances(node_i, nresult, nimg)
2741 self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2742 self._UpdateNodeOS(node_i, nresult, nimg)
2744 if not nimg.os_fail:
2745 if refos_img is None:
2747 self._VerifyNodeOS(node_i, nimg, refos_img)
2748 self._VerifyNodeBridges(node_i, nresult, bridges)
2750 # Check whether all running instancies are primary for the node. (This
2751 # can no longer be done from _VerifyInstance below, since some of the
2752 # wrong instances could be from other node groups.)
2753 non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2755 for inst in non_primary_inst:
2756 test = inst in self.all_inst_info
2757 _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
2758 "instance should not run on node %s", node_i.name)
2759 _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
2760 "node is running unknown instance %s", inst)
2762 self._VerifyGroupLVM(node_image, vg_name)
2764 for node, result in extra_lv_nvinfo.items():
2765 self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
2766 node_image[node], vg_name)
2768 feedback_fn("* Verifying instance status")
2769 for instance in self.my_inst_names:
2771 feedback_fn("* Verifying instance %s" % instance)
2772 inst_config = self.my_inst_info[instance]
2773 self._VerifyInstance(instance, inst_config, node_image,
2776 # If the instance is non-redundant we cannot survive losing its primary
2777 # node, so we are not N+1 compliant.
2778 if inst_config.disk_template not in constants.DTS_MIRRORED:
2779 i_non_redundant.append(instance)
2781 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2782 i_non_a_balanced.append(instance)
2784 feedback_fn("* Verifying orphan volumes")
2785 reserved = utils.FieldSet(*cluster.reserved_lvs)
2787 # We will get spurious "unknown volume" warnings if any node of this group
2788 # is secondary for an instance whose primary is in another group. To avoid
2789 # them, we find these instances and add their volumes to node_vol_should.
2790 for inst in self.all_inst_info.values():
2791 for secondary in inst.secondary_nodes:
2792 if (secondary in self.my_node_info
2793 and inst.name not in self.my_inst_info):
2794 inst.MapLVsByNode(node_vol_should)
2797 self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2799 if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2800 feedback_fn("* Verifying N+1 Memory redundancy")
2801 self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2803 feedback_fn("* Other Notes")
2805 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
2806 % len(i_non_redundant))
2808 if i_non_a_balanced:
2809 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
2810 % len(i_non_a_balanced))
2813 feedback_fn(" - NOTICE: %d offline instance(s) found." % i_offline)
2816 feedback_fn(" - NOTICE: %d offline node(s) found." % n_offline)
2819 feedback_fn(" - NOTICE: %d drained node(s) found." % n_drained)
2823 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2824 """Analyze the post-hooks' result
2826 This method analyses the hook result, handles it, and sends some
2827 nicely-formatted feedback back to the user.
2829 @param phase: one of L{constants.HOOKS_PHASE_POST} or
2830 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2831 @param hooks_results: the results of the multi-node hooks rpc call
2832 @param feedback_fn: function used send feedback back to the caller
2833 @param lu_result: previous Exec result
2834 @return: the new Exec result, based on the previous result
2838 # We only really run POST phase hooks, only for non-empty groups,
2839 # and are only interested in their results
2840 if not self.my_node_names:
2843 elif phase == constants.HOOKS_PHASE_POST:
2844 # Used to change hooks' output to proper indentation
2845 feedback_fn("* Hooks Results")
2846 assert hooks_results, "invalid result from hooks"
2848 for node_name in hooks_results:
2849 res = hooks_results[node_name]
2851 test = msg and not res.offline
2852 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2853 "Communication failure in hooks execution: %s", msg)
2854 if res.offline or msg:
2855 # No need to investigate payload if node is offline or gave
2858 for script, hkr, output in res.payload:
2859 test = hkr == constants.HKR_FAIL
2860 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2861 "Script %s failed, output:", script)
2863 output = self._HOOKS_INDENT_RE.sub(" ", output)
2864 feedback_fn("%s" % output)
class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  This LU performs no verification itself: it submits one
  L{opcodes.OpGroupVerifyDisks} job per node group and returns them as
  jobs, so the actual disk verification happens per group.

  """
  # Reconstructed from a mangled extraction of this block (original line
  # numbers were fused into the text and a few structural lines dropped);
  # the elided attribute below is the conventional one -- TODO confirm
  # against version control.
  REQ_BGL = False

  def ExpandNames(self):
    # Only the node-group list is read here; share all locks so concurrent
    # jobs are not blocked. The per-group jobs acquire their own locks.
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])