# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Logical units dealing with the cluster."""

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
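
# Illustrative note: each CQ_* flag in "requested_data" gates one RPC or
# config lookup above; fields that were not requested are filled with the
# NotImplemented placeholder, e.g. a query asking only for CQ_QUEUE_DRAINED
# yields ClusterQueryData(NotImplemented, NotImplemented, <bool>,
# NotImplemented).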


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "vcs_version": constants.VCS_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
      map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False
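
  # Illustrative sketch (hypothetical values): for a 1024 MiB DRBD8 disk
  # whose data child records only 1000 MiB, _EnsureChildSizes grows the child
  # to 1024 MiB and returns True, telling the caller the instance object must
  # be written back to the configuration:
  #
  #   self._EnsureChildSizes(drbd_disk)  # -> True, child size now 1024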

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node_uuid)
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))

    return changed
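
# Illustrative note: the list returned by LUClusterRepairDiskSizes.Exec holds
# (instance_name, disk_index, attribute, new_value) tuples, e.g.
# ("web1", 0, "size", 2048) after a size repair (names are examples).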


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)
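
# Illustrative sketch: the netmask is a CIDR prefix length validated against
# the cluster's primary IP family; e.g. on an IPv4 cluster
# _ValidateNetmask(cfg, 24) passes while a value such as 64 is rejected with
# errors.OpPrereqError (values here are examples).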


def CheckFileBasedStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
    file_disk_template):
  """Checks whether the given file-based storage directory is acceptable.

  Note: This function is public, because it is also used in bootstrap.py.

  @type logging_warn_fn: function
  @param logging_warn_fn: function which accepts a string and logs it
  @type file_storage_dir: string
  @param file_storage_dir: the directory to be used for file-based instances
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of enabled disk templates
  @type file_disk_template: string
  @param file_disk_template: the file-based disk template for which the
      path should be checked

  """
  assert (file_disk_template in
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_dir is not None:
    if file_storage_dir == "":
      if file_storage_enabled:
        raise errors.OpPrereqError(
            "Unsetting the '%s' storage directory while having '%s' storage"
            " enabled is not permitted." %
            (file_disk_template, file_disk_template))
    else:
      if not file_storage_enabled:
        logging_warn_fn(
            "Specified a %s storage directory, although %s storage is not"
            " enabled." % (file_disk_template, file_disk_template))
  else:
    raise errors.ProgrammerError("Received %s storage dir with value"
                                 " 'None'." % file_disk_template)


def CheckFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_FILE)


def CheckSharedFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given shared file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_SHARED_FILE)


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters.

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" %
                                   err, errors.ECODE_INVAL)
708 def ExpandNames(self):
709 # FIXME: in the future maybe other cluster params won't require checking on
710 # all nodes to be modified.
711 # FIXME: This opcode changes cluster-wide settings. Is acquiring all
712 # resource locks the right thing, shouldn't it be the BGL instead?
713 self.needed_locks = {
714 locking.LEVEL_NODE: locking.ALL_SET,
715 locking.LEVEL_INSTANCE: locking.ALL_SET,
716 locking.LEVEL_NODEGROUP: locking.ALL_SET,
717 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
719 self.share_locks = ShareAll()
721 def BuildHooksEnv(self):
726 "OP_TARGET": self.cfg.GetClusterName(),
727 "NEW_VG_NAME": self.op.vg_name,
730 def BuildHooksNodes(self):
731 """Build hooks nodes.
734 mn = self.cfg.GetMasterNode()

  def _CheckVgName(self, node_uuids, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the vg name on all nodes and, in case it gets
       unset, whether there are instances still using it.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and
        utils.IsLvmEnabled(enabled_disk_templates)) or \
        (self.cfg.GetVGName() is not None and
         utils.LvmGetsEnabled(enabled_disk_templates,
                              new_enabled_disk_templates)):
      self._CheckVgNameOnNodes(node_uuids)

  def _CheckVgNameOnNodes(self, node_uuids):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_uuids)
    for node_uuid in node_uuids:
      msg = vglist[node_uuid].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
                                   errors.ECODE_ENVIRON)

  def _GetEnabledDiskTemplates(self, cluster):
    """Determines the enabled disk templates and the subset of disk templates
       that are newly enabled by this operation.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    if self.op.enabled_disk_templates:
      enabled_disk_templates = self.op.enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(cluster.enabled_disk_templates))
    else:
      enabled_disk_templates = cluster.enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates)
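
  # Illustrative sketch (hypothetical values): with ["drbd", "plain"]
  # currently enabled and an opcode requesting ["plain", "file"], this
  # returns (["plain", "file"], ["file"]) -- "file" being the only template
  # newly enabled by the operation.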

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    self.cluster = cluster = self.cfg.GetClusterInfo()

    vm_capable_node_uuids = [node.uuid
                             for node in self.cfg.GetAllNodesInfo().values()
                             if node.uuid in node_uuids and node.vm_capable]

    (enabled_disk_templates, new_enabled_disk_templates) = \
      self._GetEnabledDiskTemplates(cluster)

    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
                      new_enabled_disk_templates)

    if self.op.file_storage_dir is not None:
      CheckFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)

    if self.op.shared_file_storage_dir is not None:
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.shared_file_storage_dir,
          enabled_disk_templates)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_uuids)
      for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s",
                       ninfo.name)
          continue
        msg = helpers[ninfo.uuid].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (ninfo.name, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[ninfo.uuid].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (ninfo.name, node_helper),
                                     errors.ECODE_ENVIRON)

    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
          constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(nuuid in group.members
                                             for nuuid in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
                                           self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_uuids, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_uuids, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def _SetVgName(self, feedback_fn):
    """Determines and sets the new volume group name.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name and not \
          utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
        feedback_fn("Note that you specified a volume group, but did not"
                    " enable any lvm disk template.")
      new_volume = self.op.vg_name
      if not new_volume and \
          utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                   " disk templates are enabled.")
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    else:
      if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
          not self.cfg.GetVGName():
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

  def _SetFileStorageDir(self, feedback_fn):
    """Set the file storage directory.

    """
    if self.op.file_storage_dir is not None:
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
        feedback_fn("Global file storage dir already set to value '%s'"
                    % self.cluster.file_storage_dir)
      else:
        self.cluster.file_storage_dir = self.op.file_storage_dir

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(set(self.op.enabled_disk_templates))

    self._SetVgName(feedback_fn)
    self._SetFileStorageDir(feedback_fn)

    if self.op.drbd_helper is not None:
      if constants.DT_DRBD8 not in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did not"
                    " enable the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
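
    # Illustrative sketch: "mods" is a list of (modification, os_name) pairs,
    # e.g. helper_os("hidden_os", [(constants.DDM_ADD, "debian-image")],
    # "hidden") appends "debian-image" to cluster.hidden_os unless it is
    # already present (the OS name here is an example).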

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
                 master_params.uuid, master_params.netmask,
                 self.op.master_netmask, master_params.ip,
                 master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)
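
# Illustrative note: when no group filter is given, the submitted job list is
# [[OpClusterVerifyConfig]] followed by one [OpClusterVerifyGroup(...)] job
# per group, and each group job depends on the config job through the
# relative job id (-len(jobs), []) produced by depends_fn at extension time.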


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to
    # a warning
    if etxt in self.op.ignore_errors: # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)
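
# Illustrative sketch (hypothetical values): for an ecode tuple of the form
# ("node", "ENODESSH", ...) and item "node1.example.com", the parseable form
# is "ERROR:ENODESSH:node:node1.example.com:<msg>", while the plain form
# reads "ERROR: node node1.example.com: <msg>".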


def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
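
# Illustrative note: callers receive (error_type, message) pairs, e.g.
# (None, None) for a healthy certificate or
# (LUClusterVerifyConfig.ETYPE_WARNING, "While verifying <path>: ...") when
# the certificate is close to expiring.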


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data
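
# Illustrative sketch (hypothetical names): the result is a flat list such as
#   [("cluster", "xen-pvm", {...}),
#    ("os debian-image", "xen-pvm", {...}),
#    ("instance web1", "xen-pvm", {...})]
# so parameter errors can be reported together with their origin.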


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (hv_name, item))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                  constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(inst.name for
                         inst in dangling_instances.get(node.uuid, [])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(inst.name for
                                         inst in no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        instance = self.cfg.GetInstanceInfoByName(inst_name)
        if instance.disk_template in constants.DTS_INT_MIRROR:
          nodes.update(instance.secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_node_uuids = set(self.group_info.members)
    group_inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_node_uuids = \
      group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_inst_uuids = \
      group_inst_uuids.difference(
        [self.cfg.GetInstanceInfoByName(name).uuid
         for name in self.owned_locks(locking.LEVEL_INSTANCE)])

    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_inst_uuids:
      raise errors.OpPrereqError(
        "Missing lock for instances: %s" %
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
        errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_uuids = group_node_uuids
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
                             for node_uuid in group_node_uuids)

    self.my_inst_uuids = group_inst_uuids
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
                             for inst_uuid in group_inst_uuids)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nuuid in inst.all_nodes:
          if self.all_node_info[nuuid].group != self.group_uuid:
            extra_lv_nodes.add(nuuid)

    unlocked_lv_nodes = \
      extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, ninfo.name,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s parameter verify failure (source %s): %s",
                      hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
                    "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
                  "Node time diverges by at least %s from master node time",
                  ntime_diff)

  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                  "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupDRBDVersion(self, node_verify_infos):
    """Check cross-node DRBD version consistency.

    @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node verify call

    """
    node_versions = {}
    for node_uuid, ndata in node_verify_infos.items():
      nresult = ndata.payload
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
      node_versions[node_uuid] = version

    if len(set(node_versions.values())) > 1:
      for node_uuid, version in sorted(node_versions.items()):
        msg = "DRBD version mismatch: %s" % version
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
                    code=self.ETYPE_WARNING)
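
  # Illustrative sketch (hypothetical versions): if one node reports "8.4.11"
  # and another "8.9.0", the version set has two members, so every node in
  # the group gets a "DRBD version mismatch: <version>" warning.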
1810 def _VerifyGroupLVM(self, node_image, vg_name):
1811 """Check cross-node consistency in LVM.
1813 @type node_image: dict
1814 @param node_image: info about nodes, mapping from node to names to
1815 L{NodeImage} objects
1816 @param vg_name: the configured VG name
1822 # Only exclusive storage needs this kind of checks
1823 if not self._exclusive_storage:
1826 # exclusive_storage wants all PVs to have the same size (approximately),
1827 # if the smallest and the biggest ones are okay, everything is fine.
1828 # pv_min is None iff pv_max is None
1829 vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1832 (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1833 (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1834 bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1835 self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1836 "PV sizes differ too much in the group; smallest (%s MB) is"
1837 " on %s, biggest (%s MB) is on %s",
1838 pvmin, self.cfg.GetNodeName(minnode_uuid),
1839 pvmax, self.cfg.GetNodeName(maxnode_uuid))
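
# Standalone sketch of the PV-size test above. The real comparison is
# delegated to utils.LvmExclusiveTestBadPvSizes; the relative tolerance
# below is a made-up stand-in for illustration.

def pv_sizes_differ_too_much(node_pv_ranges, rel_tolerance=0.1):
    """node_pv_ranges maps node -> (pv_min, pv_max) in MB; None if unknown."""
    vals = [(r, node) for (node, r) in node_pv_ranges.items()
            if r[0] is not None]
    if not vals:
        return None
    ((pvmin, _), minnode) = min(vals)
    ((_, pvmax), maxnode) = max(vals, key=lambda v: v[0][1])
    if pvmax > pvmin * (1 + rel_tolerance):
        return (pvmin, minnode, pvmax, maxnode)
    return None

# pv_sizes_differ_too_much({"n1": (100, 110), "n2": (100, 130)})
# -> (100, "n1", 130, "n2")
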
1841 def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1842 """Check the node bridges.
1844 @type ninfo: L{objects.Node}
1845 @param ninfo: the node to check
1846 @param nresult: the remote results for the node
1847 @param bridges: the expected list of bridges
1853 missing = nresult.get(constants.NV_BRIDGES, None)
1854 test = not isinstance(missing, list)
1855 self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1856 "did not return valid bridge information")
1858 self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1859 "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1861 def _VerifyNodeUserScripts(self, ninfo, nresult):
1862 """Check the results of user scripts presence and executability on the node
1864 @type ninfo: L{objects.Node}
1865 @param ninfo: the node to check
1866 @param nresult: the remote results for the node
1869 test = constants.NV_USERSCRIPTS not in nresult
1870 self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1871 "did not return user scripts information")
1873 broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1875 self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1876 "user scripts not present or not executable: %s" %
1877 utils.CommaJoin(sorted(broken_scripts)))
1879 def _VerifyNodeNetwork(self, ninfo, nresult):
1880 """Check the node network connectivity results.
1882 @type ninfo: L{objects.Node}
1883 @param ninfo: the node to check
1884 @param nresult: the remote results for the node
1887 test = constants.NV_NODELIST not in nresult
1888 self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
1889 "node hasn't returned node ssh connectivity data")
1891 if nresult[constants.NV_NODELIST]:
1892 for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1893 self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
1894 "ssh communication with node '%s': %s", a_node, a_msg)
1896 test = constants.NV_NODENETTEST not in nresult
1897 self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1898 "node hasn't returned node tcp connectivity data")
1900 if nresult[constants.NV_NODENETTEST]:
1901 nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1903 self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
1904 "tcp communication with node '%s': %s",
1905 anode, nresult[constants.NV_NODENETTEST][anode])
1907 test = constants.NV_MASTERIP not in nresult
1908 self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1909 "node hasn't returned node master IP reachability data")
1911 if not nresult[constants.NV_MASTERIP]:
1912 if ninfo.uuid == self.master_node:
1913 msg = "the master node cannot reach the master IP (not configured?)"
1915 msg = "cannot reach the master IP"
1916 self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
1918 def _VerifyInstance(self, instance, node_image, diskstatus):
1919 """Verify an instance.
1921 This function checks to see if the required block devices are
1922 available on the instance's node, and that the nodes are in the correct
1923 state.
1926 pnode_uuid = instance.primary_node
1927 pnode_img = node_image[pnode_uuid]
1928 groupinfo = self.cfg.GetAllNodeGroupsInfo()
1930 node_vol_should = {}
1931 instance.MapLVsByNode(node_vol_should)
1933 cluster = self.cfg.GetClusterInfo()
1934 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1936 err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
1937 self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
1938 utils.CommaJoin(err), code=self.ETYPE_WARNING)
1940 for node_uuid in node_vol_should:
1941 n_img = node_image[node_uuid]
1942 if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1943 # ignore missing volumes on offline or broken nodes
1945 for volume in node_vol_should[node_uuid]:
1946 test = volume not in n_img.volumes
1947 self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
1948 "volume %s missing on node %s", volume,
1949 self.cfg.GetNodeName(node_uuid))
1951 if instance.admin_state == constants.ADMINST_UP:
1952 test = instance.uuid not in pnode_img.instances and not pnode_img.offline
1953 self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
1954 "instance not running on its primary node %s",
1955 self.cfg.GetNodeName(pnode_uuid))
1956 self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
1957 instance.name, "instance is marked as running and lives on"
1958 " offline node %s", self.cfg.GetNodeName(pnode_uuid))
1960 diskdata = [(nname, success, status, idx)
1961 for (nname, disks) in diskstatus.items()
1962 for idx, (success, status) in enumerate(disks)]
1964 for nname, success, bdev_status, idx in diskdata:
1965 # the 'ghost node' construction in Exec() ensures that we have a
1966 # node here
1967 snode = node_image[nname]
1968 bad_snode = snode.ghost or snode.offline
1969 self._ErrorIf(instance.disks_active and
1970 not success and not bad_snode,
1971 constants.CV_EINSTANCEFAULTYDISK, instance.name,
1972 "couldn't retrieve status for disk/%s on %s: %s",
1973 idx, self.cfg.GetNodeName(nname), bdev_status)
1975 if instance.disks_active and success and \
1976 (bdev_status.is_degraded or
1977 bdev_status.ldisk_status != constants.LDS_OKAY):
1978 msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
1979 if bdev_status.is_degraded:
1980 msg += " is degraded"
1981 if bdev_status.ldisk_status != constants.LDS_OKAY:
1982 msg += "; state is '%s'" % \
1983 constants.LDS_NAMES[bdev_status.ldisk_status]
1985 self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
1987 self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1988 constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
1989 "instance %s, connection to primary node failed",
1992 self._ErrorIf(len(instance.secondary_nodes) > 1,
1993 constants.CV_EINSTANCELAYOUT, instance.name,
1994 "instance has multiple secondary nodes: %s",
1995 utils.CommaJoin(instance.secondary_nodes),
1996 code=self.ETYPE_WARNING)
1998 es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
1999 if any(es_flags.values()):
2000 if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2001 # Disk template not compatible with exclusive_storage: no instance
2002 # node should have the flag set
2004 for (n, es) in es_flags.items()
2006 self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2007 "instance has template %s, which is not supported on nodes"
2008 " that have exclusive storage set: %s",
2009 instance.disk_template,
2010 utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2011 for (idx, disk) in enumerate(instance.disks):
2012 self._ErrorIf(disk.spindles is None,
2013 constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2014 "number of spindles not configured for disk %s while"
2015 " exclusive storage is enabled, try running"
2016 " gnt-cluster repair-disk-sizes", idx)
2018 if instance.disk_template in constants.DTS_INT_MIRROR:
2019 instance_nodes = utils.NiceSort(instance.all_nodes)
2020 instance_groups = {}
2022 for node_uuid in instance_nodes:
2023 instance_groups.setdefault(self.all_node_info[node_uuid].group,
2024 []).append(node_uuid)
2027 "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2028 groupinfo[group].name)
2029 # Sort so that we always list the primary node first.
2030 for group, nodes in sorted(instance_groups.items(),
2031 key=lambda (_, nodes): pnode_uuid in nodes,
2034 self._ErrorIf(len(instance_groups) > 1,
2035 constants.CV_EINSTANCESPLITGROUPS,
2036 instance.name, "instance has primary and secondary nodes in"
2037 " different groups: %s", utils.CommaJoin(pretty_list),
2038 code=self.ETYPE_WARNING)
2040 inst_nodes_offline = []
2041 for snode in instance.secondary_nodes:
2042 s_img = node_image[snode]
2043 self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2044 self.cfg.GetNodeName(snode),
2045 "instance %s, connection to secondary node failed",
2049 inst_nodes_offline.append(snode)
2051 # warn that the instance lives on offline nodes
2052 self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2053 instance.name, "instance has offline secondary node(s) %s",
2054 utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2055 # ... or ghost/non-vm_capable nodes
2056 for node_uuid in instance.all_nodes:
2057 self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2058 instance.name, "instance lives on ghost node %s",
2059 self.cfg.GetNodeName(node_uuid))
2060 self._ErrorIf(not node_image[node_uuid].vm_capable,
2061 constants.CV_EINSTANCEBADNODE, instance.name,
2062 "instance lives on non-vm_capable node %s",
2063 self.cfg.GetNodeName(node_uuid))
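
# Sketch of the split-group warning above, standalone: bucket the
# instance's nodes by node group and report when more than one group is
# involved, listing the primary node's group first as the code above does.

def split_groups(instance_nodes, node_to_group, pnode):
    """Return [(group, nodes)] if the instance spans groups, else []."""
    groups = {}
    for node in instance_nodes:
        groups.setdefault(node_to_group[node], []).append(node)
    if len(groups) <= 1:
        return []
    return sorted(groups.items(),
                  key=lambda item: pnode in item[1], reverse=True)

# split_groups(["a", "b"], {"a": "g1", "b": "g2"}, "a")
# -> [("g1", ["a"]), ("g2", ["b"])]
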
2065 def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2066 """Verify if there are any unknown volumes in the cluster.
2068 The .os, .swap and backup volumes are ignored. All other volumes are
2069 reported as unknown.
2071 @type reserved: L{ganeti.utils.FieldSet}
2072 @param reserved: a FieldSet of reserved volume names
2075 for node_uuid, n_img in node_image.items():
2076 if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2077 self.all_node_info[node_uuid].group != self.group_uuid):
2078 # skip non-healthy nodes
2080 for volume in n_img.volumes:
2081 test = ((node_uuid not in node_vol_should or
2082 volume not in node_vol_should[node_uuid]) and
2083 not reserved.Matches(volume))
2084 self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2085 self.cfg.GetNodeName(node_uuid),
2086 "volume %s is unknown", volume)
2088 def _VerifyNPlusOneMemory(self, node_image, all_insts):
2089 """Verify N+1 Memory Resilience.
2091 Check that if one single node dies we can still start all the
2092 instances it was primary for.
2095 cluster_info = self.cfg.GetClusterInfo()
2096 for node_uuid, n_img in node_image.items():
2097 # This code checks that every node which is now listed as
2098 # secondary has enough memory to host all instances it is
2099 # supposed to host, should a single other node in the cluster fail.
2100 # FIXME: not ready for failover to an arbitrary node
2101 # FIXME: does not support file-backed instances
2102 # WARNING: we currently take into account down instances as well
2103 # as up ones, considering that even if they're down someone
2104 # might want to start them even in the event of a node failure.
2105 if n_img.offline or \
2106 self.all_node_info[node_uuid].group != self.group_uuid:
2107 # we're skipping nodes marked offline and nodes in other groups from
2108 # the N+1 warning, since most likely we don't have good memory
2109 # information from them; we already list instances living on such
2110 # nodes, and that's enough warning
2112 #TODO(dynmem): also consider ballooning out other instances
2113 for prinode, inst_uuids in n_img.sbp.items():
2115 for inst_uuid in inst_uuids:
2116 bep = cluster_info.FillBE(all_insts[inst_uuid])
2117 if bep[constants.BE_AUTO_BALANCE]:
2118 needed_mem += bep[constants.BE_MINMEM]
2119 test = n_img.mfree < needed_mem
2120 self._ErrorIf(test, constants.CV_ENODEN1,
2121 self.cfg.GetNodeName(node_uuid),
2122 "not enough memory to accomodate instance failovers"
2123 " should node %s fail (%dMiB needed, %dMiB available)",
2124 self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
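
# Sketch of the N+1 arithmetic above for one candidate node: sum the
# minimum memory of the instances that would fail over to it from each
# primary (the code above only counts auto-balanced instances) and compare
# against the candidate's free memory.

def n_plus_one_failures(mfree, secondaries_by_primary, inst_min_mem):
    """Return [(primary, needed_mem)] for failovers that would not fit."""
    failing = []
    for prinode, inst_names in secondaries_by_primary.items():
        needed = sum(inst_min_mem[name] for name in inst_names)
        if mfree < needed:
            failing.append((prinode, needed))
    return failing

# n_plus_one_failures(1024, {"p1": ["i1", "i2"]}, {"i1": 512, "i2": 768})
# -> [("p1", 1280)]
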
2126 def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2127 (files_all, files_opt, files_mc, files_vm)):
2128 """Verifies file checksums collected from all nodes.
2130 @param nodes: List of L{objects.Node} objects
2131 @param master_node_uuid: UUID of master node
2132 @param all_nvinfo: RPC results
2135 # Define functions determining which nodes to consider for a file
2138 (files_mc, lambda node: (node.master_candidate or
2139 node.uuid == master_node_uuid)),
2140 (files_vm, lambda node: node.vm_capable),
2143 # Build mapping from filename to list of nodes which should have the file
2145 for (files, fn) in files2nodefn:
2149 filenodes = filter(fn, nodes)
2150 nodefiles.update((filename,
2151 frozenset(map(operator.attrgetter("uuid"), filenodes)))
2152 for filename in files)
2154 assert set(nodefiles) == (files_all | files_mc | files_vm)
2156 fileinfo = dict((filename, {}) for filename in nodefiles)
2157 ignore_nodes = set()
2161 ignore_nodes.add(node.uuid)
2164 nresult = all_nvinfo[node.uuid]
2166 if nresult.fail_msg or not nresult.payload:
2169 fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2170 node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2171 for (key, value) in fingerprints.items())
2174 test = not (node_files and isinstance(node_files, dict))
2175 self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2176 "Node did not return file checksum data")
2178 ignore_nodes.add(node.uuid)
2181 # Build per-checksum mapping from filename to nodes having it
2182 for (filename, checksum) in node_files.items():
2183 assert filename in nodefiles
2184 fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2186 for (filename, checksums) in fileinfo.items():
2187 assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2189 # Nodes having the file
2190 with_file = frozenset(node_uuid
2191 for node_uuids in fileinfo[filename].values()
2192 for node_uuid in node_uuids) - ignore_nodes
2194 expected_nodes = nodefiles[filename] - ignore_nodes
2196 # Nodes missing file
2197 missing_file = expected_nodes - with_file
2199 if filename in files_opt:
2201 self._ErrorIf(missing_file and missing_file != expected_nodes,
2202 constants.CV_ECLUSTERFILECHECK, None,
2203 "File %s is optional, but it must exist on all or no"
2204 " nodes (not found on %s)",
2208 map(self.cfg.GetNodeName, missing_file))))
2210 self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2211 "File %s is missing from node(s) %s", filename,
2214 map(self.cfg.GetNodeName, missing_file))))
2216 # Warn if a node has a file it shouldn't
2217 unexpected = with_file - expected_nodes
2218 self._ErrorIf(unexpected,
2219 constants.CV_ECLUSTERFILECHECK, None,
2220 "File %s should not exist on node(s) %s",
2221 filename, utils.CommaJoin(
2222 utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2224 # See if there are multiple versions of the file
2225 test = len(checksums) > 1
2227 variants = ["variant %s on %s" %
2229 utils.CommaJoin(utils.NiceSort(
2230 map(self.cfg.GetNodeName, node_uuids))))
2231 for (idx, (checksum, node_uuids)) in
2232 enumerate(sorted(checksums.items()))]
2236 self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2237 "File %s found with %s different checksums (%s)",
2238 filename, len(checksums), "; ".join(variants))
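
# Sketch of the divergence test above: group the reporting nodes by the
# checksum they returned for each file; more than one group means the
# copies have diverged.

def checksum_variants(fileinfo):
    """fileinfo maps filename -> {checksum: set(nodes)}; return offenders."""
    return dict((filename, sorted(checksums.items()))
                for (filename, checksums) in fileinfo.items()
                if len(checksums) > 1)

# checksum_variants({"/etc/hosts": {"ab12": set(["n1"]),
#                                   "cd34": set(["n2"])}})
# -> {"/etc/hosts": [("ab12", set(["n1"])), ("cd34", set(["n2"]))]}
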
2240 def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2241 drbd_map):
2242 """Verifies the node DRBD status.
2244 @type ninfo: L{objects.Node}
2245 @param ninfo: the node to check
2246 @param nresult: the remote results for the node
2247 @param instanceinfo: the dict of instances
2248 @param drbd_helper: the configured DRBD usermode helper
2249 @param drbd_map: the DRBD map as returned by
2250 L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2254 helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2255 test = (helper_result is None)
2256 self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2257 "no drbd usermode helper returned")
2259 status, payload = helper_result
2260 test = not status
2261 self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2262 "drbd usermode helper check unsuccessful: %s", payload)
2263 test = status and (payload != drbd_helper)
2264 self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2265 "wrong drbd usermode helper: %s", payload)
2267 # compute the DRBD minors
2269 for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2270 test = inst_uuid not in instanceinfo
2271 self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2272 "ghost instance '%s' in temporary DRBD map", inst_uuid)
2273 # ghost instance should not be running, but otherwise we
2274 # don't give double warnings (both ghost instance and
2275 # unallocated minor in use)
2277 node_drbd[minor] = (inst_uuid, False)
2279 instance = instanceinfo[inst_uuid]
2280 node_drbd[minor] = (inst_uuid, instance.disks_active)
2282 # and now check them
2283 used_minors = nresult.get(constants.NV_DRBDLIST, [])
2284 test = not isinstance(used_minors, (tuple, list))
2285 self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2286 "cannot parse drbd status file: %s", str(used_minors))
2288 # we cannot check drbd status
2291 for minor, (inst_uuid, must_exist) in node_drbd.items():
2292 test = minor not in used_minors and must_exist
2293 self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2294 "drbd minor %d of instance %s is not active", minor,
2295 self.cfg.GetInstanceName(inst_uuid))
2296 for minor in used_minors:
2297 test = minor not in node_drbd
2298 self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2299 "unallocated drbd minor %d is in use", minor)
2301 def _UpdateNodeOS(self, ninfo, nresult, nimg):
2302 """Builds the node OS structures.
2304 @type ninfo: L{objects.Node}
2305 @param ninfo: the node to check
2306 @param nresult: the remote results for the node
2307 @param nimg: the node image object
2310 remote_os = nresult.get(constants.NV_OSLIST, None)
2311 test = (not isinstance(remote_os, list) or
2312 not compat.all(isinstance(v, list) and len(v) == 7
2313 for v in remote_os))
2315 self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2316 "node hasn't returned valid OS data")
2325 for (name, os_path, status, diagnose,
2326 variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2328 if name not in os_dict:
2331 # parameters is a list of lists instead of list of tuples due to
2332 # JSON lacking a real tuple type, fix it:
2333 parameters = [tuple(v) for v in parameters]
2334 os_dict[name].append((os_path, status, diagnose,
2335 set(variants), set(parameters), set(api_ver)))
2337 nimg.oslist = os_dict
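
# Shape of the structure built above, on hypothetical data: nimg.oslist
# maps an OS name to its occurrences; only the first entry is considered,
# later ones are shadowed duplicates (flagged by _VerifyNodeOS below).

example_oslist = {
    "debootstrap": [
        ("/srv/ganeti/os/debootstrap",  # path
         True,                          # status
         "",                            # diagnose message
         set(["default"]),              # variants
         set([("DHCP", "yes")]),        # parameters, as tuples
         set([20])),                    # API versions
    ],
}
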
2339 def _VerifyNodeOS(self, ninfo, nimg, base):
2340 """Verifies the node OS list.
2342 @type ninfo: L{objects.Node}
2343 @param ninfo: the node to check
2344 @param nimg: the node image object
2345 @param base: the 'template' node we match against (e.g. from the master)
2348 assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2350 beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2351 for os_name, os_data in nimg.oslist.items():
2352 assert os_data, "Empty OS status for OS %s?!" % os_name
2353 f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2354 self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2355 "Invalid OS %s (located at %s): %s",
2356 os_name, f_path, f_diag)
2357 self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2358 "OS '%s' has multiple entries"
2359 " (first one shadows the rest): %s",
2360 os_name, utils.CommaJoin([v[0] for v in os_data]))
2361 # comparisons with the 'base' image
2362 test = os_name not in base.oslist
2363 self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2364 "Extra OS %s not present on reference node (%s)",
2365 os_name, self.cfg.GetNodeName(base.uuid))
2368 assert base.oslist[os_name], "Base node has empty OS status?"
2369 _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2371 # base OS is invalid, skipping
2373 for kind, a, b in [("API version", f_api, b_api),
2374 ("variants list", f_var, b_var),
2375 ("parameters", beautify_params(f_param),
2376 beautify_params(b_param))]:
2377 self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2378 "OS %s for %s differs from reference node %s:"
2379 " [%s] vs. [%s]", kind, os_name,
2380 self.cfg.GetNodeName(base.uuid),
2381 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2383 # check any missing OSes
2384 missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2385 self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2386 "OSes present on reference node %s"
2387 " but missing on this node: %s",
2388 self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2390 def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2391 """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2393 @type ninfo: L{objects.Node}
2394 @param ninfo: the node to check
2395 @param nresult: the remote results for the node
2396 @type is_master: bool
2397 @param is_master: Whether node is the master node
2400 cluster = self.cfg.GetClusterInfo()
2402 (cluster.IsFileStorageEnabled() or
2403 cluster.IsSharedFileStorageEnabled())):
2405 fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2407 # This should never happen
2408 self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2409 "Node did not return forbidden file storage paths")
2411 self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2412 "Found forbidden file storage paths: %s",
2413 utils.CommaJoin(fspaths))
2415 self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2416 constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2417 "Node should not have returned forbidden file storage"
2420 def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2421 verify_key, error_key):
2422 """Verifies (file) storage paths.
2424 @type ninfo: L{objects.Node}
2425 @param ninfo: the node to check
2426 @param nresult: the remote results for the node
2427 @type file_disk_template: string
2428 @param file_disk_template: file-based disk template, whose directory
2429 is supposed to be verified
2430 @type verify_key: string
2431 @param verify_key: key for the verification map of this file
2433 @param error_key: error key to be added to the verification results
2434 in case something goes wrong in this verification step
2437 assert (file_disk_template in
2438 utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2439 cluster = self.cfg.GetClusterInfo()
2440 if cluster.IsDiskTemplateEnabled(file_disk_template):
2442 verify_key in nresult,
2443 error_key, ninfo.name,
2444 "The configured %s storage path is unusable: %s" %
2445 (file_disk_template, nresult.get(verify_key)))
2447 def _VerifyFileStoragePaths(self, ninfo, nresult):
2448 """Verifies (file) storage paths.
2450 @see: C{_VerifyStoragePaths}
2453 self._VerifyStoragePaths(
2454 ninfo, nresult, constants.DT_FILE,
2455 constants.NV_FILE_STORAGE_PATH,
2456 constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2458 def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2459 """Verifies (file) storage paths.
2461 @see: C{_VerifyStoragePaths}
2464 self._VerifyStoragePaths(
2465 ninfo, nresult, constants.DT_SHARED_FILE,
2466 constants.NV_SHARED_FILE_STORAGE_PATH,
2467 constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2469 def _VerifyOob(self, ninfo, nresult):
2470 """Verifies out of band functionality of a node.
2472 @type ninfo: L{objects.Node}
2473 @param ninfo: the node to check
2474 @param nresult: the remote results for the node
2477 # We just have to verify the paths on master and/or master candidates
2478 # as the oob helper is invoked on the master
2479 if ((ninfo.master_candidate or ninfo.master_capable) and
2480 constants.NV_OOB_PATHS in nresult):
2481 for path_result in nresult[constants.NV_OOB_PATHS]:
2482 self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2483 ninfo.name, path_result)
2485 def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2486 """Verifies and updates the node volume data.
2488 This function will update a L{NodeImage}'s internal structures
2489 with data from the remote call.
2491 @type ninfo: L{objects.Node}
2492 @param ninfo: the node to check
2493 @param nresult: the remote results for the node
2494 @param nimg: the node image object
2495 @param vg_name: the configured VG name
2498 nimg.lvm_fail = True
2499 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2502 elif isinstance(lvdata, basestring):
2503 self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2504 "LVM problem on node: %s", utils.SafeEncode(lvdata))
2505 elif not isinstance(lvdata, dict):
2506 self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2507 "rpc call to node failed (lvlist)")
2509 nimg.volumes = lvdata
2510 nimg.lvm_fail = False
2512 def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2513 """Verifies and updates the node instance list.
2515 If the listing was successful, then updates this node's instance
2516 list. Otherwise, it marks the RPC call as failed for the instance
2519 @type ninfo: L{objects.Node}
2520 @param ninfo: the node to check
2521 @param nresult: the remote results for the node
2522 @param nimg: the node image object
2525 idata = nresult.get(constants.NV_INSTANCELIST, None)
2526 test = not isinstance(idata, list)
2527 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2528 "rpc call to node failed (instancelist): %s",
2529 utils.SafeEncode(str(idata)))
2531 nimg.hyp_fail = True
2533 nimg.instances = [inst.uuid for (_, inst) in
2534 self.cfg.GetMultiInstanceInfoByName(idata)]
2536 def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2537 """Verifies and computes a node information map
2539 @type ninfo: L{objects.Node}
2540 @param ninfo: the node to check
2541 @param nresult: the remote results for the node
2542 @param nimg: the node image object
2543 @param vg_name: the configured VG name
2546 # try to read free memory (from the hypervisor)
2547 hv_info = nresult.get(constants.NV_HVINFO, None)
2548 test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2549 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2550 "rpc call to node failed (hvinfo)")
2553 nimg.mfree = int(hv_info["memory_free"])
2554 except (ValueError, TypeError):
2555 self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2556 "node returned invalid nodeinfo, check hypervisor")
2558 # FIXME: devise a free space model for file based instances as well
2559 if vg_name is not None:
2560 test = (constants.NV_VGLIST not in nresult or
2561 vg_name not in nresult[constants.NV_VGLIST])
2562 self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2563 "node didn't return data for the volume group '%s'"
2564 " - it is either missing or broken", vg_name)
2567 nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2568 except (ValueError, TypeError):
2569 self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2570 "node returned invalid LVM info, check LVM status")
2572 def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2573 """Gets per-disk status information for all instances.
2575 @type node_uuids: list of strings
2576 @param node_uuids: Node UUIDs
2577 @type node_image: dict of (UUID, L{NodeImage})
2578 @param node_image: node images
2579 @type instanceinfo: dict of (UUID, L{objects.Instance})
2580 @param instanceinfo: Instance objects
2581 @rtype: {instance: {node: [(success, payload)]}}
2582 @return: a dictionary of per-instance dictionaries with nodes as
2583 keys and disk information as values; the disk information is a
2584 list of tuples (success, payload)
2588 node_disks_devonly = {}
2589 diskless_instances = set()
2590 diskless = constants.DT_DISKLESS
2592 for nuuid in node_uuids:
2593 node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
2594 node_image[nuuid].sinst))
2595 diskless_instances.update(uuid for uuid in node_inst_uuids
2596 if instanceinfo[uuid].disk_template == diskless)
2597 disks = [(inst_uuid, disk)
2598 for inst_uuid in node_inst_uuids
2599 for disk in instanceinfo[inst_uuid].disks]
2602 # No need to collect data
2605 node_disks[nuuid] = disks
2607 # _AnnotateDiskParams makes already copies of the disks
2609 for (inst_uuid, dev) in disks:
2610 (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
2612 self.cfg.SetDiskID(anno_disk, nuuid)
2613 devonly.append(anno_disk)
2615 node_disks_devonly[nuuid] = devonly
2617 assert len(node_disks) == len(node_disks_devonly)
2619 # Collect data from all nodes with disks
2620 result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2623 assert len(result) == len(node_disks)
2627 for (nuuid, nres) in result.items():
2628 node = self.cfg.GetNodeInfo(nuuid)
2629 disks = node_disks[node.uuid]
2632 # No data from this node
2633 data = len(disks) * [(False, "node offline")]
2636 self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
2637 "while getting disk information: %s", msg)
2639 # No data from this node
2640 data = len(disks) * [(False, msg)]
2643 for idx, i in enumerate(nres.payload):
2644 if isinstance(i, (tuple, list)) and len(i) == 2:
2647 logging.warning("Invalid result from node %s, entry %d: %s",
2649 data.append((False, "Invalid result from the remote node"))
2651 for ((inst_uuid, _), status) in zip(disks, data):
2652 instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
2653 .append(status)
2655 # Add empty entries for diskless instances.
2656 for inst_uuid in diskless_instances:
2657 assert inst_uuid not in instdisk
2658 instdisk[inst_uuid] = {}
2660 assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2661 len(nuuids) <= len(instanceinfo[inst].all_nodes) and
2662 compat.all(isinstance(s, (tuple, list)) and
2663 len(s) == 2 for s in statuses)
2664 for inst, nuuids in instdisk.items()
2665 for nuuid, statuses in nuuids.items())
2667 instdisk_keys = set(instdisk)
2668 instanceinfo_keys = set(instanceinfo)
2669 assert instdisk_keys == instanceinfo_keys, \
2670 ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2671 (instdisk_keys, instanceinfo_keys))
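
# Shape of the value assembled above, on hypothetical data: one dict per
# instance, keyed by node UUID, with a (success, payload) pair per disk;
# diskless instances get an empty mapping.

example_instdisk = {
    "inst-uuid-1": {
        "node-uuid-a": [(True, "status-disk0"), (True, "status-disk1")],
        "node-uuid-b": [(False, "node offline"), (False, "node offline")],
    },
    "inst-uuid-2": {},  # diskless
}
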
2676 def _SshNodeSelector(group_uuid, all_nodes):
2677 """Create endless iterators for all potential SSH check hosts.
2680 nodes = [node for node in all_nodes
2681 if (node.group != group_uuid and
2683 keyfunc = operator.attrgetter("group")
2685 return map(itertools.cycle,
2686 [sorted(map(operator.attrgetter("name"), names))
2687 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2691 def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2692 """Choose which nodes should talk to which other nodes.
2694 We will make nodes contact all nodes in their group, and one node from
2697 @warning: This algorithm has a known issue if one node group is much
2698 smaller than others (e.g. just one node). In such a case all other
2699 nodes will talk to the single node.
2702 online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2703 sel = cls._SshNodeSelector(group_uuid, all_nodes)
2705 return (online_nodes,
2706 dict((name, sorted([i.next() for i in sel]))
2707 for name in online_nodes))
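
# Standalone sketch of the scheme above: each online node checks its whole
# group (handled elsewhere) plus one node per foreign group, drawn
# round-robin from per-group cycles so the checks spread evenly.

import itertools

def select_ssh_targets(online_nodes, other_groups):
    """other_groups is a list of node-name lists, one per foreign group."""
    cyclers = [itertools.cycle(sorted(names)) for names in other_groups]
    return dict((name, sorted(next(c) for c in cyclers))
                for name in online_nodes)

# select_ssh_targets(["a1", "a2"], [["b1", "b2"], ["c1"]])
# -> {"a1": ["b1", "c1"], "a2": ["b2", "c1"]}
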
2709 def BuildHooksEnv(self):
2712 Cluster-Verify hooks run only in the post phase; their failure is
2713 logged in the verify output and makes the verification fail.
2717 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2720 env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2721 for node in self.my_node_info.values())
2725 def BuildHooksNodes(self):
2726 """Build hooks nodes.
2729 return ([], list(self.my_node_info.keys()))
2731 def Exec(self, feedback_fn):
2732 """Verify integrity of the node group, performing various test on nodes.
2735 # This method has too many local variables. pylint: disable=R0914
2736 feedback_fn("* Verifying group '%s'" % self.group_info.name)
2738 if not self.my_node_uuids:
2740 feedback_fn("* Empty node group, skipping verification")
2744 verbose = self.op.verbose
2745 self._feedback_fn = feedback_fn
2747 vg_name = self.cfg.GetVGName()
2748 drbd_helper = self.cfg.GetDRBDHelper()
2749 cluster = self.cfg.GetClusterInfo()
2750 hypervisors = cluster.enabled_hypervisors
2751 node_data_list = self.my_node_info.values()
2753 i_non_redundant = [] # Non redundant instances
2754 i_non_a_balanced = [] # Non auto-balanced instances
2755 i_offline = 0 # Count of offline instances
2756 n_offline = 0 # Count of offline nodes
2757 n_drained = 0 # Count of nodes being drained
2758 node_vol_should = {}
2760 # FIXME: verify OS list
2763 filemap = ComputeAncillaryFiles(cluster, False)
2765 # do local checksums
2766 master_node_uuid = self.master_node = self.cfg.GetMasterNode()
2767 master_ip = self.cfg.GetMasterIP()
2769 feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))
2772 if self.cfg.GetUseExternalMipScript():
2773 user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2775 node_verify_param = {
2776 constants.NV_FILELIST:
2777 map(vcluster.MakeVirtualPath,
2778 utils.UniqueSequence(filename
2779 for files in filemap
2780 for filename in files)),
2781 constants.NV_NODELIST:
2782 self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2783 self.all_node_info.values()),
2784 constants.NV_HYPERVISOR: hypervisors,
2785 constants.NV_HVPARAMS:
2786 _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2787 constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2788 for node in node_data_list
2789 if not node.offline],
2790 constants.NV_INSTANCELIST: hypervisors,
2791 constants.NV_VERSION: None,
2792 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2793 constants.NV_NODESETUP: None,
2794 constants.NV_TIME: None,
2795 constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
2796 constants.NV_OSLIST: None,
2797 constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2798 constants.NV_USERSCRIPTS: user_scripts,
2801 if vg_name is not None:
2802 node_verify_param[constants.NV_VGLIST] = None
2803 node_verify_param[constants.NV_LVLIST] = vg_name
2804 node_verify_param[constants.NV_PVLIST] = [vg_name]
2807 node_verify_param[constants.NV_DRBDVERSION] = None
2808 node_verify_param[constants.NV_DRBDLIST] = None
2809 node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2811 if cluster.IsFileStorageEnabled() or \
2812 cluster.IsSharedFileStorageEnabled():
2813 # Load file storage paths only from master node
2814 node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
2815 self.cfg.GetMasterNodeName()
2816 if cluster.IsFileStorageEnabled():
2817 node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
2818 cluster.file_storage_dir
2821 # FIXME: this needs to be changed per node-group, not cluster-wide
2823 default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2824 if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2825 bridges.add(default_nicpp[constants.NIC_LINK])
2826 for instance in self.my_inst_info.values():
2827 for nic in instance.nics:
2828 full_nic = cluster.SimpleFillNIC(nic.nicparams)
2829 if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2830 bridges.add(full_nic[constants.NIC_LINK])
2833 node_verify_param[constants.NV_BRIDGES] = list(bridges)
2835 # Build our expected cluster state
2836 node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
2838 vm_capable=node.vm_capable))
2839 for node in node_data_list)
2843 for node in self.all_node_info.values():
2844 path = SupportsOob(self.cfg, node)
2845 if path and path not in oob_paths:
2846 oob_paths.append(path)
2849 node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2851 for inst_uuid in self.my_inst_uuids:
2852 instance = self.my_inst_info[inst_uuid]
2853 if instance.admin_state == constants.ADMINST_OFFLINE:
2854 i_offline += 1
2856 for nuuid in instance.all_nodes:
2857 if nuuid not in node_image:
2858 gnode = self.NodeImage(uuid=nuuid)
2859 gnode.ghost = (nuuid not in self.all_node_info)
2860 node_image[nuuid] = gnode
2862 instance.MapLVsByNode(node_vol_should)
2864 pnode = instance.primary_node
2865 node_image[pnode].pinst.append(instance.uuid)
2867 for snode in instance.secondary_nodes:
2868 nimg = node_image[snode]
2869 nimg.sinst.append(instance.uuid)
2870 if pnode not in nimg.sbp:
2871 nimg.sbp[pnode] = []
2872 nimg.sbp[pnode].append(instance.uuid)
2874 es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
2875 self.my_node_info.keys())
2876 # The value of exclusive_storage should be the same across the group, so if
2877 # it's True for at least one node, we act as if it were set for all nodes
2878 self._exclusive_storage = compat.any(es_flags.values())
2879 if self._exclusive_storage:
2880 node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2882 # At this point, we have the in-memory data structures complete,
2883 # except for the runtime information, which we'll gather next
2885 # Due to the way our RPC system works, exact response times cannot be
2886 # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2887 # time before and after executing the request, we can at least have a time
2888 # window.
2889 nvinfo_starttime = time.time()
2890 all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
2892 self.cfg.GetClusterName(),
2893 self.cfg.GetClusterInfo().hvparams)
2894 nvinfo_endtime = time.time()
2896 if self.extra_lv_nodes and vg_name is not None:
2898 self.rpc.call_node_verify(self.extra_lv_nodes,
2899 {constants.NV_LVLIST: vg_name},
2900 self.cfg.GetClusterName(),
2901 self.cfg.GetClusterInfo().hvparams)
2903 extra_lv_nvinfo = {}
2905 all_drbd_map = self.cfg.ComputeDRBDMap()
2907 feedback_fn("* Gathering disk information (%s nodes)" %
2908 len(self.my_node_uuids))
2909 instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
2912 feedback_fn("* Verifying configuration file consistency")
2914 # If not all nodes are being checked, we need to make sure the master node
2915 # and a non-checked vm_capable node are in the list.
2916 absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
2917 if absent_node_uuids:
2918 vf_nvinfo = all_nvinfo.copy()
2919 vf_node_info = list(self.my_node_info.values())
2920 additional_node_uuids = []
2921 if master_node_uuid not in self.my_node_info:
2922 additional_node_uuids.append(master_node_uuid)
2923 vf_node_info.append(self.all_node_info[master_node_uuid])
2924 # Add the first vm_capable node we find which is not included,
2925 # excluding the master node (which we already have)
2926 for node_uuid in absent_node_uuids:
2927 nodeinfo = self.all_node_info[node_uuid]
2928 if (nodeinfo.vm_capable and not nodeinfo.offline and
2929 node_uuid != master_node_uuid):
2930 additional_node_uuids.append(node_uuid)
2931 vf_node_info.append(self.all_node_info[node_uuid])
2933 key = constants.NV_FILELIST
2934 vf_nvinfo.update(self.rpc.call_node_verify(
2935 additional_node_uuids, {key: node_verify_param[key]},
2936 self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
2938 vf_nvinfo = all_nvinfo
2939 vf_node_info = self.my_node_info.values()
2941 self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)
2943 feedback_fn("* Verifying node status")
2947 for node_i in node_data_list:
2948 nimg = node_image[node_i.uuid]
2952 feedback_fn("* Skipping offline node %s" % (node_i.name,))
2956 if node_i.uuid == master_node_uuid:
2958 elif node_i.master_candidate:
2959 ntype = "master candidate"
2960 elif node_i.drained:
2966 feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))
2968 msg = all_nvinfo[node_i.uuid].fail_msg
2969 self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
2970 "while contacting node: %s", msg)
2972 nimg.rpc_fail = True
2975 nresult = all_nvinfo[node_i.uuid].payload
2977 nimg.call_ok = self._VerifyNode(node_i, nresult)
2978 self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2979 self._VerifyNodeNetwork(node_i, nresult)
2980 self._VerifyNodeUserScripts(node_i, nresult)
2981 self._VerifyOob(node_i, nresult)
2982 self._VerifyAcceptedFileStoragePaths(node_i, nresult,
2983 node_i.uuid == master_node_uuid)
2984 self._VerifyFileStoragePaths(node_i, nresult)
2985 self._VerifySharedFileStoragePaths(node_i, nresult)
2988 self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
2989 self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2992 self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2993 self._UpdateNodeInstances(node_i, nresult, nimg)
2994 self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2995 self._UpdateNodeOS(node_i, nresult, nimg)
2997 if not nimg.os_fail:
2998 if refos_img is None:
3000 self._VerifyNodeOS(node_i, nimg, refos_img)
3001 self._VerifyNodeBridges(node_i, nresult, bridges)
3003 # Check whether all running instances are primary for the node. (This
3004 # can no longer be done from _VerifyInstance below, since some of the
3005 # wrong instances could be from other node groups.)
3006 non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)
3008 for inst_uuid in non_primary_inst_uuids:
3009 test = inst_uuid in self.all_inst_info
3010 self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
3011 self.cfg.GetInstanceName(inst_uuid),
3012 "instance should not run on node %s", node_i.name)
3013 self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
3014 "node is running unknown instance %s", inst_uuid)
3016 self._VerifyGroupDRBDVersion(all_nvinfo)
3017 self._VerifyGroupLVM(node_image, vg_name)
3019 for node_uuid, result in extra_lv_nvinfo.items():
3020 self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
3021 node_image[node_uuid], vg_name)
3023 feedback_fn("* Verifying instance status")
3024 for inst_uuid in self.my_inst_uuids:
3025 instance = self.my_inst_info[inst_uuid]
3027 feedback_fn("* Verifying instance %s" % instance.name)
3028 self._VerifyInstance(instance, node_image, instdisk[inst_uuid])
3030 # If the instance is non-redundant we cannot survive losing its primary
3031 # node, so we are not N+1 compliant.
3032 if instance.disk_template not in constants.DTS_MIRRORED:
3033 i_non_redundant.append(instance)
3035 if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
3036 i_non_a_balanced.append(instance)
3038 feedback_fn("* Verifying orphan volumes")
3039 reserved = utils.FieldSet(*cluster.reserved_lvs)
3041 # We will get spurious "unknown volume" warnings if any node of this group
3042 # is secondary for an instance whose primary is in another group. To avoid
3043 # them, we find these instances and add their volumes to node_vol_should.
3044 for instance in self.all_inst_info.values():
3045 for secondary in instance.secondary_nodes:
3046 if (secondary in self.my_node_info
3047 and instance.name not in self.my_inst_info):
3048 instance.MapLVsByNode(node_vol_should)
3051 self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
3053 if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
3054 feedback_fn("* Verifying N+1 Memory redundancy")
3055 self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
3057 feedback_fn("* Other Notes")
3059 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
3060 % len(i_non_redundant))
3062 if i_non_a_balanced:
3063 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
3064 % len(i_non_a_balanced))
3067 feedback_fn(" - NOTICE: %d offline instance(s) found." % i_offline)
3070 feedback_fn(" - NOTICE: %d offline node(s) found." % n_offline)
3073 feedback_fn(" - NOTICE: %d drained node(s) found." % n_drained)
3077 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
3078 """Analyze the post-hooks' result
3080 This method analyses the hook result, handles it, and sends some
3081 nicely-formatted feedback back to the user.
3083 @param phase: one of L{constants.HOOKS_PHASE_POST} or
3084 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
3085 @param hooks_results: the results of the multi-node hooks rpc call
3086 @param feedback_fn: function used to send feedback back to the caller
3087 @param lu_result: previous Exec result
3088 @return: the new Exec result, based on the previous result
3092 # We only really run POST phase hooks, only for non-empty groups,
3093 # and are only interested in their results
3094 if not self.my_node_uuids:
3097 elif phase == constants.HOOKS_PHASE_POST:
3098 # Used to change hooks' output to proper indentation
3099 feedback_fn("* Hooks Results")
3100 assert hooks_results, "invalid result from hooks"
3102 for node_name in hooks_results:
3103 res = hooks_results[node_name]
3104 msg = res.fail_msg
3105 test = msg and not res.offline
3106 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3107 "Communication failure in hooks execution: %s", msg)
3108 if res.offline or msg:
3109 # No need to investigate payload if node is offline or gave
3112 for script, hkr, output in res.payload:
3113 test = hkr == constants.HKR_FAIL
3114 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3115 "Script %s failed, output:", script)
3117 output = self._HOOKS_INDENT_RE.sub(" ", output)
3118 feedback_fn("%s" % output)
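
# Sketch of the re-indentation above: _HOOKS_INDENT_RE is defined with the
# class (outside this excerpt); a multiline "^" anchor lets sub() prefix
# every line of the hook output. The indent width below is illustrative.

import re

_INDENT_RE = re.compile("^", re.M)

def indent_hook_output(output, prefix="      "):
    return _INDENT_RE.sub(prefix, output)

# indent_hook_output("line1\nline2") == "      line1\n      line2"
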
3124 class LUClusterVerifyDisks(NoHooksLU):
3125 """Verifies the cluster disks status.
3130 def ExpandNames(self):
3131 self.share_locks = ShareAll()
3132 self.needed_locks = {
3133 locking.LEVEL_NODEGROUP: locking.ALL_SET,
3136 def Exec(self, feedback_fn):
3137 group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
3139 # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
3140 return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
3141 for group in group_names])