# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

22 """Logical units dealing with the cluster."""
34 from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
  CheckIpolicyVsDiskTemplates

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params
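
    # At this point os_hvp is restricted to enabled hypervisors, e.g.
    # (illustrative values only): {"debian-image": {"kvm": {...}}}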

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION
289 "software_version": constants.RELEASE_VERSION,
290 "protocol_version": constants.PROTOCOL_VERSION,
291 "config_version": constants.CONFIG_VERSION,
292 "os_api_version": max(constants.OS_API_VERSIONS),
293 "export_version": constants.EXPORT_VERSION,
294 "vcs_version": constants.VCS_VERSION,
295 "architecture": runtime.GetArchInfo(),
296 "name": cluster.cluster_name,
297 "master": self.cfg.GetMasterNodeName(),
298 "default_hypervisor": cluster.primary_hypervisor,
299 "enabled_hypervisors": cluster.enabled_hypervisors,
300 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
301 for hypervisor_name in cluster.enabled_hypervisors]),
303 "beparams": cluster.beparams,
304 "osparams": cluster.osparams,
305 "ipolicy": cluster.ipolicy,
306 "nicparams": cluster.nicparams,
307 "ndparams": cluster.ndparams,
308 "diskparams": cluster.diskparams,
309 "candidate_pool_size": cluster.candidate_pool_size,
310 "master_netdev": cluster.master_netdev,
311 "master_netmask": cluster.master_netmask,
312 "use_external_mip_script": cluster.use_external_mip_script,
313 "volume_group_name": cluster.volume_group_name,
314 "drbd_usermode_helper": cluster.drbd_usermode_helper,
315 "file_storage_dir": cluster.file_storage_dir,
316 "shared_file_storage_dir": cluster.shared_file_storage_dir,
317 "maintain_node_health": cluster.maintain_node_health,
318 "ctime": cluster.ctime,
319 "mtime": cluster.mtime,
320 "uuid": cluster.uuid,
321 "tags": list(cluster.GetTags()),
322 "uid_pool": cluster.uid_pool,
323 "default_iallocator": cluster.default_iallocator,
324 "reserved_lvs": cluster.reserved_lvs,
325 "primary_ip_version": primary_ip_version,
326 "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
327 "hidden_os": cluster.hidden_os,
328 "blacklisted_os": cluster.blacklisted_os,
329 "enabled_disk_templates": cluster.enabled_disk_templates,
class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }
    self.share_locks = ShareAll()
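    # All locks above are acquired shared: redistributing the configuration
    # does not modify node or instance state, it only rewrites and pushes
    # out the config (via cfg.Update in Exec below).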

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
      map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
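      # A DRBD8 disk has two children, the data device (index 0) and the
      # metadata device; only the data child must match the parent's
      # logical size (see the recursion note below).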
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node_uuid)
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
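      # Each payload entry should be a (size, spindles) tuple per disk, or
      # None when the node could not answer for that disk; sizes come back
      # in bytes and are converted to MiB (>> 20) before comparing below.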
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))

    return changed


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)
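
# Illustrative example: on an IPv4 cluster a netmask (CIDR prefix length)
# of 24 passes _ValidateNetmask, while 33 raises errors.OpPrereqError.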


def CheckFileBasedStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
    file_disk_template):
  """Checks whether the given file-based storage directory is acceptable.

  Note: This function is public, because it is also used in bootstrap.py.

  @type logging_warn_fn: function
  @param logging_warn_fn: function which accepts a string and logs it
  @type file_storage_dir: string
  @param file_storage_dir: the directory to be used for file-based instances
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of enabled disk templates
  @type file_disk_template: string
  @param file_disk_template: the file-based disk template for which the
      path should be checked

  """
  assert (file_disk_template in
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_dir is not None:
    if file_storage_dir == "":
      if file_storage_enabled:
        raise errors.OpPrereqError(
            "Unsetting the '%s' storage directory while having '%s' storage"
            " enabled is not permitted." %
            (file_disk_template, file_disk_template))
    else:
      if not file_storage_enabled:
        logging_warn_fn(
            "Specified a %s storage directory, although %s storage is not"
            " enabled." % (file_disk_template, file_disk_template))
  else:
    raise errors.ProgrammerError("Received %s storage dir with value"
                                 " 'None'." % file_disk_template)


def CheckFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_FILE)


def CheckSharedFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given shared file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_SHARED_FILE)


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" %
                                   err, errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def _CheckVgName(self, node_uuids, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the vg name on all nodes and in case it gets
       unset whether there are instances still using it.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and
        utils.IsLvmEnabled(enabled_disk_templates)) or \
        (self.cfg.GetVGName() is not None and
         utils.LvmGetsEnabled(enabled_disk_templates,
                              new_enabled_disk_templates)):
      self._CheckVgNameOnNodes(node_uuids)

  def _CheckVgNameOnNodes(self, node_uuids):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_uuids)
    for node_uuid in node_uuids:
      msg = vglist[node_uuid].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
                                   errors.ECODE_ENVIRON)

  def _GetEnabledDiskTemplates(self, cluster):
    """Determines the enabled disk templates and the subset of disk templates
       that are newly enabled by this operation.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    if self.op.enabled_disk_templates:
      enabled_disk_templates = self.op.enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(cluster.enabled_disk_templates))
    else:
      enabled_disk_templates = cluster.enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates)

  def _CheckIpolicy(self, cluster, enabled_disk_templates):
    """Checks the ipolicy.

    @type cluster: C{objects.Cluster}
    @param cluster: the cluster's configuration
    @type enabled_disk_templates: list of string
    @param enabled_disk_templates: list of (possibly newly) enabled disk
      templates

    """
    # FIXME: write unit tests for this
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
                                  enabled_disk_templates)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(nuuid in group.members
                                             for nuuid in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
                                           self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))
    else:
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
                                  enabled_disk_templates)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    self.cluster = cluster = self.cfg.GetClusterInfo()

    vm_capable_node_uuids = [node.uuid
                             for node in self.cfg.GetAllNodesInfo().values()
                             if node.uuid in node_uuids and node.vm_capable]

    (enabled_disk_templates, new_enabled_disk_templates) = \
      self._GetEnabledDiskTemplates(cluster)

    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
                      new_enabled_disk_templates)

    if self.op.file_storage_dir is not None:
      CheckFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)

    if self.op.shared_file_storage_dir is not None:
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.shared_file_storage_dir,
          enabled_disk_templates)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_uuids)
      for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s",
                       ninfo.name)
          continue
        msg = helpers[ninfo.uuid].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (ninfo.name, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[ninfo.uuid].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (ninfo.name, node_helper),
                                     errors.ECODE_ENVIRON)

    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
          constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    self._CheckIpolicy(cluster, enabled_disk_templates)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
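    # FillDict returns a copy of cluster.hvparams (updated with the empty
    # override), so the in-place updates below only affect the copy, not
    # the live configuration object.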
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)
        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_uuids, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_uuids, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def _SetVgName(self, feedback_fn):
    """Determines and sets the new volume group name.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name and not \
          utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
        feedback_fn("Note that you specified a volume group, but did not"
                    " enable any lvm disk template.")
      new_volume = self.op.vg_name
      if not new_volume:
        if utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
          raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                     " disk templates are enabled.")
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    else:
      if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
          not self.cfg.GetVGName():
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

  def _SetFileStorageDir(self, feedback_fn):
    """Set the file storage directory.

    """
    if self.op.file_storage_dir is not None:
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
        feedback_fn("Global file storage dir already set to value '%s'"
                    % self.cluster.file_storage_dir)
      else:
        self.cluster.file_storage_dir = self.op.file_storage_dir

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(set(self.op.enabled_disk_templates))

    self._SetVgName(feedback_fn)
    self._SetFileStorageDir(feedback_fn)

    if self.op.drbd_helper is not None:
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did"
                    " not enable the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
                 master_params.uuid, master_params.netmask,
                 self.op.master_netmask, master_params.ip,
                 master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]
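      # Negative job IDs in dependencies are relative references to jobs
      # submitted in the same batch; -len(jobs) therefore points at the
      # OpClusterVerifyConfig job appended above.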

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to
    # a warning
    if etxt in self.op.ignore_errors: # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
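    # e.g. "ERROR:ENODERPC:node:node1.example.com:message" with error_codes
    # enabled, versus "ERROR: node node1.example.com: message" otherwise
    # (illustrative node name)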
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                  constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))
1477 feedback_fn("* Verifying all nodes belong to an existing group")
1479 # We do this verification here because, should this bogus circumstance
1480 # occur, it would never be caught by VerifyGroup, which only acts on
1481 # nodes/instances reachable from existing node groups.
1483 dangling_nodes = set(node for node in self.all_node_info.values()
1484 if node.group not in self.all_group_info)
1486 dangling_instances = {}
1487 no_node_instances = []
1489 for inst in self.all_inst_info.values():
1490 if inst.primary_node in [node.uuid for node in dangling_nodes]:
1491 dangling_instances.setdefault(inst.primary_node, []).append(inst)
1492 elif inst.primary_node not in self.all_node_info:
1493 no_node_instances.append(inst)
1498 utils.CommaJoin(inst.name for
1499 inst in dangling_instances.get(node.uuid, [])))
1500 for node in dangling_nodes]
1502 self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1504 "the following nodes (and their instances) belong to a non"
1505 " existing group: %s", utils.CommaJoin(pretty_dangling))
1507 self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1509 "the following instances have a non-existing primary-node:"
1510 " %s", utils.CommaJoin(inst.name for
1511 inst in no_node_instances))
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        instance = self.cfg.GetInstanceInfoByName(inst_name)
        if instance.disk_template in constants.DTS_INT_MIRROR:
          nodes.update(instance.secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_node_uuids = set(self.group_info.members)
    group_inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_node_uuids = \
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_inst_uuids = \
        group_inst_uuids.difference(
          [self.cfg.GetInstanceInfoByName(name).uuid
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])

    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_inst_uuids:
      raise errors.OpPrereqError(
        "Missing lock for instances: %s" %
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
        errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_uuids = group_node_uuids
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
                             for node_uuid in group_node_uuids)

    self.my_inst_uuids = group_inst_uuids
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
                             for inst_uuid in group_inst_uuids)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nuuid in inst.all_nodes:
          if self.all_node_info[nuuid].group != self.group_uuid:
            extra_lv_nodes.add(nuuid)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, ninfo.name,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)
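    # Note that a release-version mismatch is only reported as a warning
    # (code=self.ETYPE_WARNING above), while the protocol mismatch checked
    # earlier aborts verification for this node.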

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s parameter verify failure (source %s): %s",
                      hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
                    "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
                  "Node time diverges by at least %s from master node time",
                  ntime_diff)

  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                  "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax
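    # pv_min/pv_max recorded here are consumed by _VerifyGroupLVM below for
    # the cross-node exclusive-storage PV size check.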
1810 def _VerifyGroupDRBDVersion(self, node_verify_infos):
1811 """Check cross-node DRBD version consistency.
1813 @type node_verify_infos: dict
1814 @param node_verify_infos: infos about nodes as returned from the
1819 for node_uuid, ndata in node_verify_infos.items():
1820 nresult = ndata.payload
1821 version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1822 node_versions[node_uuid] = version
1824 if len(set(node_versions.values())) > 1:
1825 for node_uuid, version in sorted(node_versions.items()):
1826 msg = "DRBD version mismatch: %s" % version
1827 self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1828 code=self.ETYPE_WARNING)
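# Cross-node version consistency is just a set-cardinality test; a standalone
# sketch with made-up node names and versions:
#
#   versions = {"node1": "8.4.11", "node2": "8.4.11", "node3": "8.3.16"}
#   if len(set(versions.values())) > 1:
#     for node, ver in sorted(versions.items()):
#       print "DRBD version mismatch: %s on %s" % (ver, node)
#
# Every node is reported, not only the odd one out, because nothing in the
# data says which version is the intended one.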
1830 def _VerifyGroupLVM(self, node_image, vg_name):
1831 """Check cross-node consistency in LVM.
1833 @type node_image: dict
1834 @param node_image: info about nodes, mapping from node names to
1835 L{NodeImage} objects
1836 @param vg_name: the configured VG name
1839 if vg_name is None:
1840 return
1842 # Only exclusive storage needs this kind of check
1843 if not self._exclusive_storage:
1844 return
1846 # exclusive_storage wants all PVs to have the same size (approximately);
1847 # if the smallest and the biggest ones are okay, everything is fine.
1848 # pv_min is None iff pv_max is None
1849 vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1850 if not vals:
1851 return
1852 (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1853 (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1854 bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1855 self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1856 "PV sizes differ too much in the group; smallest (%s MB) is"
1857 " on %s, biggest (%s MB) is on %s",
1858 pvmin, self.cfg.GetNodeName(minnode_uuid),
1859 pvmax, self.cfg.GetNodeName(maxnode_uuid))
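# Only the extreme PV sizes matter for the group check; a standalone sketch of
# the same min/max reduction (the 10% tolerance is a made-up stand-in for
# utils.LvmExclusiveTestBadPvSizes):
#
#   vals = [ni for ni in node_images if ni.pv_min is not None]
#   if vals:
#     (pvmin, minnode) = min((ni.pv_min, ni.uuid) for ni in vals)
#     (pvmax, maxnode) = max((ni.pv_max, ni.uuid) for ni in vals)
#     if pvmax > pvmin * 1.1:  # hypothetical tolerance
#       warn("smallest PV (%s MB) on %s, biggest (%s MB) on %s"
#            % (pvmin, minnode, pvmax, maxnode))
#
# Pairing each size with the node UUID makes min()/max() deterministic on ties
# and keeps track of where the extreme value lives.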
1861 def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1862 """Check the node bridges.
1864 @type ninfo: L{objects.Node}
1865 @param ninfo: the node to check
1866 @param nresult: the remote results for the node
1867 @param bridges: the expected list of bridges
1870 if not bridges:
1871 return
1873 missing = nresult.get(constants.NV_BRIDGES, None)
1874 test = not isinstance(missing, list)
1875 self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1876 "did not return valid bridge information")
1877 if not test:
1878 self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1879 "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1881 def _VerifyNodeUserScripts(self, ninfo, nresult):
1882 """Check the results of user scripts presence and executability on the node
1884 @type ninfo: L{objects.Node}
1885 @param ninfo: the node to check
1886 @param nresult: the remote results for the node
1889 test = constants.NV_USERSCRIPTS not in nresult
1890 self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1891 "did not return user scripts information")
1893 broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1894 if broken_scripts:
1895 self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1896 "user scripts not present or not executable: %s" %
1897 utils.CommaJoin(sorted(broken_scripts)))
1899 def _VerifyNodeNetwork(self, ninfo, nresult):
1900 """Check the node network connectivity results.
1902 @type ninfo: L{objects.Node}
1903 @param ninfo: the node to check
1904 @param nresult: the remote results for the node
1907 test = constants.NV_NODELIST not in nresult
1908 self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
1909 "node hasn't returned node ssh connectivity data")
1910 if not test:
1911 if nresult[constants.NV_NODELIST]:
1912 for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1913 self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
1914 "ssh communication with node '%s': %s", a_node, a_msg)
1916 test = constants.NV_NODENETTEST not in nresult
1917 self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1918 "node hasn't returned node tcp connectivity data")
1919 if not test:
1920 if nresult[constants.NV_NODENETTEST]:
1921 nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1922 for anode in nlist:
1923 self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
1924 "tcp communication with node '%s': %s",
1925 anode, nresult[constants.NV_NODENETTEST][anode])
1927 test = constants.NV_MASTERIP not in nresult
1928 self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1929 "node hasn't returned node master IP reachability data")
1930 if not test:
1931 if not nresult[constants.NV_MASTERIP]:
1932 if ninfo.uuid == self.master_node:
1933 msg = "the master node cannot reach the master IP (not configured?)"
1935 msg = "cannot reach the master IP"
1936 self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
1938 def _VerifyInstance(self, instance, node_image, diskstatus):
1939 """Verify an instance.
1941 This function checks to see if the required block devices are
1942 available on the instance's node, and that the nodes are in the correct
1943 state.
1946 pnode_uuid = instance.primary_node
1947 pnode_img = node_image[pnode_uuid]
1948 groupinfo = self.cfg.GetAllNodeGroupsInfo()
1950 node_vol_should = {}
1951 instance.MapLVsByNode(node_vol_should)
1953 cluster = self.cfg.GetClusterInfo()
1954 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1955 self.group_info)
1956 err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
1957 self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
1958 utils.CommaJoin(err), code=self.ETYPE_WARNING)
1960 for node_uuid in node_vol_should:
1961 n_img = node_image[node_uuid]
1962 if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1963 # ignore missing volumes on offline or broken nodes
1964 continue
1965 for volume in node_vol_should[node_uuid]:
1966 test = volume not in n_img.volumes
1967 self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
1968 "volume %s missing on node %s", volume,
1969 self.cfg.GetNodeName(node_uuid))
1971 if instance.admin_state == constants.ADMINST_UP:
1972 test = instance.uuid not in pnode_img.instances and not pnode_img.offline
1973 self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
1974 "instance not running on its primary node %s",
1975 self.cfg.GetNodeName(pnode_uuid))
1976 self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
1977 instance.name, "instance is marked as running and lives on"
1978 " offline node %s", self.cfg.GetNodeName(pnode_uuid))
1980 diskdata = [(nname, success, status, idx)
1981 for (nname, disks) in diskstatus.items()
1982 for idx, (success, status) in enumerate(disks)]
1984 for nname, success, bdev_status, idx in diskdata:
1985 # the 'ghost node' construction in Exec() ensures that we have a
1986 # node here
1987 snode = node_image[nname]
1988 bad_snode = snode.ghost or snode.offline
1989 self._ErrorIf(instance.disks_active and
1990 not success and not bad_snode,
1991 constants.CV_EINSTANCEFAULTYDISK, instance.name,
1992 "couldn't retrieve status for disk/%s on %s: %s",
1993 idx, self.cfg.GetNodeName(nname), bdev_status)
1995 if instance.disks_active and success and \
1996 (bdev_status.is_degraded or
1997 bdev_status.ldisk_status != constants.LDS_OKAY):
1998 msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
1999 if bdev_status.is_degraded:
2000 msg += " is degraded"
2001 if bdev_status.ldisk_status != constants.LDS_OKAY:
2002 msg += "; state is '%s'" % \
2003 constants.LDS_NAMES[bdev_status.ldisk_status]
2005 self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2007 self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2008 constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2009 "instance %s, connection to primary node failed",
2012 self._ErrorIf(len(instance.secondary_nodes) > 1,
2013 constants.CV_EINSTANCELAYOUT, instance.name,
2014 "instance has multiple secondary nodes: %s",
2015 utils.CommaJoin(instance.secondary_nodes),
2016 code=self.ETYPE_WARNING)
2018 es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2019 if any(es_flags.values()):
2020 if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2021 # Disk template not compatible with exclusive_storage: no instance
2022 # node should have the flag set
2023 es_nodes = [n
2024 for (n, es) in es_flags.items()
2025 if es]
2026 self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2027 "instance has template %s, which is not supported on nodes"
2028 " that have exclusive storage set: %s",
2029 instance.disk_template,
2030 utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2031 for (idx, disk) in enumerate(instance.disks):
2032 self._ErrorIf(disk.spindles is None,
2033 constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2034 "number of spindles not configured for disk %s while"
2035 " exclusive storage is enabled, try running"
2036 " gnt-cluster repair-disk-sizes", idx)
2038 if instance.disk_template in constants.DTS_INT_MIRROR:
2039 instance_nodes = utils.NiceSort(instance.all_nodes)
2040 instance_groups = {}
2042 for node_uuid in instance_nodes:
2043 instance_groups.setdefault(self.all_node_info[node_uuid].group,
2044 []).append(node_uuid)
2047 "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2048 groupinfo[group].name)
2049 # Sort so that we always list the primary node first.
2050 for group, nodes in sorted(instance_groups.items(),
2051 key=lambda (_, nodes): pnode_uuid in nodes,
2054 self._ErrorIf(len(instance_groups) > 1,
2055 constants.CV_EINSTANCESPLITGROUPS,
2056 instance.name, "instance has primary and secondary nodes in"
2057 " different groups: %s", utils.CommaJoin(pretty_list),
2058 code=self.ETYPE_WARNING)
2060 inst_nodes_offline = []
2061 for snode in instance.secondary_nodes:
2062 s_img = node_image[snode]
2063 self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2064 self.cfg.GetNodeName(snode),
2065 "instance %s, connection to secondary node failed",
2069 inst_nodes_offline.append(snode)
2071 # warn that the instance lives on offline nodes
2072 self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2073 instance.name, "instance has offline secondary node(s) %s",
2074 utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2075 # ... or ghost/non-vm_capable nodes
2076 for node_uuid in instance.all_nodes:
2077 self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2078 instance.name, "instance lives on ghost node %s",
2079 self.cfg.GetNodeName(node_uuid))
2080 self._ErrorIf(not node_image[node_uuid].vm_capable,
2081 constants.CV_EINSTANCEBADNODE, instance.name,
2082 "instance lives on non-vm_capable node %s",
2083 self.cfg.GetNodeName(node_uuid))
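# The split-group warning above boils down to bucketing the instance's nodes
# by node group and counting buckets; a standalone sketch with a made-up
# node-to-group mapping:
#
#   node_group = {"n1": "g1", "n2": "g1", "n3": "g2"}
#   groups = {}
#   for node in ("n1", "n2", "n3"):   # primary first, then secondaries
#     groups.setdefault(node_group[node], []).append(node)
#   if len(groups) > 1:
#     print "instance spans %d groups" % len(groups)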
2085 def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2086 """Verify if there are any unknown volumes in the cluster.
2088 The .os, .swap and backup volumes are ignored. All other volumes are
2089 reported as unknown.
2091 @type reserved: L{ganeti.utils.FieldSet}
2092 @param reserved: a FieldSet of reserved volume names
2095 for node_uuid, n_img in node_image.items():
2096 if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2097 self.all_node_info[node_uuid].group != self.group_uuid):
2098 # skip non-healthy nodes
2099 continue
2100 for volume in n_img.volumes:
2101 test = ((node_uuid not in node_vol_should or
2102 volume not in node_vol_should[node_uuid]) and
2103 not reserved.Matches(volume))
2104 self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2105 self.cfg.GetNodeName(node_uuid),
2106 "volume %s is unknown", volume)
2108 def _VerifyNPlusOneMemory(self, node_image, all_insts):
2109 """Verify N+1 Memory Resilience.
2111 Check that if one single node dies we can still start all the
2112 instances it was primary for.
2115 cluster_info = self.cfg.GetClusterInfo()
2116 for node_uuid, n_img in node_image.items():
2117 # This code checks that every node which is now listed as
2118 # secondary has enough memory to host all instances for which it is
2119 # secondary, should a single other node in the cluster fail.
2120 # FIXME: not ready for failover to an arbitrary node
2121 # FIXME: does not support file-backed instances
2122 # WARNING: we currently take into account down instances as well
2123 # as up ones, considering that even if they're down someone
2124 # might want to start them even in the event of a node failure.
2125 if n_img.offline or \
2126 self.all_node_info[node_uuid].group != self.group_uuid:
2127 # we're skipping nodes marked offline and nodes in other groups from
2128 # the N+1 warning, since most likely we don't have good memory
2129 # information from them; we already list instances living on such
2130 # nodes, and that's enough warning
2131 continue
2132 #TODO(dynmem): also consider ballooning out other instances
2133 for prinode, inst_uuids in n_img.sbp.items():
2134 needed_mem = 0
2135 for inst_uuid in inst_uuids:
2136 bep = cluster_info.FillBE(all_insts[inst_uuid])
2137 if bep[constants.BE_AUTO_BALANCE]:
2138 needed_mem += bep[constants.BE_MINMEM]
2139 test = n_img.mfree < needed_mem
2140 self._ErrorIf(test, constants.CV_ENODEN1,
2141 self.cfg.GetNodeName(node_uuid),
2142 "not enough memory to accomodate instance failovers"
2143 " should node %s fail (%dMiB needed, %dMiB available)",
2144 self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
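# The N+1 rule in its smallest form: for every primary node P whose instances
# use node N as secondary, the minimum memory of P's auto-balanced instances
# must fit into N's free memory. A standalone sketch with simplified shapes:
#
#   def n_plus_one_ok(mfree, sbp, min_mem):
#     # sbp: primary node -> [instances secondary on this node]
#     # min_mem: instance -> BE_MINMEM (auto-balanced instances only)
#     for prinode, insts in sbp.items():
#       if mfree < sum(min_mem.get(i, 0) for i in insts):
#         return False  # losing prinode would overcommit this node
#     return True
#
# The per-primary grouping matters: only one node fails at a time, so each
# primary's instances are summed separately rather than all together.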
2146 def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2147 (files_all, files_opt, files_mc, files_vm)):
2148 """Verifies file checksums collected from all nodes.
2150 @param nodes: List of L{objects.Node} objects
2151 @param master_node_uuid: UUID of master node
2152 @param all_nvinfo: RPC results
2155 # Define functions determining which nodes to consider for a file
2156 files2nodefn = [
2157 (files_all, None),
2158 (files_mc, lambda node: (node.master_candidate or
2159 node.uuid == master_node_uuid)),
2160 (files_vm, lambda node: node.vm_capable),
2161 ]
2163 # Build mapping from filename to list of nodes which should have the file
2164 nodefiles = {}
2165 for (files, fn) in files2nodefn:
2166 if fn is None:
2167 filenodes = nodes
2168 else:
2169 filenodes = filter(fn, nodes)
2170 nodefiles.update((filename,
2171 frozenset(map(operator.attrgetter("uuid"), filenodes)))
2172 for filename in files)
2174 assert set(nodefiles) == (files_all | files_mc | files_vm)
2176 fileinfo = dict((filename, {}) for filename in nodefiles)
2177 ignore_nodes = set()
2179 for node in nodes:
2180 if node.offline:
2181 ignore_nodes.add(node.uuid)
2182 continue
2184 nresult = all_nvinfo[node.uuid]
2186 if nresult.fail_msg or not nresult.payload:
2187 node_files = None
2188 else:
2189 fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2190 node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2191 for (key, value) in fingerprints.items())
2194 test = not (node_files and isinstance(node_files, dict))
2195 self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2196 "Node did not return file checksum data")
2198 ignore_nodes.add(node.uuid)
2201 # Build per-checksum mapping from filename to nodes having it
2202 for (filename, checksum) in node_files.items():
2203 assert filename in nodefiles
2204 fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2206 for (filename, checksums) in fileinfo.items():
2207 assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2209 # Nodes having the file
2210 with_file = frozenset(node_uuid
2211 for node_uuids in fileinfo[filename].values()
2212 for node_uuid in node_uuids) - ignore_nodes
2214 expected_nodes = nodefiles[filename] - ignore_nodes
2216 # Nodes missing file
2217 missing_file = expected_nodes - with_file
2219 if filename in files_opt:
2220 # All or no nodes
2221 self._ErrorIf(missing_file and missing_file != expected_nodes,
2222 constants.CV_ECLUSTERFILECHECK, None,
2223 "File %s is optional, but it must exist on all or no"
2224 " nodes (not found on %s)",
2225 filename,
2226 utils.CommaJoin(
2227 utils.NiceSort(
2228 map(self.cfg.GetNodeName, missing_file))))
2229 else:
2230 self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2231 "File %s is missing from node(s) %s", filename,
2232 utils.CommaJoin(
2233 utils.NiceSort(
2234 map(self.cfg.GetNodeName, missing_file))))
2236 # Warn if a node has a file it shouldn't
2237 unexpected = with_file - expected_nodes
2238 self._ErrorIf(unexpected,
2239 constants.CV_ECLUSTERFILECHECK, None,
2240 "File %s should not exist on node(s) %s",
2241 filename, utils.CommaJoin(
2242 utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2244 # See if there are multiple versions of the file
2245 test = len(checksums) > 1
2246 if test:
2247 variants = ["variant %s on %s" %
2248 (idx + 1,
2249 utils.CommaJoin(utils.NiceSort(
2250 map(self.cfg.GetNodeName, node_uuids))))
2251 for (idx, (checksum, node_uuids)) in
2252 enumerate(sorted(checksums.items()))]
2253 else:
2254 variants = []
2256 self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2257 "File %s found with %s different checksums (%s)",
2258 filename, len(checksums), "; ".join(variants))
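# The heart of the file check is the two-level map filename -> checksum ->
# set(nodes); missing, unexpected and divergent files all fall out of set
# arithmetic on it. A standalone sketch with made-up data:
#
#   fileinfo = {"/etc/f": {"c1": set(["n1", "n2"]), "c2": set(["n3"])}}
#   expected = {"/etc/f": set(["n1", "n2", "n3", "n4"])}
#   for fname, checksums in fileinfo.items():
#     with_file = set.union(*checksums.values())
#     missing = expected[fname] - with_file      # set(["n4"])
#     unexpected = with_file - expected[fname]   # empty here
#     divergent = len(checksums) > 1             # True: n3 disagrees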
2260 def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2261 drbd_map):
2262 """Verifies the node DRBD status.
2264 @type ninfo: L{objects.Node}
2265 @param ninfo: the node to check
2266 @param nresult: the remote results for the node
2267 @param instanceinfo: the dict of instances
2268 @param drbd_helper: the configured DRBD usermode helper
2269 @param drbd_map: the DRBD map as returned by
2270 L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2273 if drbd_helper:
2274 helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2275 test = (helper_result is None)
2276 self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2277 "no drbd usermode helper returned")
2278 if helper_result:
2279 status, payload = helper_result
2280 test = not status
2281 self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2282 "drbd usermode helper check unsuccessful: %s", payload)
2283 test = status and (payload != drbd_helper)
2284 self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2285 "wrong drbd usermode helper: %s", payload)
2287 # compute the DRBD minors
2288 node_drbd = {}
2289 for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2290 test = inst_uuid not in instanceinfo
2291 self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2292 "ghost instance '%s' in temporary DRBD map", inst_uuid)
2293 # ghost instance should not be running, but otherwise we
2294 # don't give double warnings (both ghost instance and
2295 # unallocated minor in use)
2296 if test:
2297 node_drbd[minor] = (inst_uuid, False)
2298 else:
2299 instance = instanceinfo[inst_uuid]
2300 node_drbd[minor] = (inst_uuid, instance.disks_active)
2302 # and now check them
2303 used_minors = nresult.get(constants.NV_DRBDLIST, [])
2304 test = not isinstance(used_minors, (tuple, list))
2305 self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2306 "cannot parse drbd status file: %s", str(used_minors))
2307 if test:
2308 # we cannot check drbd status
2309 return
2311 for minor, (inst_uuid, must_exist) in node_drbd.items():
2312 test = minor not in used_minors and must_exist
2313 self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2314 "drbd minor %d of instance %s is not active", minor,
2315 self.cfg.GetInstanceName(inst_uuid))
2316 for minor in used_minors:
2317 test = minor not in node_drbd
2318 self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2319 "unallocated drbd minor %d is in use", minor)
2321 def _UpdateNodeOS(self, ninfo, nresult, nimg):
2322 """Builds the node OS structures.
2324 @type ninfo: L{objects.Node}
2325 @param ninfo: the node to check
2326 @param nresult: the remote results for the node
2327 @param nimg: the node image object
2330 remote_os = nresult.get(constants.NV_OSLIST, None)
2331 test = (not isinstance(remote_os, list) or
2332 not compat.all(isinstance(v, list) and len(v) == 7
2333 for v in remote_os))
2335 self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2336 "node hasn't returned valid OS data")
2338 nimg.os_fail = test
2339 if test:
2340 return
2342 os_dict = {}
2345 for (name, os_path, status, diagnose,
2346 variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2348 if name not in os_dict:
2349 os_dict[name] = []
2351 # parameters is a list of lists instead of list of tuples due to
2352 # JSON lacking a real tuple type, fix it:
2353 parameters = [tuple(v) for v in parameters]
2354 os_dict[name].append((os_path, status, diagnose,
2355 set(variants), set(parameters), set(api_ver)))
2357 nimg.oslist = os_dict
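# The list-to-tuple fix above is what makes the later set comparisons in
# _VerifyNodeOS possible, since lists are not hashable; a standalone sketch:
#
#   raw = [["p1", "v1"], ["p2", "v2"]]        # JSON cannot carry tuples
#   params = set(tuple(v) for v in raw)       # ok: set of tuples
#   # set(raw) would raise TypeError: unhashable type: 'list'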
2359 def _VerifyNodeOS(self, ninfo, nimg, base):
2360 """Verifies the node OS list.
2362 @type ninfo: L{objects.Node}
2363 @param ninfo: the node to check
2364 @param nimg: the node image object
2365 @param base: the 'template' node we match against (e.g. from the master)
2368 assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2370 beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2371 for os_name, os_data in nimg.oslist.items():
2372 assert os_data, "Empty OS status for OS %s?!" % os_name
2373 f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2374 self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2375 "Invalid OS %s (located at %s): %s",
2376 os_name, f_path, f_diag)
2377 self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2378 "OS '%s' has multiple entries"
2379 " (first one shadows the rest): %s",
2380 os_name, utils.CommaJoin([v[0] for v in os_data]))
2381 # comparisons with the 'base' image
2382 test = os_name not in base.oslist
2383 self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2384 "Extra OS %s not present on reference node (%s)",
2385 os_name, self.cfg.GetNodeName(base.uuid))
2386 if test:
2387 continue
2388 assert base.oslist[os_name], "Base node has empty OS status?"
2389 _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2390 if not b_status:
2391 # base OS is invalid, skipping
2392 continue
2393 for kind, a, b in [("API version", f_api, b_api),
2394 ("variants list", f_var, b_var),
2395 ("parameters", beautify_params(f_param),
2396 beautify_params(b_param))]:
2397 self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2398 "OS %s for %s differs from reference node %s:"
2399 " [%s] vs. [%s]", kind, os_name,
2400 self.cfg.GetNodeName(base.uuid),
2401 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2403 # check any missing OSes
2404 missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2405 self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2406 "OSes present on reference node %s"
2407 " but missing on this node: %s",
2408 self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2410 def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2411 """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2413 @type ninfo: L{objects.Node}
2414 @param ninfo: the node to check
2415 @param nresult: the remote results for the node
2416 @type is_master: bool
2417 @param is_master: Whether node is the master node
2420 cluster = self.cfg.GetClusterInfo()
2421 if (is_master and
2422 (cluster.IsFileStorageEnabled() or
2423 cluster.IsSharedFileStorageEnabled())):
2424 try:
2425 fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2426 except KeyError:
2427 # This should never happen
2428 self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2429 "Node did not return forbidden file storage paths")
2430 else:
2431 self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2432 "Found forbidden file storage paths: %s",
2433 utils.CommaJoin(fspaths))
2434 else:
2435 self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2436 constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2437 "Node should not have returned forbidden file storage"
2438 " paths")
2440 def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2441 verify_key, error_key):
2442 """Verifies (file) storage paths.
2444 @type ninfo: L{objects.Node}
2445 @param ninfo: the node to check
2446 @param nresult: the remote results for the node
2447 @type file_disk_template: string
2448 @param file_disk_template: file-based disk template, whose directory
2449 is supposed to be verified
2450 @type verify_key: string
2451 @param verify_key: key for the verification map of this file
2453 @param error_key: error key to be added to the verification results
2454 in case something goes wrong in this verification step
2457 assert (file_disk_template in
2458 utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2459 cluster = self.cfg.GetClusterInfo()
2460 if cluster.IsDiskTemplateEnabled(file_disk_template):
2461 self._ErrorIf(
2462 verify_key in nresult,
2463 error_key, ninfo.name,
2464 "The configured %s storage path is unusable: %s" %
2465 (file_disk_template, nresult.get(verify_key)))
2467 def _VerifyFileStoragePaths(self, ninfo, nresult):
2468 """Verifies (file) storage paths.
2470 @see: C{_VerifyStoragePaths}
2473 self._VerifyStoragePaths(
2474 ninfo, nresult, constants.DT_FILE,
2475 constants.NV_FILE_STORAGE_PATH,
2476 constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2478 def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2479 """Verifies (file) storage paths.
2481 @see: C{_VerifyStoragePaths}
2484 self._VerifyStoragePaths(
2485 ninfo, nresult, constants.DT_SHARED_FILE,
2486 constants.NV_SHARED_FILE_STORAGE_PATH,
2487 constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2489 def _VerifyOob(self, ninfo, nresult):
2490 """Verifies out of band functionality of a node.
2492 @type ninfo: L{objects.Node}
2493 @param ninfo: the node to check
2494 @param nresult: the remote results for the node
2497 # We just have to verify the paths on master and/or master candidates
2498 # as the oob helper is invoked on the master
2499 if ((ninfo.master_candidate or ninfo.master_capable) and
2500 constants.NV_OOB_PATHS in nresult):
2501 for path_result in nresult[constants.NV_OOB_PATHS]:
2502 self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2503 ninfo.name, path_result)
2505 def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2506 """Verifies and updates the node volume data.
2508 This function will update a L{NodeImage}'s internal structures
2509 with data from the remote call.
2511 @type ninfo: L{objects.Node}
2512 @param ninfo: the node to check
2513 @param nresult: the remote results for the node
2514 @param nimg: the node image object
2515 @param vg_name: the configured VG name
2518 nimg.lvm_fail = True
2519 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2520 if vg_name is None:
2521 pass
2522 elif isinstance(lvdata, basestring):
2523 self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2524 "LVM problem on node: %s", utils.SafeEncode(lvdata))
2525 elif not isinstance(lvdata, dict):
2526 self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2527 "rpc call to node failed (lvlist)")
2528 else:
2529 nimg.volumes = lvdata
2530 nimg.lvm_fail = False
2532 def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2533 """Verifies and updates the node instance list.
2535 If the listing was successful, then updates this node's instance
2536 list. Otherwise, it marks the RPC call as failed for the instance
2537 list.
2539 @type ninfo: L{objects.Node}
2540 @param ninfo: the node to check
2541 @param nresult: the remote results for the node
2542 @param nimg: the node image object
2545 idata = nresult.get(constants.NV_INSTANCELIST, None)
2546 test = not isinstance(idata, list)
2547 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2548 "rpc call to node failed (instancelist): %s",
2549 utils.SafeEncode(str(idata)))
2550 if test:
2551 nimg.hyp_fail = True
2552 else:
2553 nimg.instances = [inst.uuid for (_, inst) in
2554 self.cfg.GetMultiInstanceInfoByName(idata)]
2556 def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2557 """Verifies and computes a node information map
2559 @type ninfo: L{objects.Node}
2560 @param ninfo: the node to check
2561 @param nresult: the remote results for the node
2562 @param nimg: the node image object
2563 @param vg_name: the configured VG name
2566 # try to read free memory (from the hypervisor)
2567 hv_info = nresult.get(constants.NV_HVINFO, None)
2568 test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2569 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2570 "rpc call to node failed (hvinfo)")
2573 nimg.mfree = int(hv_info["memory_free"])
2574 except (ValueError, TypeError):
2575 self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2576 "node returned invalid nodeinfo, check hypervisor")
2578 # FIXME: devise a free space model for file based instances as well
2579 if vg_name is not None:
2580 test = (constants.NV_VGLIST not in nresult or
2581 vg_name not in nresult[constants.NV_VGLIST])
2582 self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2583 "node didn't return data for the volume group '%s'"
2584 " - it is either missing or broken", vg_name)
2585 if not test:
2586 try:
2587 nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2588 except (ValueError, TypeError):
2589 self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2590 "node returned invalid LVM info, check LVM status")
2592 def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2593 """Gets per-disk status information for all instances.
2595 @type node_uuids: list of strings
2596 @param node_uuids: Node UUIDs
2597 @type node_image: dict of (UUID, L{objects.Node})
2598 @param node_image: Node objects
2599 @type instanceinfo: dict of (UUID, L{objects.Instance})
2600 @param instanceinfo: Instance objects
2601 @rtype: {instance: {node: [(success, payload)]}}
2602 @return: a dictionary of per-instance dictionaries with nodes as
2603 keys and disk information as values; the disk information is a
2604 list of tuples (success, payload)
2607 node_disks = {}
2608 node_disks_devonly = {}
2609 diskless_instances = set()
2610 diskless = constants.DT_DISKLESS
2612 for nuuid in node_uuids:
2613 node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
2614 node_image[nuuid].sinst))
2615 diskless_instances.update(uuid for uuid in node_inst_uuids
2616 if instanceinfo[uuid].disk_template == diskless)
2617 disks = [(inst_uuid, disk)
2618 for inst_uuid in node_inst_uuids
2619 for disk in instanceinfo[inst_uuid].disks]
2621 if not disks:
2622 # No need to collect data
2623 continue
2625 node_disks[nuuid] = disks
2627 # _AnnotateDiskParams already makes copies of the disks
2628 devonly = []
2629 for (inst_uuid, dev) in disks:
2630 (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
2631 self.cfg)
2632 self.cfg.SetDiskID(anno_disk, nuuid)
2633 devonly.append(anno_disk)
2635 node_disks_devonly[nuuid] = devonly
2637 assert len(node_disks) == len(node_disks_devonly)
2639 # Collect data from all nodes with disks
2640 result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2641 node_disks_devonly)
2643 assert len(result) == len(node_disks)
2645 instdisk = {}
2647 for (nuuid, nres) in result.items():
2648 node = self.cfg.GetNodeInfo(nuuid)
2649 disks = node_disks[node.uuid]
2651 if nres.offline:
2652 # No data from this node
2653 data = len(disks) * [(False, "node offline")]
2654 else:
2655 msg = nres.fail_msg
2656 self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
2657 "while getting disk information: %s", msg)
2658 if msg:
2659 # No data from this node
2660 data = len(disks) * [(False, msg)]
2661 else:
2662 data = []
2663 for idx, i in enumerate(nres.payload):
2664 if isinstance(i, (tuple, list)) and len(i) == 2:
2665 data.append(i)
2666 else:
2667 logging.warning("Invalid result from node %s, entry %d: %s",
2668 node.name, idx, i)
2669 data.append((False, "Invalid result from the remote node"))
2671 for ((inst_uuid, _), status) in zip(disks, data):
2672 instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
2673 .append(status)
2675 # Add empty entries for diskless instances.
2676 for inst_uuid in diskless_instances:
2677 assert inst_uuid not in instdisk
2678 instdisk[inst_uuid] = {}
2680 assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2681 len(nuuids) <= len(instanceinfo[inst].all_nodes) and
2682 compat.all(isinstance(s, (tuple, list)) and
2683 len(s) == 2 for s in statuses)
2684 for inst, nuuids in instdisk.items()
2685 for nuuid, statuses in nuuids.items())
2687 instdisk_keys = set(instdisk)
2688 instanceinfo_keys = set(instanceinfo)
2689 assert instdisk_keys == instanceinfo_keys, \
2690 ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2691 (instdisk_keys, instanceinfo_keys))
2693 return instdisk
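# The structure returned above is instance -> node -> [(success, payload)],
# one list entry per disk index; a standalone sketch of how the setdefault
# chain grows it (data made up for illustration):
#
#   instdisk = {}
#   for (inst, node, status) in [("i1", "n1", (True, "ok")),
#                                ("i1", "n1", (False, "degraded"))]:
#     instdisk.setdefault(inst, {}).setdefault(node, []).append(status)
#   # instdisk == {"i1": {"n1": [(True, "ok"), (False, "degraded")]}}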
2695 @staticmethod
2696 def _SshNodeSelector(group_uuid, all_nodes):
2697 """Create endless iterators for all potential SSH check hosts.
2700 nodes = [node for node in all_nodes
2701 if (node.group != group_uuid and
2702 not node.offline)]
2703 keyfunc = operator.attrgetter("group")
2705 return map(itertools.cycle,
2706 [sorted(map(operator.attrgetter("name"), names))
2707 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2708 keyfunc)])
2710 @classmethod
2711 def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2712 """Choose which nodes should talk to which other nodes.
2714 We will make nodes contact all nodes in their group, and one node from
2715 every other group.
2717 @warning: This algorithm has a known issue if one node group is much
2718 smaller than others (e.g. just one node). In such a case all other
2719 nodes will talk to the single node.
2722 online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2723 sel = cls._SshNodeSelector(group_uuid, all_nodes)
2725 return (online_nodes,
2726 dict((name, sorted([i.next() for i in sel]))
2727 for name in online_nodes))
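# Selection in a nutshell: bucket the other groups' nodes by group, wrap each
# bucket in an endless itertools.cycle, and draw one name per bucket for every
# local node so the remote targets rotate evenly. A standalone sketch with a
# made-up layout:
#
#   import itertools, operator
#   nodes = [("g2", "a"), ("g2", "b"), ("g3", "c")]   # (group, name) pairs
#   keyfunc = operator.itemgetter(0)
#   cycles = [itertools.cycle(sorted(n for (_, n) in grp))
#             for (_, grp) in itertools.groupby(sorted(nodes, key=keyfunc),
#                                               keyfunc)]
#   targets = dict((local, [next(c) for c in cycles])
#                  for local in ("x", "y", "z"))
#   # x -> [a, c], y -> [b, c], z -> [a, c]
#
# This also shows the @warning above: group g3 has a single node, so "c" is
# handed to every local node.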
2729 def BuildHooksEnv(self):
2730 """Build hooks env.
2732 Cluster-Verify hooks just ran in the post phase and their failure causes
2733 the output to be logged in the verify output and the verification to fail.
2735 """
2736 env = {
2737 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2738 }
2740 env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2741 for node in self.my_node_info.values())
2743 return env
2745 def BuildHooksNodes(self):
2746 """Build hooks nodes.
2749 return ([], list(self.my_node_info.keys()))
2751 def Exec(self, feedback_fn):
2752 """Verify integrity of the node group, performing various test on nodes.
2755 # This method has too many local variables. pylint: disable=R0914
2756 feedback_fn("* Verifying group '%s'" % self.group_info.name)
2758 if not self.my_node_uuids:
2760 feedback_fn("* Empty node group, skipping verification")
2764 verbose = self.op.verbose
2765 self._feedback_fn = feedback_fn
2767 vg_name = self.cfg.GetVGName()
2768 drbd_helper = self.cfg.GetDRBDHelper()
2769 cluster = self.cfg.GetClusterInfo()
2770 hypervisors = cluster.enabled_hypervisors
2771 node_data_list = self.my_node_info.values()
2773 i_non_redundant = [] # Non redundant instances
2774 i_non_a_balanced = [] # Non auto-balanced instances
2775 i_offline = 0 # Count of offline instances
2776 n_offline = 0 # Count of offline nodes
2777 n_drained = 0 # Count of nodes being drained
2778 node_vol_should = {}
2780 # FIXME: verify OS list
2782 # File verification
2783 filemap = ComputeAncillaryFiles(cluster, False)
2785 # do local checksums
2786 master_node_uuid = self.master_node = self.cfg.GetMasterNode()
2787 master_ip = self.cfg.GetMasterIP()
2789 feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))
2791 user_scripts = []
2792 if self.cfg.GetUseExternalMipScript():
2793 user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2795 node_verify_param = {
2796 constants.NV_FILELIST:
2797 map(vcluster.MakeVirtualPath,
2798 utils.UniqueSequence(filename
2799 for files in filemap
2800 for filename in files)),
2801 constants.NV_NODELIST:
2802 self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2803 self.all_node_info.values()),
2804 constants.NV_HYPERVISOR: hypervisors,
2805 constants.NV_HVPARAMS:
2806 _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2807 constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2808 for node in node_data_list
2809 if not node.offline],
2810 constants.NV_INSTANCELIST: hypervisors,
2811 constants.NV_VERSION: None,
2812 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2813 constants.NV_NODESETUP: None,
2814 constants.NV_TIME: None,
2815 constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
2816 constants.NV_OSLIST: None,
2817 constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2818 constants.NV_USERSCRIPTS: user_scripts,
2819 }
2821 if vg_name is not None:
2822 node_verify_param[constants.NV_VGLIST] = None
2823 node_verify_param[constants.NV_LVLIST] = vg_name
2824 node_verify_param[constants.NV_PVLIST] = [vg_name]
2826 if drbd_helper:
2827 node_verify_param[constants.NV_DRBDVERSION] = None
2828 node_verify_param[constants.NV_DRBDLIST] = None
2829 node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2831 if cluster.IsFileStorageEnabled() or \
2832 cluster.IsSharedFileStorageEnabled():
2833 # Load file storage paths only from master node
2834 node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
2835 self.cfg.GetMasterNodeName()
2836 if cluster.IsFileStorageEnabled():
2837 node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
2838 cluster.file_storage_dir
2841 # FIXME: this needs to be changed per node-group, not cluster-wide
2842 bridges = set()
2843 default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2844 if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2845 bridges.add(default_nicpp[constants.NIC_LINK])
2846 for instance in self.my_inst_info.values():
2847 for nic in instance.nics:
2848 full_nic = cluster.SimpleFillNIC(nic.nicparams)
2849 if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2850 bridges.add(full_nic[constants.NIC_LINK])
2852 if bridges:
2853 node_verify_param[constants.NV_BRIDGES] = list(bridges)
2855 # Build our expected cluster state
2856 node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
2857 uuid=node.uuid,
2858 vm_capable=node.vm_capable))
2859 for node in node_data_list)
2862 oob_paths = []
2863 for node in self.all_node_info.values():
2864 path = SupportsOob(self.cfg, node)
2865 if path and path not in oob_paths:
2866 oob_paths.append(path)
2868 if oob_paths:
2869 node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2871 for inst_uuid in self.my_inst_uuids:
2872 instance = self.my_inst_info[inst_uuid]
2873 if instance.admin_state == constants.ADMINST_OFFLINE:
2874 i_offline += 1
2876 for nuuid in instance.all_nodes:
2877 if nuuid not in node_image:
2878 gnode = self.NodeImage(uuid=nuuid)
2879 gnode.ghost = (nuuid not in self.all_node_info)
2880 node_image[nuuid] = gnode
2882 instance.MapLVsByNode(node_vol_should)
2884 pnode = instance.primary_node
2885 node_image[pnode].pinst.append(instance.uuid)
2887 for snode in instance.secondary_nodes:
2888 nimg = node_image[snode]
2889 nimg.sinst.append(instance.uuid)
2890 if pnode not in nimg.sbp:
2891 nimg.sbp[pnode] = []
2892 nimg.sbp[pnode].append(instance.uuid)
2894 es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
2895 self.my_node_info.keys())
2896 # The value of exclusive_storage should be the same across the group, so if
2897 # it's True for at least one node, we act as if it were set for all the nodes
2898 self._exclusive_storage = compat.any(es_flags.values())
2899 if self._exclusive_storage:
2900 node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2902 # At this point, we have the in-memory data structures complete,
2903 # except for the runtime information, which we'll gather next
2905 # Due to the way our RPC system works, exact response times cannot be
2906 # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2907 # time before and after executing the request, we can at least have a time
2908 # window.
2909 nvinfo_starttime = time.time()
2910 all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
2911 node_verify_param,
2912 self.cfg.GetClusterName(),
2913 self.cfg.GetClusterInfo().hvparams)
2914 nvinfo_endtime = time.time()
2916 if self.extra_lv_nodes and vg_name is not None:
2917 extra_lv_nvinfo = \
2918 self.rpc.call_node_verify(self.extra_lv_nodes,
2919 {constants.NV_LVLIST: vg_name},
2920 self.cfg.GetClusterName(),
2921 self.cfg.GetClusterInfo().hvparams)
2922 else:
2923 extra_lv_nvinfo = {}
2925 all_drbd_map = self.cfg.ComputeDRBDMap()
2927 feedback_fn("* Gathering disk information (%s nodes)" %
2928 len(self.my_node_uuids))
2929 instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
2930 self.my_inst_info)
2932 feedback_fn("* Verifying configuration file consistency")
2934 # If not all nodes are being checked, we need to make sure the master node
2935 # and a non-checked vm_capable node are in the list.
2936 absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
2937 if absent_node_uuids:
2938 vf_nvinfo = all_nvinfo.copy()
2939 vf_node_info = list(self.my_node_info.values())
2940 additional_node_uuids = []
2941 if master_node_uuid not in self.my_node_info:
2942 additional_node_uuids.append(master_node_uuid)
2943 vf_node_info.append(self.all_node_info[master_node_uuid])
2944 # Add the first vm_capable node we find which is not included,
2945 # excluding the master node (which we already have)
2946 for node_uuid in absent_node_uuids:
2947 nodeinfo = self.all_node_info[node_uuid]
2948 if (nodeinfo.vm_capable and not nodeinfo.offline and
2949 node_uuid != master_node_uuid):
2950 additional_node_uuids.append(node_uuid)
2951 vf_node_info.append(self.all_node_info[node_uuid])
2953 key = constants.NV_FILELIST
2954 vf_nvinfo.update(self.rpc.call_node_verify(
2955 additional_node_uuids, {key: node_verify_param[key]},
2956 self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
2957 else:
2958 vf_nvinfo = all_nvinfo
2959 vf_node_info = self.my_node_info.values()
2961 self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)
2963 feedback_fn("* Verifying node status")
2965 refos_img = None
2967 for node_i in node_data_list:
2968 nimg = node_image[node_i.uuid]
2972 feedback_fn("* Skipping offline node %s" % (node_i.name,))
2976 if node_i.uuid == master_node_uuid:
2978 elif node_i.master_candidate:
2979 ntype = "master candidate"
2980 elif node_i.drained:
2986 feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))
2988 msg = all_nvinfo[node_i.uuid].fail_msg
2989 self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
2990 "while contacting node: %s", msg)
2991 if msg:
2992 nimg.rpc_fail = True
2993 continue
2995 nresult = all_nvinfo[node_i.uuid].payload
2997 nimg.call_ok = self._VerifyNode(node_i, nresult)
2998 self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2999 self._VerifyNodeNetwork(node_i, nresult)
3000 self._VerifyNodeUserScripts(node_i, nresult)
3001 self._VerifyOob(node_i, nresult)
3002 self._VerifyAcceptedFileStoragePaths(node_i, nresult,
3003 node_i.uuid == master_node_uuid)
3004 self._VerifyFileStoragePaths(node_i, nresult)
3005 self._VerifySharedFileStoragePaths(node_i, nresult)
3007 if nimg.vm_capable:
3008 self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
3009 self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
3010 all_drbd_map)
3012 self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
3013 self._UpdateNodeInstances(node_i, nresult, nimg)
3014 self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
3015 self._UpdateNodeOS(node_i, nresult, nimg)
3017 if not nimg.os_fail:
3018 if refos_img is None:
3019 refos_img = nimg
3020 self._VerifyNodeOS(node_i, nimg, refos_img)
3021 self._VerifyNodeBridges(node_i, nresult, bridges)
3023 # Check whether all running instances are primary for the node. (This
3024 # can no longer be done from _VerifyInstance below, since some of the
3025 # wrong instances could be from other node groups.)
3026 non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)
3028 for inst_uuid in non_primary_inst_uuids:
3029 test = inst_uuid in self.all_inst_info
3030 self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
3031 self.cfg.GetInstanceName(inst_uuid),
3032 "instance should not run on node %s", node_i.name)
3033 self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
3034 "node is running unknown instance %s", inst_uuid)
3036 self._VerifyGroupDRBDVersion(all_nvinfo)
3037 self._VerifyGroupLVM(node_image, vg_name)
3039 for node_uuid, result in extra_lv_nvinfo.items():
3040 self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
3041 node_image[node_uuid], vg_name)
3043 feedback_fn("* Verifying instance status")
3044 for inst_uuid in self.my_inst_uuids:
3045 instance = self.my_inst_info[inst_uuid]
3047 feedback_fn("* Verifying instance %s" % instance.name)
3048 self._VerifyInstance(instance, node_image, instdisk[inst_uuid])
3050 # If the instance is non-redundant we cannot survive losing its primary
3051 # node, so we are not N+1 compliant.
3052 if instance.disk_template not in constants.DTS_MIRRORED:
3053 i_non_redundant.append(instance)
3055 if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
3056 i_non_a_balanced.append(instance)
3058 feedback_fn("* Verifying orphan volumes")
3059 reserved = utils.FieldSet(*cluster.reserved_lvs)
3061 # We will get spurious "unknown volume" warnings if any node of this group
3062 # is secondary for an instance whose primary is in another group. To avoid
3063 # them, we find these instances and add their volumes to node_vol_should.
3064 for instance in self.all_inst_info.values():
3065 for secondary in instance.secondary_nodes:
3066 if (secondary in self.my_node_info
3067 and instance.uuid not in self.my_inst_info):
3068 instance.MapLVsByNode(node_vol_should)
3069 break
3071 self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
3073 if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
3074 feedback_fn("* Verifying N+1 Memory redundancy")
3075 self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
3077 feedback_fn("* Other Notes")
3079 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
3080 % len(i_non_redundant))
3082 if i_non_a_balanced:
3083 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
3084 % len(i_non_a_balanced))
3087 feedback_fn(" - NOTICE: %d offline instance(s) found." % i_offline)
3090 feedback_fn(" - NOTICE: %d offline node(s) found." % n_offline)
3093 feedback_fn(" - NOTICE: %d drained node(s) found." % n_drained)
3097 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
3098 """Analyze the post-hooks' result
3100 This method analyses the hook result, handles it, and sends some
3101 nicely-formatted feedback back to the user.
3103 @param phase: one of L{constants.HOOKS_PHASE_POST} or
3104 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
3105 @param hooks_results: the results of the multi-node hooks rpc call
3106 @param feedback_fn: function used send feedback back to the caller
3107 @param lu_result: previous Exec result
3108 @return: the new Exec result, based on the previous result
3112 # We only really run POST phase hooks, only for non-empty groups,
3113 # and are only interested in their results
3114 if not self.my_node_uuids:
3115 # empty node group
3116 pass
3117 elif phase == constants.HOOKS_PHASE_POST:
3118 # Used to change hooks' output to proper indentation
3119 feedback_fn("* Hooks Results")
3120 assert hooks_results, "invalid result from hooks"
3122 for node_name in hooks_results:
3123 res = hooks_results[node_name]
3124 msg = res.fail_msg
3125 test = msg and not res.offline
3126 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3127 "Communication failure in hooks execution: %s", msg)
3128 if res.offline or msg:
3129 # No need to investigate payload if node is offline or gave
3130 # an error.
3131 continue
3132 for script, hkr, output in res.payload:
3133 test = hkr == constants.HKR_FAIL
3134 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3135 "Script %s failed, output:", script)
3137 output = self._HOOKS_INDENT_RE.sub(" ", output)
3138 feedback_fn("%s" % output)
3144 class LUClusterVerifyDisks(NoHooksLU):
3145 """Verifies the cluster disks status.
3150 def ExpandNames(self):
3151 self.share_locks = ShareAll()
3152 self.needed_locks = {
3153 locking.LEVEL_NODEGROUP: locking.ALL_SET,
3154 }
3156 def Exec(self, feedback_fn):
3157 group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
3159 # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
3160 return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
3161 for group in group_names])
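# The pattern here fans verification out as one job per node group, so groups
# are checked in parallel by the job queue; a sketch of the jobs structure
# (group names made up for illustration):
#
#   jobs = [[opcodes.OpGroupVerifyDisks(group_name=g)]
#           for g in ("group1", "group2")]
#   # ResultWithJobs(jobs) makes the master queue two one-opcode jobs rather
#   # than running both group checks serially inside this LU.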