4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Logical units dealing with the cluster."""
34 from ganeti import compat
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import masterd
40 from ganeti import netutils
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 from ganeti import query
45 from ganeti import rpc
46 from ganeti import runtime
47 from ganeti import ssh
48 from ganeti import uidpool
49 from ganeti import utils
50 from ganeti import vcluster
52 from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
ResultWithJobs
54 from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55 ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56 GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57 GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58 CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59 ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob
61 import ganeti.masterd.instance
64 class LUClusterActivateMasterIp(NoHooksLU):
65 """Activate the master IP on the master node.
68 def Exec(self, feedback_fn):
69 """Activate the master IP.
72 master_params = self.cfg.GetMasterNetworkParameters()
73 ems = self.cfg.GetUseExternalMipScript()
74 result = self.rpc.call_node_activate_master_ip(master_params.uuid,
76 result.Raise("Could not activate the master IP")
79 class LUClusterDeactivateMasterIp(NoHooksLU):
80 """Deactivate the master IP on the master node.
83 def Exec(self, feedback_fn):
84 """Deactivate the master IP.
87 master_params = self.cfg.GetMasterNetworkParameters()
88 ems = self.cfg.GetUseExternalMipScript()
89 result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
91 result.Raise("Could not deactivate the master IP")
94 class LUClusterConfigQuery(NoHooksLU):
95 """Return configuration values.
100 def CheckArguments(self):
101 self.cq = ClusterQuery(None, self.op.output_fields, False)
103 def ExpandNames(self):
104 self.cq.ExpandNames(self)
106 def DeclareLocks(self, level):
107 self.cq.DeclareLocks(self, level)
109 def Exec(self, feedback_fn):
110 result = self.cq.OldStyleQuery(self)
112 assert len(result) == 1
117 class LUClusterDestroy(LogicalUnit):
118 """Logical unit for destroying the cluster.
121 HPATH = "cluster-destroy"
122 HTYPE = constants.HTYPE_CLUSTER
124 def BuildHooksEnv(self):
129 "OP_TARGET": self.cfg.GetClusterName(),
132 def BuildHooksNodes(self):
133 """Build hooks nodes.
138 def CheckPrereq(self):
139 """Check prerequisites.
141 This checks whether the cluster is empty.
143 Any errors are signaled by raising errors.OpPrereqError.
146 master = self.cfg.GetMasterNode()
148 nodelist = self.cfg.GetNodeList()
149 if len(nodelist) != 1 or nodelist[0] != master:
150 raise errors.OpPrereqError("There are still %d node(s) in"
151 " this cluster." % (len(nodelist) - 1),
153 instancelist = self.cfg.GetInstanceList()
155 raise errors.OpPrereqError("There are still %d instance(s) in"
156 " this cluster." % len(instancelist),
159 def Exec(self, feedback_fn):
160 """Destroys the cluster.
163 master_params = self.cfg.GetMasterNetworkParameters()
165 # Run post hooks on master node before it's removed
166 RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
168 ems = self.cfg.GetUseExternalMipScript()
169 result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
171 result.Warn("Error disabling the master IP address", self.LogWarning)
172 return master_params.uuid
175 class LUClusterPostInit(LogicalUnit):
176 """Logical unit for running hooks after cluster initialization.
179 HPATH = "cluster-init"
180 HTYPE = constants.HTYPE_CLUSTER
182 def BuildHooksEnv(self):
187 "OP_TARGET": self.cfg.GetClusterName(),
190 def BuildHooksNodes(self):
191 """Build hooks nodes.
194 return ([], [self.cfg.GetMasterNode()])
196 def Exec(self, feedback_fn):
203 class ClusterQuery(QueryBase):
204 FIELDS = query.CLUSTER_FIELDS
206 #: Do not sort (there is only one item)
209 def ExpandNames(self, lu):
212 # The following variables interact with _QueryBase._GetNames
213 self.wanted = locking.ALL_SET
214 self.do_locking = self.use_locking
217 raise errors.OpPrereqError("Can not use locking for cluster queries",
220 def DeclareLocks(self, lu, level):
223 def _GetQueryData(self, lu):
224 """Computes the list of nodes and their attributes.
227 # Locking is not used
228 assert not (compat.any(lu.glm.is_owned(level)
229 for level in locking.LEVELS
230 if level != locking.LEVEL_CLUSTER) or
231 self.do_locking or self.use_locking)
233 if query.CQ_CONFIG in self.requested_data:
234 cluster = lu.cfg.GetClusterInfo()
235 nodes = lu.cfg.GetAllNodesInfo()
237 cluster = NotImplemented
238 nodes = NotImplemented
240 if query.CQ_QUEUE_DRAINED in self.requested_data:
241 drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
243 drain_flag = NotImplemented
245 if query.CQ_WATCHER_PAUSE in self.requested_data:
246 master_node_uuid = lu.cfg.GetMasterNode()
248 result = lu.rpc.call_get_watcher_pause(master_node_uuid)
249 result.Raise("Can't retrieve watcher pause from master node '%s'" %
250 lu.cfg.GetMasterNodeName())
252 watcher_pause = result.payload
254 watcher_pause = NotImplemented
256 return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
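# Fields whose data was not requested are filled in with NotImplemented above;
# the query layer is expected to evaluate only the requested fields, so these
# placeholders are never actually read.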
259 class LUClusterQuery(NoHooksLU):
260 """Query cluster configuration.
265 def ExpandNames(self):
266 self.needed_locks = {}
268 def Exec(self, feedback_fn):
269 """Return cluster config.
272 cluster = self.cfg.GetClusterInfo()
275 # Filter just for enabled hypervisors
276 for os_name, hv_dict in cluster.os_hvp.items():
278 for hv_name, hv_params in hv_dict.items():
279 if hv_name in cluster.enabled_hypervisors:
280 os_hvp[os_name][hv_name] = hv_params
282 # Convert ip_family to ip_version
283 primary_ip_version = constants.IP4_VERSION
284 if cluster.primary_ip_family == netutils.IP6Address.family:
285 primary_ip_version = constants.IP6_VERSION
288 "software_version": constants.RELEASE_VERSION,
289 "protocol_version": constants.PROTOCOL_VERSION,
290 "config_version": constants.CONFIG_VERSION,
291 "os_api_version": max(constants.OS_API_VERSIONS),
292 "export_version": constants.EXPORT_VERSION,
293 "architecture": runtime.GetArchInfo(),
294 "name": cluster.cluster_name,
295 "master": self.cfg.GetMasterNodeName(),
296 "default_hypervisor": cluster.primary_hypervisor,
297 "enabled_hypervisors": cluster.enabled_hypervisors,
298 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
299 for hypervisor_name in cluster.enabled_hypervisors]),
301 "beparams": cluster.beparams,
302 "osparams": cluster.osparams,
303 "ipolicy": cluster.ipolicy,
304 "nicparams": cluster.nicparams,
305 "ndparams": cluster.ndparams,
306 "diskparams": cluster.diskparams,
307 "candidate_pool_size": cluster.candidate_pool_size,
308 "master_netdev": cluster.master_netdev,
309 "master_netmask": cluster.master_netmask,
310 "use_external_mip_script": cluster.use_external_mip_script,
311 "volume_group_name": cluster.volume_group_name,
312 "drbd_usermode_helper": cluster.drbd_usermode_helper,
313 "file_storage_dir": cluster.file_storage_dir,
314 "shared_file_storage_dir": cluster.shared_file_storage_dir,
315 "maintain_node_health": cluster.maintain_node_health,
316 "ctime": cluster.ctime,
317 "mtime": cluster.mtime,
318 "uuid": cluster.uuid,
319 "tags": list(cluster.GetTags()),
320 "uid_pool": cluster.uid_pool,
321 "default_iallocator": cluster.default_iallocator,
322 "reserved_lvs": cluster.reserved_lvs,
323 "primary_ip_version": primary_ip_version,
324 "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
325 "hidden_os": cluster.hidden_os,
326 "blacklisted_os": cluster.blacklisted_os,
332 class LUClusterRedistConf(NoHooksLU):
333 """Force the redistribution of cluster configuration.
335 This is a very simple LU.
340 def ExpandNames(self):
341 self.needed_locks = {
342 locking.LEVEL_NODE: locking.ALL_SET,
343 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
345 self.share_locks = ShareAll()
347 def Exec(self, feedback_fn):
348 """Redistribute the configuration.
351 self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
352 RedistributeAncillaryFiles(self)
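# Writing the (unchanged) cluster object back via cfg.Update() effectively
# forces the configuration to be redistributed; RedistributeAncillaryFiles()
# then pushes the non-configuration files (as computed by
# ComputeAncillaryFiles(), e.g. ssconf values and certificates) to the other
# nodes.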
355 class LUClusterRename(LogicalUnit):
356 """Rename the cluster.
359 HPATH = "cluster-rename"
360 HTYPE = constants.HTYPE_CLUSTER
362 def BuildHooksEnv(self):
367 "OP_TARGET": self.cfg.GetClusterName(),
368 "NEW_NAME": self.op.name,
371 def BuildHooksNodes(self):
372 """Build hooks nodes.
375 return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
377 def CheckPrereq(self):
378 """Verify that the passed name is a valid one.
381 hostname = netutils.GetHostname(name=self.op.name,
382 family=self.cfg.GetPrimaryIPFamily())
384 new_name = hostname.name
385 self.ip = new_ip = hostname.ip
386 old_name = self.cfg.GetClusterName()
387 old_ip = self.cfg.GetMasterIP()
388 if new_name == old_name and new_ip == old_ip:
389 raise errors.OpPrereqError("Neither the name nor the IP address of the"
390 " cluster has changed",
393 if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
394 raise errors.OpPrereqError("The given cluster IP address (%s) is"
395 " reachable on the network" %
396 new_ip, errors.ECODE_NOTUNIQUE)
398 self.op.name = new_name
400 def Exec(self, feedback_fn):
401 """Rename the cluster.
404 clustername = self.op.name
407 # shutdown the master IP
408 master_params = self.cfg.GetMasterNetworkParameters()
409 ems = self.cfg.GetUseExternalMipScript()
410 result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
412 result.Raise("Could not disable the master role")
415 cluster = self.cfg.GetClusterInfo()
416 cluster.cluster_name = clustername
417 cluster.master_ip = new_ip
418 self.cfg.Update(cluster, feedback_fn)
420 # update the known hosts file
421 ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
422 node_list = self.cfg.GetOnlineNodeList()
424 node_list.remove(master_params.uuid)
427 UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
429 master_params.ip = new_ip
430 result = self.rpc.call_node_activate_master_ip(master_params.uuid,
432 result.Warn("Could not re-enable the master role on the master,"
433 " please restart manually", self.LogWarning)
438 class LUClusterRepairDiskSizes(NoHooksLU):
439 """Verifies the cluster disks sizes.
444 def ExpandNames(self):
445 if self.op.instances:
446 self.wanted_names = GetWantedInstances(self, self.op.instances)
447 # Not getting the node allocation lock as only a specific set of
448 # instances (and their nodes) is going to be acquired
449 self.needed_locks = {
450 locking.LEVEL_NODE_RES: [],
451 locking.LEVEL_INSTANCE: self.wanted_names,
453 self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
455 self.wanted_names = None
456 self.needed_locks = {
457 locking.LEVEL_NODE_RES: locking.ALL_SET,
458 locking.LEVEL_INSTANCE: locking.ALL_SET,
460 # This opcode acquires the node locks for all instances
461 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
465 locking.LEVEL_NODE_RES: 1,
466 locking.LEVEL_INSTANCE: 0,
467 locking.LEVEL_NODE_ALLOC: 1,
470 def DeclareLocks(self, level):
471 if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
472 self._LockInstancesNodes(primary_only=True, level=level)
474 def CheckPrereq(self):
475 """Check prerequisites.
477 This only checks the optional instance list against the existing names.
480 if self.wanted_names is None:
481 self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
483 self.wanted_instances = \
484 map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
486 def _EnsureChildSizes(self, disk):
487 """Ensure children of the disk have the needed disk size.
489 This is valid mainly for DRBD8 and fixes an issue where the
490 children have smaller disk size.
492 @param disk: an L{ganeti.objects.Disk} object
495 if disk.dev_type == constants.LD_DRBD8:
496 assert disk.children, "Empty children for DRBD8?"
497 fchild = disk.children[0]
498 mismatch = fchild.size < disk.size
500 self.LogInfo("Child disk has size %d, parent %d, fixing",
501 fchild.size, disk.size)
502 fchild.size = disk.size
504 # and we recurse on this child only, not on the metadev
505 return self._EnsureChildSizes(fchild) or mismatch
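# _EnsureChildSizes() returns True iff at least one child of a DRBD8 disk had
# to be grown, telling the caller whether the instance needs to be written
# back to the configuration.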
509 def Exec(self, feedback_fn):
510 """Verify the size of cluster disks.
513 # TODO: check child disks too
514 # TODO: check differences in size between primary/secondary nodes
516 for instance in self.wanted_instances:
517 pnode = instance.primary_node
518 if pnode not in per_node_disks:
519 per_node_disks[pnode] = []
520 for idx, disk in enumerate(instance.disks):
521 per_node_disks[pnode].append((instance, idx, disk))
523 assert not (frozenset(per_node_disks.keys()) -
524 self.owned_locks(locking.LEVEL_NODE_RES)), \
525 "Not owning correct locks"
526 assert not self.owned_locks(locking.LEVEL_NODE)
528 es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
529 per_node_disks.keys())
532 for node_uuid, dskl in per_node_disks.items():
533 newl = [v[2].Copy() for v in dskl]
for dsk in newl:
535 self.cfg.SetDiskID(dsk, node_uuid)
536 node_name = self.cfg.GetNodeName(node_uuid)
537 result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
539 self.LogWarning("Failure in blockdev_getdimensions call to node"
540 " %s, ignoring", node_name)
542 if len(result.payload) != len(dskl):
543 logging.warning("Invalid result from node %s: len(dksl)=%d,"
544 " result.payload=%s", node_name, len(dskl),
546 self.LogWarning("Invalid result from node %s, ignoring node results",
549 for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
550 if dimensions is None:
551 self.LogWarning("Disk %d of instance %s did not return size"
552 " information, ignoring", idx, instance.name)
554 if not isinstance(dimensions, (tuple, list)):
555 self.LogWarning("Disk %d of instance %s did not return valid"
556 " dimension information, ignoring", idx,
559 (size, spindles) = dimensions
560 if not isinstance(size, (int, long)):
561 self.LogWarning("Disk %d of instance %s did not return valid"
562 " size information, ignoring", idx, instance.name)
565 if size != disk.size:
566 self.LogInfo("Disk %d of instance %s has mismatched size,"
567 " correcting: recorded %d, actual %d", idx,
568 instance.name, disk.size, size)
570 self.cfg.Update(instance, feedback_fn)
571 changed.append((instance.name, idx, "size", size))
572 if es_flags[node_uuid]:
574 self.LogWarning("Disk %d of instance %s did not return valid"
575 " spindles information, ignoring", idx,
577 elif disk.spindles is None or disk.spindles != spindles:
578 self.LogInfo("Disk %d of instance %s has mismatched spindles,"
579 " correcting: recorded %s, actual %s",
580 idx, instance.name, disk.spindles, spindles)
581 disk.spindles = spindles
582 self.cfg.Update(instance, feedback_fn)
583 changed.append((instance.name, idx, "spindles", disk.spindles))
584 if self._EnsureChildSizes(disk):
585 self.cfg.Update(instance, feedback_fn)
586 changed.append((instance.name, idx, "size", disk.size))
590 def _ValidateNetmask(cfg, netmask):
591 """Checks if a netmask is valid.
593 @type cfg: L{config.ConfigWriter}
594 @param cfg: The cluster configuration
596 @param netmask: the netmask to be verified
597 @raise errors.OpPrereqError: if the validation fails
600 ip_family = cfg.GetPrimaryIPFamily()
602 ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
603 except errors.ProgrammerError:
604 raise errors.OpPrereqError("Invalid primary ip family: %s." %
605 ip_family, errors.ECODE_INVAL)
606 if not ipcls.ValidateNetmask(netmask):
607 raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
608 (netmask), errors.ECODE_INVAL)
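# Illustrative use (assuming an IPv4 cluster, where the netmask is given as a
# CIDR prefix length): _ValidateNetmask(cfg, 24) passes silently, while
# _ValidateNetmask(cfg, 33) raises errors.OpPrereqError.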
611 class LUClusterSetParams(LogicalUnit):
612 """Change the parameters of the cluster.
615 HPATH = "cluster-modify"
616 HTYPE = constants.HTYPE_CLUSTER
619 def CheckArguments(self):
624 uidpool.CheckUidPool(self.op.uid_pool)
627 uidpool.CheckUidPool(self.op.add_uids)
629 if self.op.remove_uids:
630 uidpool.CheckUidPool(self.op.remove_uids)
632 if self.op.master_netmask is not None:
633 _ValidateNetmask(self.cfg, self.op.master_netmask)
635 if self.op.diskparams:
636 for dt_params in self.op.diskparams.values():
637 utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
639 utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
640 except errors.OpPrereqError, err:
641 raise errors.OpPrereqError("While verify diskparams options: %s" % err,
644 def ExpandNames(self):
645 # FIXME: in the future maybe other cluster params won't require checking on
646 # all nodes to be modified.
647 # FIXME: This opcode changes cluster-wide settings. Is acquiring all
648 # resource locks the right thing, shouldn't it be the BGL instead?
649 self.needed_locks = {
650 locking.LEVEL_NODE: locking.ALL_SET,
651 locking.LEVEL_INSTANCE: locking.ALL_SET,
652 locking.LEVEL_NODEGROUP: locking.ALL_SET,
653 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
655 self.share_locks = ShareAll()
657 def BuildHooksEnv(self):
662 "OP_TARGET": self.cfg.GetClusterName(),
663 "NEW_VG_NAME": self.op.vg_name,
666 def BuildHooksNodes(self):
667 """Build hooks nodes.
670 mn = self.cfg.GetMasterNode()
673 def _CheckVgName(self, node_uuids, enabled_disk_templates,
674 new_enabled_disk_templates):
675 """Check the consistency of the vg name on all nodes and in case it gets
676 unset whether there are instances still using it.
679 if self.op.vg_name is not None and not self.op.vg_name:
680 if self.cfg.HasAnyDiskOfType(constants.LD_LV):
681 raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
682 " instances exist", errors.ECODE_INVAL)
684 if (self.op.vg_name is not None and
685 utils.IsLvmEnabled(enabled_disk_templates)) or \
686 (self.cfg.GetVGName() is not None and
687 utils.LvmGetsEnabled(enabled_disk_templates,
688 new_enabled_disk_templates)):
689 self._CheckVgNameOnNodes(node_uuids)
691 def _CheckVgNameOnNodes(self, node_uuids):
692 """Check the status of the volume group on each node.
695 vglist = self.rpc.call_vg_list(node_uuids)
696 for node_uuid in node_uuids:
697 msg = vglist[node_uuid].fail_msg
700 self.LogWarning("Error while gathering data on node %s"
701 " (ignoring node): %s",
702 self.cfg.GetNodeName(node_uuid), msg)
704 vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
706 constants.MIN_VG_SIZE)
708 raise errors.OpPrereqError("Error on node '%s': %s" %
709 (self.cfg.GetNodeName(node_uuid), vgstatus),
710 errors.ECODE_ENVIRON)
712 def _GetEnabledDiskTemplates(self, cluster):
713 """Determines the enabled disk templates and the subset of disk templates
714 that are newly enabled by this operation.
717 enabled_disk_templates = None
718 new_enabled_disk_templates = []
719 if self.op.enabled_disk_templates:
720 enabled_disk_templates = self.op.enabled_disk_templates
721 new_enabled_disk_templates = \
722 list(set(enabled_disk_templates)
723 - set(cluster.enabled_disk_templates))
725 enabled_disk_templates = cluster.enabled_disk_templates
726 return (enabled_disk_templates, new_enabled_disk_templates)
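# Example (illustrative template names): with ["drbd", "plain"] currently
# enabled and the opcode requesting ["drbd", "plain", "file"], this returns
# the requested list together with ["file"] as the newly enabled subset.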
728 def CheckPrereq(self):
729 """Check prerequisites.
731 This checks whether the given params don't conflict and
732 if the given volume group is valid.
735 if self.op.drbd_helper is not None and not self.op.drbd_helper:
736 if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
737 raise errors.OpPrereqError("Cannot disable drbd helper while"
738 " drbd-based instances exist",
741 node_uuids = self.owned_locks(locking.LEVEL_NODE)
742 self.cluster = cluster = self.cfg.GetClusterInfo()
744 vm_capable_node_uuids = [node.uuid
745 for node in self.cfg.GetAllNodesInfo().values()
746 if node.uuid in node_uuids and node.vm_capable]
748 (enabled_disk_templates, new_enabled_disk_templates) = \
749 self._GetEnabledDiskTemplates(cluster)
751 self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
752 new_enabled_disk_templates)
754 if self.op.drbd_helper:
755 # checks given drbd helper on all nodes
756 helpers = self.rpc.call_drbd_helper(node_uuids)
757 for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
759 self.LogInfo("Not checking drbd helper on offline node %s",
762 msg = helpers[ninfo.uuid].fail_msg
764 raise errors.OpPrereqError("Error checking drbd helper on node"
765 " '%s': %s" % (ninfo.name, msg),
766 errors.ECODE_ENVIRON)
767 node_helper = helpers[ninfo.uuid].payload
768 if node_helper != self.op.drbd_helper:
769 raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
770 (ninfo.name, node_helper),
771 errors.ECODE_ENVIRON)
773 # validate params changes
775 objects.UpgradeBeParams(self.op.beparams)
776 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
777 self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
780 utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
781 self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
783 # TODO: we need a more general way to handle resetting
784 # cluster-level parameters to default values
785 if self.new_ndparams["oob_program"] == "":
786 self.new_ndparams["oob_program"] = \
787 constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
790 new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
791 self.cluster.hv_state_static)
792 self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
793 for hv, values in new_hv_state.items())
795 if self.op.disk_state:
796 new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
797 self.cluster.disk_state_static)
798 self.new_disk_state = \
799 dict((storage, dict((name, cluster.SimpleFillDiskState(values))
800 for name, values in svalues.items()))
801 for storage, svalues in new_disk_state.items())
804 self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
807 all_instances = self.cfg.GetAllInstancesInfo().values()
809 for group in self.cfg.GetAllNodeGroupsInfo().values():
810 instances = frozenset([inst for inst in all_instances
811 if compat.any(nuuid in group.members
812 for nuuid in inst.all_nodes)])
813 new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
814 ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
815 new = ComputeNewInstanceViolations(ipol,
816 new_ipolicy, instances, self.cfg)
818 violations.update(new)
821 self.LogWarning("After the ipolicy change the following instances"
823 utils.CommaJoin(utils.NiceSort(violations)))
825 if self.op.nicparams:
826 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
827 self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
828 objects.NIC.CheckParameterSyntax(self.new_nicparams)
831 # check all instances for consistency
832 for instance in self.cfg.GetAllInstancesInfo().values():
833 for nic_idx, nic in enumerate(instance.nics):
834 params_copy = copy.deepcopy(nic.nicparams)
835 params_filled = objects.FillDict(self.new_nicparams, params_copy)
837 # check parameter syntax
839 objects.NIC.CheckParameterSyntax(params_filled)
840 except errors.ConfigurationError, err:
841 nic_errors.append("Instance %s, nic/%d: %s" %
842 (instance.name, nic_idx, err))
844 # if we're moving instances to routed, check that they have an ip
845 target_mode = params_filled[constants.NIC_MODE]
846 if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
847 nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
848 " address" % (instance.name, nic_idx))
850 raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
851 "\n".join(nic_errors), errors.ECODE_INVAL)
853 # hypervisor list/parameters
854 self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
856 for hv_name, hv_dict in self.op.hvparams.items():
857 if hv_name not in self.new_hvparams:
858 self.new_hvparams[hv_name] = hv_dict
860 self.new_hvparams[hv_name].update(hv_dict)
862 # disk template parameters
863 self.new_diskparams = objects.FillDict(cluster.diskparams, {})
864 if self.op.diskparams:
865 for dt_name, dt_params in self.op.diskparams.items():
866 if dt_name not in self.new_diskparams:
867 self.new_diskparams[dt_name] = dt_params
869 self.new_diskparams[dt_name].update(dt_params)
871 # os hypervisor parameters
872 self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
874 for os_name, hvs in self.op.os_hvp.items():
875 if os_name not in self.new_os_hvp:
876 self.new_os_hvp[os_name] = hvs
878 for hv_name, hv_dict in hvs.items():
880 # Delete if it exists
881 self.new_os_hvp[os_name].pop(hv_name, None)
882 elif hv_name not in self.new_os_hvp[os_name]:
883 self.new_os_hvp[os_name][hv_name] = hv_dict
885 self.new_os_hvp[os_name][hv_name].update(hv_dict)
888 self.new_osp = objects.FillDict(cluster.osparams, {})
890 for os_name, osp in self.op.osparams.items():
891 if os_name not in self.new_osp:
892 self.new_osp[os_name] = {}
894 self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
897 if not self.new_osp[os_name]:
898 # we removed all parameters
899 del self.new_osp[os_name]
901 # check the parameter validity (remote check)
902 CheckOSParams(self, False, [self.cfg.GetMasterNode()],
903 os_name, self.new_osp[os_name])
905 # changes to the hypervisor list
906 if self.op.enabled_hypervisors is not None:
907 self.hv_list = self.op.enabled_hypervisors
908 for hv in self.hv_list:
909 # if the hypervisor doesn't already exist in the cluster
910 # hvparams, we initialize it to empty, and then (in both
911 # cases) we make sure to fill the defaults, as we might not
912 # have a complete defaults list if the hypervisor wasn't
914 if hv not in new_hvp:
916 new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
917 utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
919 self.hv_list = cluster.enabled_hypervisors
921 if self.op.hvparams or self.op.enabled_hypervisors is not None:
922 # either the enabled list has changed, or the parameters have, validate
923 for hv_name, hv_params in self.new_hvparams.items():
924 if ((self.op.hvparams and hv_name in self.op.hvparams) or
925 (self.op.enabled_hypervisors and
926 hv_name in self.op.enabled_hypervisors)):
927 # either this is a new hypervisor, or its parameters have changed
928 hv_class = hypervisor.GetHypervisorClass(hv_name)
929 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
930 hv_class.CheckParameterSyntax(hv_params)
931 CheckHVParams(self, node_uuids, hv_name, hv_params)
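# CheckHVParams() performs the remote half of the validation: the merged
# hypervisor parameters are sent to the affected nodes so that problems only a
# node can detect (such as missing hypervisor support) are caught before the
# change is committed.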
933 self._CheckDiskTemplateConsistency()
936 # no need to check any newly-enabled hypervisors, since the
937 # defaults have already been checked in the above code-block
938 for os_name, os_hvp in self.new_os_hvp.items():
939 for hv_name, hv_params in os_hvp.items():
940 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
941 # we need to fill in the new os_hvp on top of the actual hv_p
942 cluster_defaults = self.new_hvparams.get(hv_name, {})
943 new_osp = objects.FillDict(cluster_defaults, hv_params)
944 hv_class = hypervisor.GetHypervisorClass(hv_name)
945 hv_class.CheckParameterSyntax(new_osp)
946 CheckHVParams(self, node_uuids, hv_name, new_osp)
948 if self.op.default_iallocator:
949 alloc_script = utils.FindFile(self.op.default_iallocator,
950 constants.IALLOCATOR_SEARCH_PATH,
952 if alloc_script is None:
953 raise errors.OpPrereqError("Invalid default iallocator script '%s'"
954 " specified" % self.op.default_iallocator,
957 def _CheckDiskTemplateConsistency(self):
958 """Check whether the disk templates that are going to be disabled
959 are still in use by some instances.
962 if self.op.enabled_disk_templates:
963 cluster = self.cfg.GetClusterInfo()
964 instances = self.cfg.GetAllInstancesInfo()
966 disk_templates_to_remove = set(cluster.enabled_disk_templates) \
967 - set(self.op.enabled_disk_templates)
968 for instance in instances.itervalues():
969 if instance.disk_template in disk_templates_to_remove:
970 raise errors.OpPrereqError("Cannot disable disk template '%s',"
971 " because instance '%s' is using it." %
972 (instance.disk_template, instance.name))
974 def _SetVgName(self, feedback_fn):
975 """Determines and sets the new volume group name.
978 if self.op.vg_name is not None:
979 if self.op.vg_name and not \
980 utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
981 feedback_fn("Note that you specified a volume group, but did not"
982 " enable any lvm disk template.")
983 new_volume = self.op.vg_name
985 if utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
986 raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
987 " disk templates are enabled.")
989 if new_volume != self.cfg.GetVGName():
990 self.cfg.SetVGName(new_volume)
992 feedback_fn("Cluster LVM configuration already in desired"
993 " state, not changing")
995 if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
996 not self.cfg.GetVGName():
997 raise errors.OpPrereqError("Please specify a volume group when"
998 " enabling lvm-based disk-templates.")
1000 def Exec(self, feedback_fn):
1001 """Change the parameters of the cluster.
1004 if self.op.enabled_disk_templates:
1005 self.cluster.enabled_disk_templates = \
1006 list(set(self.op.enabled_disk_templates))
1008 self._SetVgName(feedback_fn)
1010 if self.op.drbd_helper is not None:
1011 if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1012 feedback_fn("Note that you specified a drbd user helper, but did"
1013 " enabled the drbd disk template.")
1014 new_helper = self.op.drbd_helper
1017 if new_helper != self.cfg.GetDRBDHelper():
1018 self.cfg.SetDRBDHelper(new_helper)
1020 feedback_fn("Cluster DRBD helper already in desired state,"
1022 if self.op.hvparams:
1023 self.cluster.hvparams = self.new_hvparams
1025 self.cluster.os_hvp = self.new_os_hvp
1026 if self.op.enabled_hypervisors is not None:
1027 self.cluster.hvparams = self.new_hvparams
1028 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1029 if self.op.beparams:
1030 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1031 if self.op.nicparams:
1032 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1034 self.cluster.ipolicy = self.new_ipolicy
1035 if self.op.osparams:
1036 self.cluster.osparams = self.new_osp
1037 if self.op.ndparams:
1038 self.cluster.ndparams = self.new_ndparams
1039 if self.op.diskparams:
1040 self.cluster.diskparams = self.new_diskparams
1041 if self.op.hv_state:
1042 self.cluster.hv_state_static = self.new_hv_state
1043 if self.op.disk_state:
1044 self.cluster.disk_state_static = self.new_disk_state
1046 if self.op.candidate_pool_size is not None:
1047 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1048 # we need to update the pool size here, otherwise the save will fail
1049 AdjustCandidatePool(self, [])
1051 if self.op.maintain_node_health is not None:
1052 if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1053 feedback_fn("Note: CONFD was disabled at build time, node health"
1054 " maintenance is not useful (still enabling it)")
1055 self.cluster.maintain_node_health = self.op.maintain_node_health
1057 if self.op.prealloc_wipe_disks is not None:
1058 self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1060 if self.op.add_uids is not None:
1061 uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1063 if self.op.remove_uids is not None:
1064 uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1066 if self.op.uid_pool is not None:
1067 self.cluster.uid_pool = self.op.uid_pool
1069 if self.op.default_iallocator is not None:
1070 self.cluster.default_iallocator = self.op.default_iallocator
1072 if self.op.reserved_lvs is not None:
1073 self.cluster.reserved_lvs = self.op.reserved_lvs
1075 if self.op.use_external_mip_script is not None:
1076 self.cluster.use_external_mip_script = self.op.use_external_mip_script
1078 def helper_os(aname, mods, desc):
1080 lst = getattr(self.cluster, aname)
1081 for key, val in mods:
1082 if key == constants.DDM_ADD:
1084 feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1087 elif key == constants.DDM_REMOVE:
1091 feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1093 raise errors.ProgrammerError("Invalid modification '%s'" % key)
1095 if self.op.hidden_os:
1096 helper_os("hidden_os", self.op.hidden_os, "hidden")
1098 if self.op.blacklisted_os:
1099 helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1101 if self.op.master_netdev:
1102 master_params = self.cfg.GetMasterNetworkParameters()
1103 ems = self.cfg.GetUseExternalMipScript()
1104 feedback_fn("Shutting down master ip on the current netdev (%s)" %
1105 self.cluster.master_netdev)
1106 result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1108 if not self.op.force:
1109 result.Raise("Could not disable the master ip")
1112 msg = ("Could not disable the master ip (continuing anyway): %s" %
1115 feedback_fn("Changing master_netdev from %s to %s" %
1116 (master_params.netdev, self.op.master_netdev))
1117 self.cluster.master_netdev = self.op.master_netdev
1119 if self.op.master_netmask:
1120 master_params = self.cfg.GetMasterNetworkParameters()
1121 feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1122 result = self.rpc.call_node_change_master_netmask(
1123 master_params.uuid, master_params.netmask,
1124 self.op.master_netmask, master_params.ip,
1125 master_params.netdev)
1126 result.Warn("Could not change the master IP netmask", feedback_fn)
1127 self.cluster.master_netmask = self.op.master_netmask
1129 self.cfg.Update(self.cluster, feedback_fn)
1131 if self.op.master_netdev:
1132 master_params = self.cfg.GetMasterNetworkParameters()
1133 feedback_fn("Starting the master ip on the new master netdev (%s)" %
1134 self.op.master_netdev)
1135 ems = self.cfg.GetUseExternalMipScript()
1136 result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1138 result.Warn("Could not re-enable the master ip on the master,"
1139 " please restart manually", self.LogWarning)
1142 class LUClusterVerify(NoHooksLU):
1143 """Submits all jobs necessary to verify the cluster.
1148 def ExpandNames(self):
1149 self.needed_locks = {}
1151 def Exec(self, feedback_fn):
1154 if self.op.group_name:
1155 groups = [self.op.group_name]
1156 depends_fn = lambda: None
1158 groups = self.cfg.GetNodeGroupList()
1160 # Verify global configuration
1162 opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1165 # Always depend on global verification
1166 depends_fn = lambda: [(-len(jobs), [])]
1169 [opcodes.OpClusterVerifyGroup(group_name=group,
1170 ignore_errors=self.op.ignore_errors,
1171 depends=depends_fn())]
1172 for group in groups)
1174 # Fix up all parameters
1175 for op in itertools.chain(*jobs): # pylint: disable=W0142
1176 op.debug_simulate_errors = self.op.debug_simulate_errors
1177 op.verbose = self.op.verbose
1178 op.error_codes = self.op.error_codes
1180 op.skip_checks = self.op.skip_checks
1181 except AttributeError:
1182 assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1184 return ResultWithJobs(jobs)
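# LUClusterVerify only submits jobs: one OpClusterVerifyConfig job for the
# cluster-wide configuration (skipped when a single group is requested),
# followed by one OpClusterVerifyGroup job per group, each depending on the
# configuration job through a relative job dependency so group verification
# never runs against a configuration already known to be broken.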
1187 class _VerifyErrors(object):
1188 """Mix-in for cluster/group verify LUs.
1190 It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1191 self.op and self._feedback_fn to be available.)
1195 ETYPE_FIELD = "code"
1196 ETYPE_ERROR = "ERROR"
1197 ETYPE_WARNING = "WARNING"
1199 def _Error(self, ecode, item, msg, *args, **kwargs):
1200 """Format an error message.
1202 Based on the opcode's error_codes parameter, either format a
1203 parseable error code, or a simpler error string.
1205 This must be called only from Exec and functions called from Exec.
1208 ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1209 itype, etxt, _ = ecode
1210 # If the error code is in the list of ignored errors, demote the error to a
# warning
1212 if etxt in self.op.ignore_errors: # pylint: disable=E1101
1213 ltype = self.ETYPE_WARNING
1214 # first complete the msg
1217 # then format the whole message
1218 if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1219 msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1225 msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1226 # and finally report it via the feedback_fn
1227 self._feedback_fn(" - %s" % msg) # Mix-in. pylint: disable=E1101
1228 # do not mark the operation as failed for WARN cases only
1229 if ltype == self.ETYPE_ERROR:
1232 def _ErrorIf(self, cond, *args, **kwargs):
1233 """Log an error message if the passed condition is True.
1237 or self.op.debug_simulate_errors): # pylint: disable=E1101
1238 self._Error(*args, **kwargs)
1241 def _VerifyCertificate(filename):
1242 """Verifies a certificate for L{LUClusterVerifyConfig}.
1244 @type filename: string
1245 @param filename: Path to PEM file
1249 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1250 utils.ReadFile(filename))
1251 except Exception, err: # pylint: disable=W0703
1252 return (LUClusterVerifyConfig.ETYPE_ERROR,
1253 "Failed to load X509 certificate %s: %s" % (filename, err))
1256 utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1257 constants.SSL_CERT_EXPIRATION_ERROR)
1260 fnamemsg = "While verifying %s: %s" % (filename, msg)
1265 return (None, fnamemsg)
1266 elif errcode == utils.CERT_WARNING:
1267 return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1268 elif errcode == utils.CERT_ERROR:
1269 return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1271 raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1274 def _GetAllHypervisorParameters(cluster, instances):
1275 """Compute the set of all hypervisor parameters.
1277 @type cluster: L{objects.Cluster}
1278 @param cluster: the cluster object
1279 @param instances: list of L{objects.Instance}
1280 @param instances: additional instances from which to obtain parameters
1281 @rtype: list of (origin, hypervisor, parameters)
1282 @return: a list with all parameters found, indicating the hypervisor they
1283 apply to, and the origin (can be "cluster", "os X", or "instance Y")
1288 for hv_name in cluster.enabled_hypervisors:
1289 hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1291 for os_name, os_hvp in cluster.os_hvp.items():
1292 for hv_name, hv_params in os_hvp.items():
1294 full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1295 hvp_data.append(("os %s" % os_name, hv_name, full_params))
1297 # TODO: collapse identical parameter values in a single one
1298 for instance in instances:
1299 if instance.hvparams:
1300 hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1301 cluster.FillHV(instance)))
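# Each entry of the resulting list is an (origin, hypervisor, parameters)
# tuple, e.g. ("cluster", "kvm", {...}), ("os debian", "xen-pvm", {...}) or
# ("instance web1", "kvm", {...}); the OS and instance names here are purely
# illustrative.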
1306 class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1307 """Verifies the cluster config.
1312 def _VerifyHVP(self, hvp_data):
1313 """Verifies locally the syntax of the hypervisor parameters.
1316 for item, hv_name, hv_params in hvp_data:
1317 msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1320 hv_class = hypervisor.GetHypervisorClass(hv_name)
1321 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1322 hv_class.CheckParameterSyntax(hv_params)
1323 except errors.GenericError, err:
1324 self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1326 def ExpandNames(self):
1327 self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1328 self.share_locks = ShareAll()
1330 def CheckPrereq(self):
1331 """Check prerequisites.
1334 # Retrieve all information
1335 self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1336 self.all_node_info = self.cfg.GetAllNodesInfo()
1337 self.all_inst_info = self.cfg.GetAllInstancesInfo()
1339 def Exec(self, feedback_fn):
1340 """Verify integrity of cluster, performing various test on nodes.
1344 self._feedback_fn = feedback_fn
1346 feedback_fn("* Verifying cluster config")
1348 for msg in self.cfg.VerifyConfig():
1349 self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1351 feedback_fn("* Verifying cluster certificate files")
1353 for cert_filename in pathutils.ALL_CERT_FILES:
1354 (errcode, msg) = _VerifyCertificate(cert_filename)
1355 self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1357 feedback_fn("* Verifying hypervisor parameters")
1359 self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1360 self.all_inst_info.values()))
1362 feedback_fn("* Verifying all nodes belong to an existing group")
1364 # We do this verification here because, should this bogus circumstance
1365 # occur, it would never be caught by VerifyGroup, which only acts on
1366 # nodes/instances reachable from existing node groups.
1368 dangling_nodes = set(node for node in self.all_node_info.values()
1369 if node.group not in self.all_group_info)
1371 dangling_instances = {}
1372 no_node_instances = []
1374 for inst in self.all_inst_info.values():
1375 if inst.primary_node in [node.uuid for node in dangling_nodes]:
1376 dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1377 elif inst.primary_node not in self.all_node_info:
1378 no_node_instances.append(inst.name)
1383 utils.CommaJoin(dangling_instances.get(node.uuid,
1385 for node in dangling_nodes]
1387 self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1389 "the following nodes (and their instances) belong to a non"
1390 " existing group: %s", utils.CommaJoin(pretty_dangling))
1392 self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1394 "the following instances have a non-existing primary-node:"
1395 " %s", utils.CommaJoin(no_node_instances))
1400 class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1401 """Verifies the status of a node group.
1404 HPATH = "cluster-verify"
1405 HTYPE = constants.HTYPE_CLUSTER
1408 _HOOKS_INDENT_RE = re.compile("^", re.M)
1410 class NodeImage(object):
1411 """A class representing the logical and physical status of a node.
1414 @ivar uuid: the node UUID to which this object refers
1415 @ivar volumes: a structure as returned from
1416 L{ganeti.backend.GetVolumeList} (runtime)
1417 @ivar instances: a list of running instances (runtime)
1418 @ivar pinst: list of configured primary instances (config)
1419 @ivar sinst: list of configured secondary instances (config)
1420 @ivar sbp: dictionary of {primary-node: list of instances} for all
1421 instances for which this node is secondary (config)
1422 @ivar mfree: free memory, as reported by hypervisor (runtime)
1423 @ivar dfree: free disk, as reported by the node (runtime)
1424 @ivar offline: the offline status (config)
1425 @type rpc_fail: boolean
1426 @ivar rpc_fail: whether the RPC verify call was successful (overall,
1427 not whether the individual keys were correct) (runtime)
1428 @type lvm_fail: boolean
1429 @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1430 @type hyp_fail: boolean
1431 @ivar hyp_fail: whether the RPC call didn't return the instance list
1432 @type ghost: boolean
1433 @ivar ghost: whether this is a known node or not (config)
1434 @type os_fail: boolean
1435 @ivar os_fail: whether the RPC call didn't return valid OS data
1437 @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1438 @type vm_capable: boolean
1439 @ivar vm_capable: whether the node can host instances
1441 @ivar pv_min: size in MiB of the smallest PVs
1443 @ivar pv_max: size in MiB of the biggest PVs
1446 def __init__(self, offline=False, uuid=None, vm_capable=True):
1455 self.offline = offline
1456 self.vm_capable = vm_capable
1457 self.rpc_fail = False
1458 self.lvm_fail = False
1459 self.hyp_fail = False
1461 self.os_fail = False
1466 def ExpandNames(self):
1467 # This raises errors.OpPrereqError on its own:
1468 self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1470 # Get instances in node group; this is unsafe and needs verification later
1472 self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1474 self.needed_locks = {
1475 locking.LEVEL_INSTANCE: inst_names,
1476 locking.LEVEL_NODEGROUP: [self.group_uuid],
1477 locking.LEVEL_NODE: [],
1479 # This opcode is run by watcher every five minutes and acquires all nodes
1480 # for a group. It doesn't run for a long time, so it's better to acquire
1481 # the node allocation lock as well.
1482 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1485 self.share_locks = ShareAll()
1487 def DeclareLocks(self, level):
1488 if level == locking.LEVEL_NODE:
1489 # Get members of node group; this is unsafe and needs verification later
1490 nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1492 all_inst_info = self.cfg.GetAllInstancesInfo()
1494 # In Exec(), we warn about mirrored instances that have primary and
1495 # secondary living in separate node groups. To fully verify that
1496 # volumes for these instances are healthy, we will need to do an
1497 # extra call to their secondaries. We ensure here those nodes will
# be locked.
1499 for inst in self.owned_locks(locking.LEVEL_INSTANCE):
1500 # Important: access only the instances whose lock is owned
1501 if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
1502 nodes.update(all_inst_info[inst].secondary_nodes)
1504 self.needed_locks[locking.LEVEL_NODE] = nodes
1506 def CheckPrereq(self):
1507 assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1508 self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1510 group_node_uuids = set(self.group_info.members)
1512 self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1514 unlocked_node_uuids = \
1515 group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1517 unlocked_instances = \
1518 group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))
1520 if unlocked_node_uuids:
1521 raise errors.OpPrereqError(
1522 "Missing lock for nodes: %s" %
1523 utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1526 if unlocked_instances:
1527 raise errors.OpPrereqError("Missing lock for instances: %s" %
1528 utils.CommaJoin(unlocked_instances),
1531 self.all_node_info = self.cfg.GetAllNodesInfo()
1532 self.all_inst_info = self.cfg.GetAllInstancesInfo()
1534 self.my_node_uuids = group_node_uuids
1535 self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1536 for node_uuid in group_node_uuids)
1538 self.my_inst_names = utils.NiceSort(group_instances)
1539 self.my_inst_info = dict((name, self.all_inst_info[name])
1540 for name in self.my_inst_names)
1542 # We detect here the nodes that will need the extra RPC calls for verifying
1543 # split LV volumes; they should be locked.
1544 extra_lv_nodes = set()
1546 for inst in self.my_inst_info.values():
1547 if inst.disk_template in constants.DTS_INT_MIRROR:
1548 for nuuid in inst.all_nodes:
1549 if self.all_node_info[nuuid].group != self.group_uuid:
1550 extra_lv_nodes.add(nuuid)
1552 unlocked_lv_nodes = \
1553 extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1555 if unlocked_lv_nodes:
1556 raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1557 utils.CommaJoin(unlocked_lv_nodes),
1559 self.extra_lv_nodes = list(extra_lv_nodes)
1561 def _VerifyNode(self, ninfo, nresult):
1562 """Perform some basic validation on data returned from a node.
1564 - check the result data structure is well formed and has all the
mandatory fields
1566 - check ganeti version
1568 @type ninfo: L{objects.Node}
1569 @param ninfo: the node to check
1570 @param nresult: the results from the node
1572 @return: whether overall this call was successful (and we can expect
1573 reasonable values in the response)
1576 # main result, nresult should be a non-empty dict
1577 test = not nresult or not isinstance(nresult, dict)
1578 self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1579 "unable to verify node: no data returned")
1583 # compares ganeti version
1584 local_version = constants.PROTOCOL_VERSION
1585 remote_version = nresult.get("version", None)
1586 test = not (remote_version and
1587 isinstance(remote_version, (list, tuple)) and
1588 len(remote_version) == 2)
1589 self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1590 "connection to node returned invalid data")
1594 test = local_version != remote_version[0]
1595 self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1596 "incompatible protocol versions: master %s,"
1597 " node %s", local_version, remote_version[0])
1601 # node seems compatible, we can actually try to look into its results
1603 # full package version
1604 self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1605 constants.CV_ENODEVERSION, ninfo.name,
1606 "software version mismatch: master %s, node %s",
1607 constants.RELEASE_VERSION, remote_version[1],
1608 code=self.ETYPE_WARNING)
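# A protocol version mismatch above is a hard error, since RPC answers could
# not be interpreted reliably; a differing RELEASE_VERSION is demoted to a
# warning because mixed package versions can legitimately occur, e.g. during a
# rolling upgrade.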
1610 hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1611 if ninfo.vm_capable and isinstance(hyp_result, dict):
1612 for hv_name, hv_result in hyp_result.iteritems():
1613 test = hv_result is not None
1614 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1615 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1617 hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1618 if ninfo.vm_capable and isinstance(hvp_result, list):
1619 for item, hv_name, hv_result in hvp_result:
1620 self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1621 "hypervisor %s parameter verify failure (source %s): %s",
1622 hv_name, item, hv_result)
1624 test = nresult.get(constants.NV_NODESETUP,
1625 ["Missing NODESETUP results"])
1626 self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1627 "node setup error: %s", "; ".join(test))
1631 def _VerifyNodeTime(self, ninfo, nresult,
1632 nvinfo_starttime, nvinfo_endtime):
1633 """Check the node time.
1635 @type ninfo: L{objects.Node}
1636 @param ninfo: the node to check
1637 @param nresult: the remote results for the node
1638 @param nvinfo_starttime: the start time of the RPC call
1639 @param nvinfo_endtime: the end time of the RPC call
1642 ntime = nresult.get(constants.NV_TIME, None)
1644 ntime_merged = utils.MergeTime(ntime)
1645 except (ValueError, TypeError):
1646 self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1647 "Node returned invalid time")
1650 if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1651 ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1652 elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1653 ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1657 self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1658 "Node time diverges by at least %s from master node time",
1661 def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1662 """Check the node LVM results and update info for cross-node checks.
1664 @type ninfo: L{objects.Node}
1665 @param ninfo: the node to check
1666 @param nresult: the remote results for the node
1667 @param vg_name: the configured VG name
1668 @type nimg: L{NodeImage}
1669 @param nimg: node image
1675 # checks vg existence and size > 20G
1676 vglist = nresult.get(constants.NV_VGLIST, None)
1678 self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1679 "unable to check volume groups")
1681 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1682 constants.MIN_VG_SIZE)
1683 self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1686 (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1688 self._Error(constants.CV_ENODELVM, ninfo.name, em)
1689 if pvminmax is not None:
1690 (nimg.pv_min, nimg.pv_max) = pvminmax
1692 def _VerifyGroupDRBDVersion(self, node_verify_infos):
1693 """Check cross-node DRBD version consistency.
1695 @type node_verify_infos: dict
1696 @param node_verify_infos: infos about nodes as returned from the
1701 for node_uuid, ndata in node_verify_infos.items():
1702 nresult = ndata.payload
1703 version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1704 node_versions[node_uuid] = version
1706 if len(set(node_versions.values())) > 1:
1707 for node_uuid, version in sorted(node_versions.items()):
1708 msg = "DRBD version mismatch: %s" % version
1709 self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1710 code=self.ETYPE_WARNING)
1712 def _VerifyGroupLVM(self, node_image, vg_name):
1713 """Check cross-node consistency in LVM.
1715 @type node_image: dict
1716 @param node_image: info about nodes, mapping from node to names to
1717 L{NodeImage} objects
1718 @param vg_name: the configured VG name
1724 # Only exclusive storage needs this kind of checks
1725 if not self._exclusive_storage:
1728 # exclusive_storage wants all PVs to have the same size (approximately),
1729 # if the smallest and the biggest ones are okay, everything is fine.
1730 # pv_min is None iff pv_max is None
1731 vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1734 (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1735 (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1736 bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1737 self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1738 "PV sizes differ too much in the group; smallest (%s MB) is"
1739 " on %s, biggest (%s MB) is on %s",
1740 pvmin, self.cfg.GetNodeName(minnode_uuid),
1741 pvmax, self.cfg.GetNodeName(maxnode_uuid))
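# The group-wide check relies on the invariant stated above: with exclusive
# storage, PV sizes are expected to be roughly homogeneous, so comparing only
# the smallest and the biggest PV reported in the group is sufficient.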
1743 def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1744 """Check the node bridges.
1746 @type ninfo: L{objects.Node}
1747 @param ninfo: the node to check
1748 @param nresult: the remote results for the node
1749 @param bridges: the expected list of bridges
1755 missing = nresult.get(constants.NV_BRIDGES, None)
1756 test = not isinstance(missing, list)
1757 self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1758 "did not return valid bridge information")
1760 self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1761 "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1763 def _VerifyNodeUserScripts(self, ninfo, nresult):
1764 """Check the results of user scripts presence and executability on the node
1766 @type ninfo: L{objects.Node}
1767 @param ninfo: the node to check
1768 @param nresult: the remote results for the node
1771 test = not constants.NV_USERSCRIPTS in nresult
1772 self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1773 "did not return user scripts information")
1775 broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1777 self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1778 "user scripts not present or not executable: %s" %
1779 utils.CommaJoin(sorted(broken_scripts)))
1781 def _VerifyNodeNetwork(self, ninfo, nresult):
1782 """Check the node network connectivity results.
1784 @type ninfo: L{objects.Node}
1785 @param ninfo: the node to check
1786 @param nresult: the remote results for the node
1789 test = constants.NV_NODELIST not in nresult
1790 self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
1791 "node hasn't returned node ssh connectivity data")
1793 if nresult[constants.NV_NODELIST]:
1794 for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1795 self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
1796 "ssh communication with node '%s': %s", a_node, a_msg)
1798 test = constants.NV_NODENETTEST not in nresult
1799 self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1800 "node hasn't returned node tcp connectivity data")
1802 if nresult[constants.NV_NODENETTEST]:
1803 nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
for anode in nlist:
1805 self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
1806 "tcp communication with node '%s': %s",
1807 anode, nresult[constants.NV_NODENETTEST][anode])
1809 test = constants.NV_MASTERIP not in nresult
1810 self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1811 "node hasn't returned node master IP reachability data")
1813 if not nresult[constants.NV_MASTERIP]:
1814 if ninfo.uuid == self.master_node:
1815 msg = "the master node cannot reach the master IP (not configured?)"
1817 msg = "cannot reach the master IP"
1818 self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
1820 def _VerifyInstance(self, instance, inst_config, node_image,
1822 """Verify an instance.
1824 This function checks to see if the required block devices are
1825 available on the instance's node, and that the nodes are in the correct
state.
1829 pnode = inst_config.primary_node
1830 pnode_img = node_image[pnode]
1831 groupinfo = self.cfg.GetAllNodeGroupsInfo()
1833 node_vol_should = {}
1834 inst_config.MapLVsByNode(node_vol_should)
1836 cluster = self.cfg.GetClusterInfo()
1837 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1838 self.group_info)
1839 err = ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
1840 self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance,
1841 utils.CommaJoin(err), code=self.ETYPE_WARNING)
1843 for node in node_vol_should:
1844 n_img = node_image[node]
1845 if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1846 # ignore missing volumes on offline or broken nodes
1847 continue
1848 for volume in node_vol_should[node]:
1849 test = volume not in n_img.volumes
1850 self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
1851 "volume %s missing on node %s", volume,
1852 self.cfg.GetNodeName(node))
1854 if inst_config.admin_state == constants.ADMINST_UP:
1855 test = instance not in pnode_img.instances and not pnode_img.offline
1856 self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
1857 "instance not running on its primary node %s",
1858 self.cfg.GetNodeName(pnode))
1859 self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
1860 "instance is marked as running and lives on"
1861 " offline node %s", self.cfg.GetNodeName(pnode))
1863 diskdata = [(nname, success, status, idx)
1864 for (nname, disks) in diskstatus.items()
1865 for idx, (success, status) in enumerate(disks)]
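# diskdata flattens the per-node status map into (node, success, status,
# disk index) tuples so each disk can be checked individually below.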
1867 for nname, success, bdev_status, idx in diskdata:
1868 # the 'ghost node' construction in Exec() ensures that we have a
1869 # non-None value for all the nodes
1870 snode = node_image[nname]
1871 bad_snode = snode.ghost or snode.offline
1872 self._ErrorIf(inst_config.disks_active and
1873 not success and not bad_snode,
1874 constants.CV_EINSTANCEFAULTYDISK, instance,
1875 "couldn't retrieve status for disk/%s on %s: %s",
1876 idx, self.cfg.GetNodeName(nname), bdev_status)
1877 self._ErrorIf((inst_config.disks_active and
1878 success and
1879 bdev_status.ldisk_status == constants.LDS_FAULTY),
1880 constants.CV_EINSTANCEFAULTYDISK, instance,
1881 "disk/%s on %s is faulty", idx, self.cfg.GetNodeName(nname))
1883 self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1884 constants.CV_ENODERPC, pnode, "instance %s, connection to"
1885 " primary node failed", instance)
1887 self._ErrorIf(len(inst_config.secondary_nodes) > 1,
1888 constants.CV_EINSTANCELAYOUT,
1889 instance, "instance has multiple secondary nodes: %s",
1890 utils.CommaJoin(inst_config.secondary_nodes),
1891 code=self.ETYPE_WARNING)
1893 es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
1894 inst_config.all_nodes)
1895 if any(es_flags.values()):
1896 if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
1897 # Disk template not compatible with exclusive_storage: no instance
1898 # node should have the flag set
1899 es_nodes = [n
1900 for (n, es) in es_flags.items()
1901 if es]
1902 self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance,
1903 "instance has template %s, which is not supported on nodes"
1904 " that have exclusive storage set: %s",
1905 inst_config.disk_template,
1906 utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
1907 for (idx, disk) in enumerate(inst_config.disks):
1908 self._ErrorIf(disk.spindles is None,
1909 constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance,
1910 "number of spindles not configured for disk %s while"
1911 " exclusive storage is enabled, try running"
1912 " gnt-cluster repair-disk-sizes", idx)
1914 if inst_config.disk_template in constants.DTS_INT_MIRROR:
1915 instance_nodes = utils.NiceSort(inst_config.all_nodes)
1916 instance_groups = {}
1918 for node in instance_nodes:
1919 instance_groups.setdefault(self.all_node_info[node].group,
1920 []).append(node)
1922 pretty_list = [
1923 "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
1924 groupinfo[group].name)
1925 # Sort so that we always list the primary node first.
1926 for group, nodes in sorted(instance_groups.items(),
1927 key=lambda (_, nodes): pnode in nodes,
1928 reverse=True)]
1930 self._ErrorIf(len(instance_groups) > 1,
1931 constants.CV_EINSTANCESPLITGROUPS,
1932 instance, "instance has primary and secondary nodes in"
1933 " different groups: %s", utils.CommaJoin(pretty_list),
1934 code=self.ETYPE_WARNING)
1936 inst_nodes_offline = []
1937 for snode in inst_config.secondary_nodes:
1938 s_img = node_image[snode]
1939 self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
1940 snode, "instance %s, connection to secondary node failed",
1941 instance)
1943 if s_img.offline:
1944 inst_nodes_offline.append(snode)
1946 # warn that the instance lives on offline nodes
1947 self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
1948 "instance has offline secondary node(s) %s",
1949 utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
1950 # ... or ghost/non-vm_capable nodes
1951 for node in inst_config.all_nodes:
1952 self._ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
1953 instance, "instance lives on ghost node %s",
1954 self.cfg.GetNodeName(node))
1955 self._ErrorIf(not node_image[node].vm_capable,
1956 constants.CV_EINSTANCEBADNODE, instance,
1957 "instance lives on non-vm_capable node %s",
1958 self.cfg.GetNodeName(node))
1960 def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1961 """Verify if there are any unknown volumes in the cluster.
1963 The .os, .swap and backup volumes are ignored. All other volumes are
1964 reported as unknown.
1966 @type reserved: L{ganeti.utils.FieldSet}
1967 @param reserved: a FieldSet of reserved volume names
1970 for node_uuid, n_img in node_image.items():
1971 if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
1972 self.all_node_info[node_uuid].group != self.group_uuid):
1973 # skip non-healthy nodes
1974 continue
1975 for volume in n_img.volumes:
1976 test = ((node_uuid not in node_vol_should or
1977 volume not in node_vol_should[node_uuid]) and
1978 not reserved.Matches(volume))
1979 self._ErrorIf(test, constants.CV_ENODEORPHANLV,
1980 self.cfg.GetNodeName(node_uuid),
1981 "volume %s is unknown", volume)
1983 def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1984 """Verify N+1 Memory Resilience.
1986 Check that if one single node dies we can still start all the
1987 instances it was primary for.
1990 cluster_info = self.cfg.GetClusterInfo()
1991 for node_uuid, n_img in node_image.items():
1992 # This code checks that every node which is now listed as
1993 # secondary has enough memory to host all instances it is
1994 # supposed to should a single other node in the cluster fail.
1995 # FIXME: not ready for failover to an arbitrary node
1996 # FIXME: does not support file-backed instances
1997 # WARNING: we currently take into account down instances as well
1998 # as up ones, considering that even if they're down someone
1999 # might want to start them even in the event of a node failure.
2000 if n_img.offline or \
2001 self.all_node_info[node_uuid].group != self.group_uuid:
2002 # we're skipping nodes marked offline and nodes in other groups from
2003 # the N+1 warning, since most likely we don't have good memory
2004 # information from them; we already list instances living on such
2005 # nodes, and that's enough warning
2006 continue
2007 #TODO(dynmem): also consider ballooning out other instances
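# For every primary node that uses this node as secondary, sum up the
# minimum memory of its auto-balanced instances; the check below requires
# this node to have at least that much free memory to absorb a failover.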
2008 for prinode, instances in n_img.sbp.items():
2009 needed_mem = 0
2010 for instance in instances:
2011 bep = cluster_info.FillBE(instance_cfg[instance])
2012 if bep[constants.BE_AUTO_BALANCE]:
2013 needed_mem += bep[constants.BE_MINMEM]
2014 test = n_img.mfree < needed_mem
2015 self._ErrorIf(test, constants.CV_ENODEN1,
2016 self.cfg.GetNodeName(node_uuid),
2017 "not enough memory to accommodate instance failovers"
2018 " should node %s fail (%dMiB needed, %dMiB available)",
2019 self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2021 def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2022 (files_all, files_opt, files_mc, files_vm)):
2023 """Verifies file checksums collected from all nodes.
2025 @param nodes: List of L{objects.Node} objects
2026 @param master_node_uuid: UUID of master node
2027 @param all_nvinfo: RPC results
2029 """
2030 # Define functions determining which nodes to consider for a file
2031 files2nodefn = [
2032 (files_all, None),
2033 (files_mc, lambda node: (node.master_candidate or
2034 node.uuid == master_node_uuid)),
2035 (files_vm, lambda node: node.vm_capable),
2036 ]
2038 # Build mapping from filename to list of nodes which should have the file
2039 nodefiles = {}
2040 for (files, fn) in files2nodefn:
2041 if fn is None:
2042 filenodes = nodes
2043 else:
2044 filenodes = filter(fn, nodes)
2045 nodefiles.update((filename,
2046 frozenset(map(operator.attrgetter("uuid"), filenodes)))
2047 for filename in files)
2049 assert set(nodefiles) == (files_all | files_mc | files_vm)
2051 fileinfo = dict((filename, {}) for filename in nodefiles)
2052 ignore_nodes = set()
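# fileinfo maps each filename to {checksum: set(node UUIDs)}; nodes whose
# results cannot be used (offline or bad data) are collected in ignore_nodes
# and excluded from the comparisons below.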
2054 for node in nodes:
2055 if node.offline:
2056 ignore_nodes.add(node.uuid)
2057 continue
2059 nresult = all_nvinfo[node.uuid]
2061 if nresult.fail_msg or not nresult.payload:
2062 node_files = None
2063 else:
2064 fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2065 node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2066 for (key, value) in fingerprints.items())
2069 test = not (node_files and isinstance(node_files, dict))
2070 self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2071 "Node did not return file checksum data")
2072 if test:
2073 ignore_nodes.add(node.uuid)
2074 continue
2076 # Build per-checksum mapping from filename to nodes having it
2077 for (filename, checksum) in node_files.items():
2078 assert filename in nodefiles
2079 fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2081 for (filename, checksums) in fileinfo.items():
2082 assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2084 # Nodes having the file
2085 with_file = frozenset(node_uuid
2086 for node_uuids in fileinfo[filename].values()
2087 for node_uuid in node_uuids) - ignore_nodes
2089 expected_nodes = nodefiles[filename] - ignore_nodes
2091 # Nodes missing file
2092 missing_file = expected_nodes - with_file
2094 if filename in files_opt:
2096 self._ErrorIf(missing_file and missing_file != expected_nodes,
2097 constants.CV_ECLUSTERFILECHECK, None,
2098 "File %s is optional, but it must exist on all or no"
2099 " nodes (not found on %s)",
2100 filename,
2101 utils.CommaJoin(
2102 utils.NiceSort(
2103 map(self.cfg.GetNodeName, missing_file))))
2104 else:
2105 self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2106 "File %s is missing from node(s) %s", filename,
2107 utils.CommaJoin(
2108 utils.NiceSort(
2109 map(self.cfg.GetNodeName, missing_file))))
2111 # Warn if a node has a file it shouldn't
2112 unexpected = with_file - expected_nodes
2113 self._ErrorIf(unexpected,
2114 constants.CV_ECLUSTERFILECHECK, None,
2115 "File %s should not exist on node(s) %s",
2116 filename, utils.CommaJoin(
2117 utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2119 # See if there are multiple versions of the file
2120 test = len(checksums) > 1
2121 if test:
2122 variants = ["variant %s on %s" %
2123 (idx + 1,
2124 utils.CommaJoin(utils.NiceSort(
2125 map(self.cfg.GetNodeName, node_uuids))))
2126 for (idx, (checksum, node_uuids)) in
2127 enumerate(sorted(checksums.items()))]
2128 else:
2129 variants = []
2131 self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2132 "File %s found with %s different checksums (%s)",
2133 filename, len(checksums), "; ".join(variants))
2135 def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2136 drbd_map):
2137 """Verifies the node DRBD status.
2139 @type ninfo: L{objects.Node}
2140 @param ninfo: the node to check
2141 @param nresult: the remote results for the node
2142 @param instanceinfo: the dict of instances
2143 @param drbd_helper: the configured DRBD usermode helper
2144 @param drbd_map: the DRBD map as returned by
2145 L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2147 """
2148 if drbd_helper:
2149 helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2150 test = (helper_result is None)
2151 self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2152 "no drbd usermode helper returned")
2153 if helper_result:
2154 status, payload = helper_result
2155 test = not status
2156 self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2157 "drbd usermode helper check unsuccessful: %s", payload)
2158 test = status and (payload != drbd_helper)
2159 self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2160 "wrong drbd usermode helper: %s", payload)
2162 # compute the DRBD minors
2163 node_drbd = {}
2164 for minor, instance in drbd_map[ninfo.uuid].items():
2165 test = instance not in instanceinfo
2166 self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2167 "ghost instance '%s' in temporary DRBD map", instance)
2168 # ghost instance should not be running, but otherwise we
2169 # don't give double warnings (both ghost instance and
2170 # unallocated minor in use)
2171 if test:
2172 node_drbd[minor] = (instance, False)
2173 else:
2174 instance = instanceinfo[instance]
2175 node_drbd[minor] = (instance.name, instance.disks_active)
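# node_drbd now maps every minor from the cluster DRBD map to a tuple of
# (instance name, whether the minor is expected to be in use); ghost
# instances are recorded with the flag set to False.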
2177 # and now check them
2178 used_minors = nresult.get(constants.NV_DRBDLIST, [])
2179 test = not isinstance(used_minors, (tuple, list))
2180 self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2181 "cannot parse drbd status file: %s", str(used_minors))
2182 if test:
2183 # we cannot check drbd status
2184 return
2186 for minor, (iname, must_exist) in node_drbd.items():
2187 test = minor not in used_minors and must_exist
2188 self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2189 "drbd minor %d of instance %s is not active", minor, iname)
2190 for minor in used_minors:
2191 test = minor not in node_drbd
2192 self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2193 "unallocated drbd minor %d is in use", minor)
2195 def _UpdateNodeOS(self, ninfo, nresult, nimg):
2196 """Builds the node OS structures.
2198 @type ninfo: L{objects.Node}
2199 @param ninfo: the node to check
2200 @param nresult: the remote results for the node
2201 @param nimg: the node image object
2204 remote_os = nresult.get(constants.NV_OSLIST, None)
2205 test = (not isinstance(remote_os, list) or
2206 not compat.all(isinstance(v, list) and len(v) == 7
2207 for v in remote_os))
2209 self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2210 "node hasn't returned valid OS data")
2212 nimg.os_fail = test
2213 if test:
2214 return
2216 os_dict = {}
2219 for (name, os_path, status, diagnose,
2220 variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2222 if name not in os_dict:
2223 os_dict[name] = []
2225 # parameters is a list of lists instead of list of tuples due to
2226 # JSON lacking a real tuple type, fix it:
2227 parameters = [tuple(v) for v in parameters]
2228 os_dict[name].append((os_path, status, diagnose,
2229 set(variants), set(parameters), set(api_ver)))
2231 nimg.oslist = os_dict
2233 def _VerifyNodeOS(self, ninfo, nimg, base):
2234 """Verifies the node OS list.
2236 @type ninfo: L{objects.Node}
2237 @param ninfo: the node to check
2238 @param nimg: the node image object
2239 @param base: the 'template' node we match against (e.g. from the master)
2242 assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2244 beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2245 for os_name, os_data in nimg.oslist.items():
2246 assert os_data, "Empty OS status for OS %s?!" % os_name
2247 f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2248 self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2249 "Invalid OS %s (located at %s): %s",
2250 os_name, f_path, f_diag)
2251 self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2252 "OS '%s' has multiple entries"
2253 " (first one shadows the rest): %s",
2254 os_name, utils.CommaJoin([v[0] for v in os_data]))
2255 # comparisons with the 'base' image
2256 test = os_name not in base.oslist
2257 self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2258 "Extra OS %s not present on reference node (%s)",
2259 os_name, self.cfg.GetNodeName(base.uuid))
2260 if test:
2261 continue
2262 assert base.oslist[os_name], "Base node has empty OS status?"
2263 _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2264 if not b_status:
2265 # base OS is invalid, skipping
2266 continue
2267 for kind, a, b in [("API version", f_api, b_api),
2268 ("variants list", f_var, b_var),
2269 ("parameters", beautify_params(f_param),
2270 beautify_params(b_param))]:
2271 self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2272 "OS %s for %s differs from reference node %s:"
2273 " [%s] vs. [%s]", kind, os_name,
2274 self.cfg.GetNodeName(base.uuid),
2275 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2277 # check any missing OSes
2278 missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2279 self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2280 "OSes present on reference node %s"
2281 " but missing on this node: %s",
2282 self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2284 def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
2285 """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2287 @type ninfo: L{objects.Node}
2288 @param ninfo: the node to check
2289 @param nresult: the remote results for the node
2290 @type is_master: bool
2291 @param is_master: Whether node is the master node
2293 """
2294 if (is_master and
2295 (constants.ENABLE_FILE_STORAGE or
2296 constants.ENABLE_SHARED_FILE_STORAGE)):
2297 try:
2298 fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
2299 except KeyError:
2300 # This should never happen
2301 self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2302 "Node did not return forbidden file storage paths")
2303 else:
2304 self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2305 "Found forbidden file storage paths: %s",
2306 utils.CommaJoin(fspaths))
2307 else:
2308 self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
2309 constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2310 "Node should not have returned forbidden file storage"
2311 " paths")
2313 def _VerifyOob(self, ninfo, nresult):
2314 """Verifies out of band functionality of a node.
2316 @type ninfo: L{objects.Node}
2317 @param ninfo: the node to check
2318 @param nresult: the remote results for the node
2321 # We just have to verify the paths on master and/or master candidates
2322 # as the oob helper is invoked on the master
2323 if ((ninfo.master_candidate or ninfo.master_capable) and
2324 constants.NV_OOB_PATHS in nresult):
2325 for path_result in nresult[constants.NV_OOB_PATHS]:
2326 self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2327 ninfo.name, path_result)
2329 def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2330 """Verifies and updates the node volume data.
2332 This function will update a L{NodeImage}'s internal structures
2333 with data from the remote call.
2335 @type ninfo: L{objects.Node}
2336 @param ninfo: the node to check
2337 @param nresult: the remote results for the node
2338 @param nimg: the node image object
2339 @param vg_name: the configured VG name
2342 nimg.lvm_fail = True
2343 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2344 if vg_name is None:
2345 pass
2346 elif isinstance(lvdata, basestring):
2347 self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2348 "LVM problem on node: %s", utils.SafeEncode(lvdata))
2349 elif not isinstance(lvdata, dict):
2350 self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2351 "rpc call to node failed (lvlist)")
2352 else:
2353 nimg.volumes = lvdata
2354 nimg.lvm_fail = False
2356 def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2357 """Verifies and updates the node instance list.
2359 If the listing was successful, then updates this node's instance
2360 list. Otherwise, it marks the RPC call as failed for the instance
2361 list.
2363 @type ninfo: L{objects.Node}
2364 @param ninfo: the node to check
2365 @param nresult: the remote results for the node
2366 @param nimg: the node image object
2369 idata = nresult.get(constants.NV_INSTANCELIST, None)
2370 test = not isinstance(idata, list)
2371 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2372 "rpc call to node failed (instancelist): %s",
2373 utils.SafeEncode(str(idata)))
2374 if test:
2375 nimg.hyp_fail = True
2376 else:
2377 nimg.instances = idata
2379 def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2380 """Verifies and computes a node information map
2382 @type ninfo: L{objects.Node}
2383 @param ninfo: the node to check
2384 @param nresult: the remote results for the node
2385 @param nimg: the node image object
2386 @param vg_name: the configured VG name
2389 # try to read free memory (from the hypervisor)
2390 hv_info = nresult.get(constants.NV_HVINFO, None)
2391 test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2392 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2393 "rpc call to node failed (hvinfo)")
2394 if not test:
2395 try:
2396 nimg.mfree = int(hv_info["memory_free"])
2397 except (ValueError, TypeError):
2398 self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2399 "node returned invalid nodeinfo, check hypervisor")
2401 # FIXME: devise a free space model for file based instances as well
2402 if vg_name is not None:
2403 test = (constants.NV_VGLIST not in nresult or
2404 vg_name not in nresult[constants.NV_VGLIST])
2405 self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2406 "node didn't return data for the volume group '%s'"
2407 " - it is either missing or broken", vg_name)
2408 if not test:
2409 try:
2410 nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2411 except (ValueError, TypeError):
2412 self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2413 "node returned invalid LVM info, check LVM status")
2415 def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2416 """Gets per-disk status information for all instances.
2418 @type node_uuids: list of strings
2419 @param node_uuids: Node UUIDs
2420 @type node_image: dict of (name, L{objects.Node})
2421 @param node_image: Node objects
2422 @type instanceinfo: dict of (name, L{objects.Instance})
2423 @param instanceinfo: Instance objects
2424 @rtype: {instance: {node: [(success, payload)]}}
2425 @return: a dictionary of per-instance dictionaries with nodes as
2426 keys and disk information as values; the disk information is a
2427 list of tuples (success, payload)
2429 """
2430 node_disks = {}
2431 node_disks_devonly = {}
2432 diskless_instances = set()
2433 diskless = constants.DT_DISKLESS
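# The result is assembled as {instance name: {node UUID: [(success, payload),
# ...]}}, one status tuple per disk, as described in the docstring above.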
2435 for nuuid in node_uuids:
2436 node_instances = list(itertools.chain(node_image[nuuid].pinst,
2437 node_image[nuuid].sinst))
2438 diskless_instances.update(inst for inst in node_instances
2439 if instanceinfo[inst].disk_template == diskless)
2440 disks = [(inst, disk)
2441 for inst in node_instances
2442 for disk in instanceinfo[inst].disks]
2444 if not disks:
2445 # No need to collect data
2446 continue
2448 node_disks[nuuid] = disks
2450 # AnnotateDiskParams already makes copies of the disks
2451 devonly = []
2452 for (inst, dev) in disks:
2453 (anno_disk,) = AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
2454 self.cfg.SetDiskID(anno_disk, nuuid)
2455 devonly.append(anno_disk)
2457 node_disks_devonly[nuuid] = devonly
2459 assert len(node_disks) == len(node_disks_devonly)
2461 # Collect data from all nodes with disks
2462 result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2463 node_disks_devonly)
2465 assert len(result) == len(node_disks)
2467 instdisk = {}
2469 for (nuuid, nres) in result.items():
2470 node = self.cfg.GetNodeInfo(nuuid)
2471 disks = node_disks[node.uuid]
2473 if nres.offline:
2474 # No data from this node
2475 data = len(disks) * [(False, "node offline")]
2476 else:
2477 msg = nres.fail_msg
2478 self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
2479 "while getting disk information: %s", msg)
2480 if msg:
2481 # No data from this node
2482 data = len(disks) * [(False, msg)]
2483 else:
2484 data = []
2485 for idx, i in enumerate(nres.payload):
2486 if isinstance(i, (tuple, list)) and len(i) == 2:
2487 data.append(i)
2488 else:
2489 logging.warning("Invalid result from node %s, entry %d: %s",
2490 node.name, idx, i)
2491 data.append((False, "Invalid result from the remote node"))
2493 for ((inst, _), status) in zip(disks, data):
2494 instdisk.setdefault(inst, {}).setdefault(node.uuid, []).append(status)
2496 # Add empty entries for diskless instances.
2497 for inst in diskless_instances:
2498 assert inst not in instdisk
2499 instdisk[inst] = {}
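# Consistency check: every instance must have one status tuple per disk and
# entries for no more nodes than it actually uses.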
2501 assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2502 len(nuuids) <= len(instanceinfo[inst].all_nodes) and
2503 compat.all(isinstance(s, (tuple, list)) and
2504 len(s) == 2 for s in statuses)
2505 for inst, nuuids in instdisk.items()
2506 for nuuid, statuses in nuuids.items())
2508 instdisk_keys = set(instdisk)
2509 instanceinfo_keys = set(instanceinfo)
2510 assert instdisk_keys == instanceinfo_keys, \
2511 ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2512 (instdisk_keys, instanceinfo_keys))
2514 return instdisk
2516 @staticmethod
2517 def _SshNodeSelector(group_uuid, all_nodes):
2518 """Create endless iterators for all potential SSH check hosts.
2521 nodes = [node for node in all_nodes
2522 if (node.group != group_uuid and
2523 not node.offline)]
2524 keyfunc = operator.attrgetter("group")
2526 return map(itertools.cycle,
2527 [sorted(map(operator.attrgetter("name"), names))
2528 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2529 keyfunc)])
2531 @classmethod
2532 def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2533 """Choose which nodes should talk to which other nodes.
2535 We will make nodes contact all nodes in their group, and one node from
2536 every other group.
2538 @warning: This algorithm has a known issue if one node group is much
2539 smaller than others (e.g. just one node). In such a case all other
2540 nodes will talk to the single node.
2543 online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2544 sel = cls._SshNodeSelector(group_uuid, all_nodes)
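# Each online node of this group gets one peer name from every other group
# by advancing that group's cycling iterator once per node.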
2546 return (online_nodes,
2547 dict((name, sorted([i.next() for i in sel]))
2548 for name in online_nodes))
2550 def BuildHooksEnv(self):
2551 """Build hooks env.
2553 Cluster-Verify hooks are run only in the post phase; their failure is
2554 logged in the verify output and makes the verification fail.
2556 """
2557 env = {
2558 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2559 }
2561 env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2562 for node in self.my_node_info.values())
2564 return env
2566 def BuildHooksNodes(self):
2567 """Build hooks nodes.
2570 return ([], list(self.my_node_info.keys()))
2572 def Exec(self, feedback_fn):
2573 """Verify integrity of the node group, performing various tests on nodes.
2575 """
2576 # This method has too many local variables. pylint: disable=R0914
2577 feedback_fn("* Verifying group '%s'" % self.group_info.name)
2579 if not self.my_node_uuids:
2580 # empty node group
2581 feedback_fn("* Empty node group, skipping verification")
2582 return True
2584 self.bad = False
2585 verbose = self.op.verbose
2586 self._feedback_fn = feedback_fn
2588 vg_name = self.cfg.GetVGName()
2589 drbd_helper = self.cfg.GetDRBDHelper()
2590 cluster = self.cfg.GetClusterInfo()
2591 hypervisors = cluster.enabled_hypervisors
2592 node_data_list = self.my_node_info.values()
2594 i_non_redundant = [] # Non redundant instances
2595 i_non_a_balanced = [] # Non auto-balanced instances
2596 i_offline = 0 # Count of offline instances
2597 n_offline = 0 # Count of offline nodes
2598 n_drained = 0 # Count of nodes being drained
2599 node_vol_should = {}
2601 # FIXME: verify OS list
2604 filemap = ComputeAncillaryFiles(cluster, False)
2606 # do local checksums
2607 master_node_uuid = self.master_node = self.cfg.GetMasterNode()
2608 master_ip = self.cfg.GetMasterIP()
2610 feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))
2612 user_scripts = []
2613 if self.cfg.GetUseExternalMipScript():
2614 user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2616 node_verify_param = {
2617 constants.NV_FILELIST:
2618 map(vcluster.MakeVirtualPath,
2619 utils.UniqueSequence(filename
2620 for files in filemap
2621 for filename in files)),
2622 constants.NV_NODELIST:
2623 self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2624 self.all_node_info.values()),
2625 constants.NV_HYPERVISOR: hypervisors,
2626 constants.NV_HVPARAMS:
2627 _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2628 constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2629 for node in node_data_list
2630 if not node.offline],
2631 constants.NV_INSTANCELIST: hypervisors,
2632 constants.NV_VERSION: None,
2633 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2634 constants.NV_NODESETUP: None,
2635 constants.NV_TIME: None,
2636 constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
2637 constants.NV_OSLIST: None,
2638 constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2639 constants.NV_USERSCRIPTS: user_scripts,
2640 }
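# Each key in node_verify_param names an NV_* check the nodes should run;
# the per-node results come back keyed by the same constants.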
2642 if vg_name is not None:
2643 node_verify_param[constants.NV_VGLIST] = None
2644 node_verify_param[constants.NV_LVLIST] = vg_name
2645 node_verify_param[constants.NV_PVLIST] = [vg_name]
2647 if drbd_helper:
2648 node_verify_param[constants.NV_DRBDVERSION] = None
2649 node_verify_param[constants.NV_DRBDLIST] = None
2650 node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2652 if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
2653 # Load file storage paths only from master node
2654 node_verify_param[constants.NV_FILE_STORAGE_PATHS] = \
2655 self.cfg.GetMasterNodeName()
2657 # bridge checks
2658 # FIXME: this needs to be changed per node-group, not cluster-wide
2659 bridges = set()
2660 default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2661 if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2662 bridges.add(default_nicpp[constants.NIC_LINK])
2663 for instance in self.my_inst_info.values():
2664 for nic in instance.nics:
2665 full_nic = cluster.SimpleFillNIC(nic.nicparams)
2666 if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2667 bridges.add(full_nic[constants.NIC_LINK])
2669 if bridges:
2670 node_verify_param[constants.NV_BRIDGES] = list(bridges)
2672 # Build our expected cluster state
2673 node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
2674 uuid=node.uuid,
2675 vm_capable=node.vm_capable))
2676 for node in node_data_list)
2678 # Gather OOB paths
2679 oob_paths = []
2680 for node in self.all_node_info.values():
2681 path = SupportsOob(self.cfg, node)
2682 if path and path not in oob_paths:
2683 oob_paths.append(path)
2685 if oob_paths:
2686 node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2688 for instance in self.my_inst_names:
2689 inst_config = self.my_inst_info[instance]
2690 if inst_config.admin_state == constants.ADMINST_OFFLINE:
2691 i_offline += 1
2693 for nuuid in inst_config.all_nodes:
2694 if nuuid not in node_image:
2695 gnode = self.NodeImage(uuid=nuuid)
2696 gnode.ghost = (nuuid not in self.all_node_info)
2697 node_image[nuuid] = gnode
2699 inst_config.MapLVsByNode(node_vol_should)
2701 pnode = inst_config.primary_node
2702 node_image[pnode].pinst.append(instance)
2704 for snode in inst_config.secondary_nodes:
2705 nimg = node_image[snode]
2706 nimg.sinst.append(instance)
2707 if pnode not in nimg.sbp:
2708 nimg.sbp[pnode] = []
2709 nimg.sbp[pnode].append(instance)
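# node_image now records, for every node, the instances that expect it as
# primary (pinst) or secondary (sinst); sbp groups the secondary instances
# by their primary node.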
2711 es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
2712 self.my_node_info.keys())
2713 # The value of exclusive_storage should be the same across the group, so if
2714 # it's True for at least a node, we act as if it were set for all the nodes
2715 self._exclusive_storage = compat.any(es_flags.values())
2716 if self._exclusive_storage:
2717 node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2719 # At this point, we have the in-memory data structures complete,
2720 # except for the runtime information, which we'll gather next
2722 # Due to the way our RPC system works, exact response times cannot be
2723 # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2724 time before and after executing the request, we can at least have a time
2725 # window.
2726 nvinfo_starttime = time.time()
2727 all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
2728 node_verify_param,
2729 self.cfg.GetClusterName(),
2730 self.cfg.GetClusterInfo().hvparams)
2731 nvinfo_endtime = time.time()
2733 if self.extra_lv_nodes and vg_name is not None:
2734 extra_lv_nvinfo = \
2735 self.rpc.call_node_verify(self.extra_lv_nodes,
2736 {constants.NV_LVLIST: vg_name},
2737 self.cfg.GetClusterName(),
2738 self.cfg.GetClusterInfo().hvparams)
2739 else:
2740 extra_lv_nvinfo = {}
2742 all_drbd_map = self.cfg.ComputeDRBDMap()
2744 feedback_fn("* Gathering disk information (%s nodes)" %
2745 len(self.my_node_uuids))
2746 instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
2747 self.my_inst_info)
2749 feedback_fn("* Verifying configuration file consistency")
2751 # If not all nodes are being checked, we need to make sure the master node
2752 # and a non-checked vm_capable node are in the list.
2753 absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
2754 if absent_node_uuids:
2755 vf_nvinfo = all_nvinfo.copy()
2756 vf_node_info = list(self.my_node_info.values())
2757 additional_node_uuids = []
2758 if master_node_uuid not in self.my_node_info:
2759 additional_node_uuids.append(master_node_uuid)
2760 vf_node_info.append(self.all_node_info[master_node_uuid])
2761 # Add the first vm_capable node we find which is not included,
2762 # excluding the master node (which we already have)
2763 for node_uuid in absent_node_uuids:
2764 nodeinfo = self.all_node_info[node_uuid]
2765 if (nodeinfo.vm_capable and not nodeinfo.offline and
2766 node_uuid != master_node_uuid):
2767 additional_node_uuids.append(node_uuid)
2768 vf_node_info.append(self.all_node_info[node_uuid])
2769 break
2770 key = constants.NV_FILELIST
2771 vf_nvinfo.update(self.rpc.call_node_verify(
2772 additional_node_uuids, {key: node_verify_param[key]},
2773 self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
2774 else:
2775 vf_nvinfo = all_nvinfo
2776 vf_node_info = self.my_node_info.values()
2778 self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)
2780 feedback_fn("* Verifying node status")
2782 refos_img = None # From first node
2784 for node_i in node_data_list:
2785 nimg = node_image[node_i.uuid]
2787 if node_i.offline:
2788 if verbose:
2789 feedback_fn("* Skipping offline node %s" % (node_i.name,))
2790 n_offline += 1
2791 continue
2793 if node_i.uuid == master_node_uuid:
2794 ntype = "master"
2795 elif node_i.master_candidate:
2796 ntype = "master candidate"
2797 elif node_i.drained:
2798 ntype = "drained"
2799 n_drained += 1
2800 else:
2801 ntype = "regular"
2802 if verbose:
2803 feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))
2805 msg = all_nvinfo[node_i.uuid].fail_msg
2806 self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
2807 "while contacting node: %s", msg)
2808 if msg:
2809 nimg.rpc_fail = True
2810 continue
2812 nresult = all_nvinfo[node_i.uuid].payload
2814 nimg.call_ok = self._VerifyNode(node_i, nresult)
2815 self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2816 self._VerifyNodeNetwork(node_i, nresult)
2817 self._VerifyNodeUserScripts(node_i, nresult)
2818 self._VerifyOob(node_i, nresult)
2819 self._VerifyFileStoragePaths(node_i, nresult,
2820 node_i.uuid == master_node_uuid)
2822 if nimg.vm_capable:
2823 self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
2824 self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2825 all_drbd_map)
2827 self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2828 self._UpdateNodeInstances(node_i, nresult, nimg)
2829 self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2830 self._UpdateNodeOS(node_i, nresult, nimg)
2832 if not nimg.os_fail:
2833 if refos_img is None:
2834 refos_img = nimg
2835 self._VerifyNodeOS(node_i, nimg, refos_img)
2836 self._VerifyNodeBridges(node_i, nresult, bridges)
2838 # Check whether all running instances are primary for the node. (This
2839 # can no longer be done from _VerifyInstance below, since some of the
2840 # wrong instances could be from other node groups.)
2841 non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2843 for inst in non_primary_inst:
2844 test = inst in self.all_inst_info
2845 self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
2846 "instance should not run on node %s", node_i.name)
2847 self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
2848 "node is running unknown instance %s", inst)
2850 self._VerifyGroupDRBDVersion(all_nvinfo)
2851 self._VerifyGroupLVM(node_image, vg_name)
2853 for node_uuid, result in extra_lv_nvinfo.items():
2854 self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
2855 node_image[node_uuid], vg_name)
2857 feedback_fn("* Verifying instance status")
2858 for instance in self.my_inst_names:
2859 if verbose:
2860 feedback_fn("* Verifying instance %s" % instance)
2861 inst_config = self.my_inst_info[instance]
2862 self._VerifyInstance(instance, inst_config, node_image,
2863 instdisk[instance])
2865 # If the instance is non-redundant we cannot survive losing its primary
2866 # node, so we are not N+1 compliant.
2867 if inst_config.disk_template not in constants.DTS_MIRRORED:
2868 i_non_redundant.append(instance)
2870 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2871 i_non_a_balanced.append(instance)
2873 feedback_fn("* Verifying orphan volumes")
2874 reserved = utils.FieldSet(*cluster.reserved_lvs)
2876 # We will get spurious "unknown volume" warnings if any node of this group
2877 # is secondary for an instance whose primary is in another group. To avoid
2878 # them, we find these instances and add their volumes to node_vol_should.
2879 for inst in self.all_inst_info.values():
2880 for secondary in inst.secondary_nodes:
2881 if (secondary in self.my_node_info
2882 and inst.name not in self.my_inst_info):
2883 inst.MapLVsByNode(node_vol_should)
2884 break
2886 self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2888 if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2889 feedback_fn("* Verifying N+1 Memory redundancy")
2890 self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2892 feedback_fn("* Other Notes")
2893 if i_non_redundant:
2894 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
2895 % len(i_non_redundant))
2897 if i_non_a_balanced:
2898 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
2899 % len(i_non_a_balanced))
2901 if i_offline:
2902 feedback_fn(" - NOTICE: %d offline instance(s) found." % i_offline)
2904 if n_offline:
2905 feedback_fn(" - NOTICE: %d offline node(s) found." % n_offline)
2907 if n_drained:
2908 feedback_fn(" - NOTICE: %d drained node(s) found." % n_drained)
2910 return not self.bad
2912 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2913 """Analyze the post-hooks' result
2915 This method analyses the hook result, handles it, and sends some
2916 nicely-formatted feedback back to the user.
2918 @param phase: one of L{constants.HOOKS_PHASE_POST} or
2919 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2920 @param hooks_results: the results of the multi-node hooks rpc call
2921 @param feedback_fn: function used to send feedback back to the caller
2922 @param lu_result: previous Exec result
2923 @return: the new Exec result, based on the previous result
2924 and hook results
2926 """
2927 # We only really run POST phase hooks, only for non-empty groups,
2928 # and are only interested in their results
2929 if not self.my_node_uuids:
2930 # empty node group
2931 pass
2932 elif phase == constants.HOOKS_PHASE_POST:
2933 # Used to change hooks' output to proper indentation
2934 feedback_fn("* Hooks Results")
2935 assert hooks_results, "invalid result from hooks"
2937 for node_name in hooks_results:
2938 res = hooks_results[node_name]
2939 msg = res.fail_msg
2940 test = msg and not res.offline
2941 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2942 "Communication failure in hooks execution: %s", msg)
2943 if res.offline or msg:
2944 # No need to investigate payload if node is offline or gave
2945 # an error.
2946 continue
2947 for script, hkr, output in res.payload:
2948 test = hkr == constants.HKR_FAIL
2949 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2950 "Script %s failed, output:", script)
2951 if test:
2952 output = self._HOOKS_INDENT_RE.sub(" ", output)
2953 feedback_fn("%s" % output)
2954 lu_result = False
2956 return lu_result
2959 class LUClusterVerifyDisks(NoHooksLU):
2960 """Verifies the cluster disks status.
2962 """
2963 REQ_BGL = False
2965 def ExpandNames(self):
2966 self.share_locks = ShareAll()
2967 self.needed_locks = {
2968 locking.LEVEL_NODEGROUP: locking.ALL_SET,
2969 }
2971 def Exec(self, feedback_fn):
2972 group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
2974 # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
2975 return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
2976 for group in group_names])