4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Logical units dealing with the cluster."""
import copy
import itertools
import logging
import os
import re

import OpenSSL

34 from ganeti import compat
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import masterd
40 from ganeti import netutils
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 from ganeti import query
45 from ganeti import rpc
46 from ganeti import runtime
47 from ganeti import ssh
48 from ganeti import uidpool
49 from ganeti import utils
50 from ganeti import vcluster
52 from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
54 from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55 ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56 GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57 GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58 CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59 ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
60 CheckIpolicyVsDiskTemplates
62 import ganeti.masterd.instance
65 class LUClusterActivateMasterIp(NoHooksLU):
66 """Activate the master IP on the master node.
69 def Exec(self, feedback_fn):
70 """Activate the master IP.
73 master_params = self.cfg.GetMasterNetworkParameters()
74 ems = self.cfg.GetUseExternalMipScript()
75 result = self.rpc.call_node_activate_master_ip(master_params.uuid,
77 result.Raise("Could not activate the master IP")
80 class LUClusterDeactivateMasterIp(NoHooksLU):
81 """Deactivate the master IP on the master node.
84 def Exec(self, feedback_fn):
85 """Deactivate the master IP.
88 master_params = self.cfg.GetMasterNetworkParameters()
89 ems = self.cfg.GetUseExternalMipScript()
90 result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
92 result.Raise("Could not deactivate the master IP")
95 class LUClusterConfigQuery(NoHooksLU):
96 """Return configuration values.
101 def CheckArguments(self):
102 self.cq = ClusterQuery(None, self.op.output_fields, False)
104 def ExpandNames(self):
105 self.cq.ExpandNames(self)
107 def DeclareLocks(self, level):
108 self.cq.DeclareLocks(self, level)
110 def Exec(self, feedback_fn):
111 result = self.cq.OldStyleQuery(self)
113 assert len(result) == 1
118 class LUClusterDestroy(LogicalUnit):
119 """Logical unit for destroying the cluster.
122 HPATH = "cluster-destroy"
123 HTYPE = constants.HTYPE_CLUSTER
125 def BuildHooksEnv(self):
130 "OP_TARGET": self.cfg.GetClusterName(),
133 def BuildHooksNodes(self):
134 """Build hooks nodes.
139 def CheckPrereq(self):
140 """Check prerequisites.
142 This checks whether the cluster is empty.
144 Any errors are signaled by raising errors.OpPrereqError.
147 master = self.cfg.GetMasterNode()
149 nodelist = self.cfg.GetNodeList()
150 if len(nodelist) != 1 or nodelist[0] != master:
151 raise errors.OpPrereqError("There are still %d node(s) in"
152 " this cluster." % (len(nodelist) - 1),
154 instancelist = self.cfg.GetInstanceList()
156 raise errors.OpPrereqError("There are still %d instance(s) in"
157 " this cluster." % len(instancelist),
160 def Exec(self, feedback_fn):
161 """Destroys the cluster.
164 master_params = self.cfg.GetMasterNetworkParameters()
166 # Run post hooks on master node before it's removed
167 RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
169 ems = self.cfg.GetUseExternalMipScript()
170 result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
172 result.Warn("Error disabling the master IP address", self.LogWarning)
173 return master_params.uuid
176 class LUClusterPostInit(LogicalUnit):
177 """Logical unit for running hooks after cluster initialization.
180 HPATH = "cluster-init"
181 HTYPE = constants.HTYPE_CLUSTER
183 def BuildHooksEnv(self):
188 "OP_TARGET": self.cfg.GetClusterName(),
191 def BuildHooksNodes(self):
192 """Build hooks nodes.
195 return ([], [self.cfg.GetMasterNode()])
197 def Exec(self, feedback_fn):
204 class ClusterQuery(QueryBase):
205 FIELDS = query.CLUSTER_FIELDS
207 #: Do not sort (there is only one item)
210 def ExpandNames(self, lu):
213 # The following variables interact with _QueryBase._GetNames
214 self.wanted = locking.ALL_SET
215 self.do_locking = self.use_locking
218 raise errors.OpPrereqError("Can not use locking for cluster queries",
221 def DeclareLocks(self, lu, level):
224 def _GetQueryData(self, lu):
225 """Computes the list of nodes and their attributes.
228 # Locking is not used
229 assert not (compat.any(lu.glm.is_owned(level)
230 for level in locking.LEVELS
231 if level != locking.LEVEL_CLUSTER) or
232 self.do_locking or self.use_locking)
234 if query.CQ_CONFIG in self.requested_data:
235 cluster = lu.cfg.GetClusterInfo()
236 nodes = lu.cfg.GetAllNodesInfo()
238 cluster = NotImplemented
239 nodes = NotImplemented
241 if query.CQ_QUEUE_DRAINED in self.requested_data:
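# the job queue counts as drained when the drain marker file exists
# (checked locally, since logical units run on the master node)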
242 drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
244 drain_flag = NotImplemented
246 if query.CQ_WATCHER_PAUSE in self.requested_data:
247 master_node_uuid = lu.cfg.GetMasterNode()
249 result = lu.rpc.call_get_watcher_pause(master_node_uuid)
250 result.Raise("Can't retrieve watcher pause from master node '%s'" %
251 lu.cfg.GetMasterNodeName())
253 watcher_pause = result.payload
255 watcher_pause = NotImplemented
257 return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
260 class LUClusterQuery(NoHooksLU):
261 """Query cluster configuration.
266 def ExpandNames(self):
267 self.needed_locks = {}
269 def Exec(self, feedback_fn):
270 """Return cluster config.
273 cluster = self.cfg.GetClusterInfo()
276 # Filter just for enabled hypervisors
277 for os_name, hv_dict in cluster.os_hvp.items():
279 for hv_name, hv_params in hv_dict.items():
280 if hv_name in cluster.enabled_hypervisors:
281 os_hvp[os_name][hv_name] = hv_params
283 # Convert ip_family to ip_version
284 primary_ip_version = constants.IP4_VERSION
285 if cluster.primary_ip_family == netutils.IP6Address.family:
286 primary_ip_version = constants.IP6_VERSION
289 "software_version": constants.RELEASE_VERSION,
290 "protocol_version": constants.PROTOCOL_VERSION,
291 "config_version": constants.CONFIG_VERSION,
292 "os_api_version": max(constants.OS_API_VERSIONS),
293 "export_version": constants.EXPORT_VERSION,
294 "vcs_version": constants.VCS_VERSION,
295 "architecture": runtime.GetArchInfo(),
296 "name": cluster.cluster_name,
297 "master": self.cfg.GetMasterNodeName(),
298 "default_hypervisor": cluster.primary_hypervisor,
299 "enabled_hypervisors": cluster.enabled_hypervisors,
300 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
301 for hypervisor_name in cluster.enabled_hypervisors]),
303 "beparams": cluster.beparams,
304 "osparams": cluster.osparams,
305 "ipolicy": cluster.ipolicy,
306 "nicparams": cluster.nicparams,
307 "ndparams": cluster.ndparams,
308 "diskparams": cluster.diskparams,
309 "candidate_pool_size": cluster.candidate_pool_size,
310 "master_netdev": cluster.master_netdev,
311 "master_netmask": cluster.master_netmask,
312 "use_external_mip_script": cluster.use_external_mip_script,
313 "volume_group_name": cluster.volume_group_name,
314 "drbd_usermode_helper": cluster.drbd_usermode_helper,
315 "file_storage_dir": cluster.file_storage_dir,
316 "shared_file_storage_dir": cluster.shared_file_storage_dir,
317 "maintain_node_health": cluster.maintain_node_health,
318 "ctime": cluster.ctime,
319 "mtime": cluster.mtime,
320 "uuid": cluster.uuid,
321 "tags": list(cluster.GetTags()),
322 "uid_pool": cluster.uid_pool,
323 "default_iallocator": cluster.default_iallocator,
324 "reserved_lvs": cluster.reserved_lvs,
325 "primary_ip_version": primary_ip_version,
326 "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
327 "hidden_os": cluster.hidden_os,
328 "blacklisted_os": cluster.blacklisted_os,
329 "enabled_disk_templates": cluster.enabled_disk_templates,
335 class LUClusterRedistConf(NoHooksLU):
336 """Force the redistribution of cluster configuration.
338 This is a very simple LU.
343 def ExpandNames(self):
344 self.needed_locks = {
345 locking.LEVEL_NODE: locking.ALL_SET,
346 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
348 self.share_locks = ShareAll()
350 def Exec(self, feedback_fn):
351 """Redistribute the configuration.
354 self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
355 RedistributeAncillaryFiles(self)
358 class LUClusterRename(LogicalUnit):
359 """Rename the cluster.
362 HPATH = "cluster-rename"
363 HTYPE = constants.HTYPE_CLUSTER
365 def BuildHooksEnv(self):
370 "OP_TARGET": self.cfg.GetClusterName(),
371 "NEW_NAME": self.op.name,
374 def BuildHooksNodes(self):
375 """Build hooks nodes.
378 return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
380 def CheckPrereq(self):
381 """Verify that the passed name is a valid one.
384 hostname = netutils.GetHostname(name=self.op.name,
385 family=self.cfg.GetPrimaryIPFamily())
387 new_name = hostname.name
388 self.ip = new_ip = hostname.ip
389 old_name = self.cfg.GetClusterName()
390 old_ip = self.cfg.GetMasterIP()
391 if new_name == old_name and new_ip == old_ip:
392 raise errors.OpPrereqError("Neither the name nor the IP address of the"
393 " cluster has changed",
396 if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
397 raise errors.OpPrereqError("The given cluster IP address (%s) is"
398 " reachable on the network" %
399 new_ip, errors.ECODE_NOTUNIQUE)
401 self.op.name = new_name
403 def Exec(self, feedback_fn):
404 """Rename the cluster.
407 clustername = self.op.name
410 # shutdown the master IP
411 master_params = self.cfg.GetMasterNetworkParameters()
412 ems = self.cfg.GetUseExternalMipScript()
413 result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
415 result.Raise("Could not disable the master role")
418 cluster = self.cfg.GetClusterInfo()
419 cluster.cluster_name = clustername
420 cluster.master_ip = new_ip
421 self.cfg.Update(cluster, feedback_fn)
423 # update the known hosts file
424 ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
425 node_list = self.cfg.GetOnlineNodeList()
427 node_list.remove(master_params.uuid)
430 UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
432 master_params.ip = new_ip
433 result = self.rpc.call_node_activate_master_ip(master_params.uuid,
435 result.Warn("Could not re-enable the master role on the master,"
436 " please restart manually", self.LogWarning)
441 class LUClusterRepairDiskSizes(NoHooksLU):
442 """Verifies the cluster disks sizes.
447 def ExpandNames(self):
448 if self.op.instances:
449 (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
450 # Not getting the node allocation lock as only a specific set of
451 # instances (and their nodes) is going to be acquired
452 self.needed_locks = {
453 locking.LEVEL_NODE_RES: [],
454 locking.LEVEL_INSTANCE: self.wanted_names,
456 self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
458 self.wanted_names = None
459 self.needed_locks = {
460 locking.LEVEL_NODE_RES: locking.ALL_SET,
461 locking.LEVEL_INSTANCE: locking.ALL_SET,
463 # This opcode acquires the node locks for all instances
464 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
468 locking.LEVEL_NODE_RES: 1,
469 locking.LEVEL_INSTANCE: 0,
470 locking.LEVEL_NODE_ALLOC: 1,
473 def DeclareLocks(self, level):
474 if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
475 self._LockInstancesNodes(primary_only=True, level=level)
477 def CheckPrereq(self):
478 """Check prerequisites.
480 This only checks the optional instance list against the existing names.
483 if self.wanted_names is None:
484 self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
486 self.wanted_instances = \
487 map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
489 def _EnsureChildSizes(self, disk):
490 """Ensure children of the disk have the needed disk size.
492 This is valid mainly for DRBD8 and fixes an issue where the
493 children have a smaller disk size.
495 @param disk: an L{ganeti.objects.Disk} object
498 if disk.dev_type == constants.DT_DRBD8:
499 assert disk.children, "Empty children for DRBD8?"
500 fchild = disk.children[0]
501 mismatch = fchild.size < disk.size
503 self.LogInfo("Child disk has size %d, parent %d, fixing",
504 fchild.size, disk.size)
505 fchild.size = disk.size
507 # and we recurse on this child only, not on the metadev
508 return self._EnsureChildSizes(fchild) or mismatch
512 def Exec(self, feedback_fn):
513 """Verify the size of cluster disks.
516 # TODO: check child disks too
517 # TODO: check differences in size between primary/secondary nodes
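# group every disk under its instance's primary node so that each node
# only needs a single RPC query below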
519 for instance in self.wanted_instances:
520 pnode = instance.primary_node
521 if pnode not in per_node_disks:
522 per_node_disks[pnode] = []
523 for idx, disk in enumerate(instance.disks):
524 per_node_disks[pnode].append((instance, idx, disk))
526 assert not (frozenset(per_node_disks.keys()) -
527 self.owned_locks(locking.LEVEL_NODE_RES)), \
528 "Not owning correct locks"
529 assert not self.owned_locks(locking.LEVEL_NODE)
531 es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
532 per_node_disks.keys())
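# ask each node for the actual dimensions of its disks and reconcile
# them with the sizes and spindles recorded in the configuration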
535 for node_uuid, dskl in per_node_disks.items():
536 newl = [v[2].Copy() for v in dskl]
538 self.cfg.SetDiskID(dsk, node_uuid)
539 node_name = self.cfg.GetNodeName(node_uuid)
540 result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
542 self.LogWarning("Failure in blockdev_getdimensions call to node"
543 " %s, ignoring", node_name)
545 if len(result.payload) != len(dskl):
546 logging.warning("Invalid result from node %s: len(dskl)=%d,"
547 " result.payload=%s", node_name, len(dskl),
549 self.LogWarning("Invalid result from node %s, ignoring node results",
552 for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
553 if dimensions is None:
554 self.LogWarning("Disk %d of instance %s did not return size"
555 " information, ignoring", idx, instance.name)
557 if not isinstance(dimensions, (tuple, list)):
558 self.LogWarning("Disk %d of instance %s did not return valid"
559 " dimension information, ignoring", idx,
562 (size, spindles) = dimensions
563 if not isinstance(size, (int, long)):
564 self.LogWarning("Disk %d of instance %s did not return valid"
565 " size information, ignoring", idx, instance.name)
568 if size != disk.size:
569 self.LogInfo("Disk %d of instance %s has mismatched size,"
570 " correcting: recorded %d, actual %d", idx,
571 instance.name, disk.size, size)
573 self.cfg.Update(instance, feedback_fn)
574 changed.append((instance.name, idx, "size", size))
575 if es_flags[node_uuid]:
577 self.LogWarning("Disk %d of instance %s did not return valid"
578 " spindles information, ignoring", idx,
580 elif disk.spindles is None or disk.spindles != spindles:
581 self.LogInfo("Disk %d of instance %s has mismatched spindles,"
582 " correcting: recorded %s, actual %s",
583 idx, instance.name, disk.spindles, spindles)
584 disk.spindles = spindles
585 self.cfg.Update(instance, feedback_fn)
586 changed.append((instance.name, idx, "spindles", disk.spindles))
587 if self._EnsureChildSizes(disk):
588 self.cfg.Update(instance, feedback_fn)
589 changed.append((instance.name, idx, "size", disk.size))
593 def _ValidateNetmask(cfg, netmask):
594 """Checks if a netmask is valid.
596 @type cfg: L{config.ConfigWriter}
597 @param cfg: The cluster configuration
599 @param netmask: the netmask to be verified
600 @raise errors.OpPrereqError: if the validation fails
603 ip_family = cfg.GetPrimaryIPFamily()
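# resolve the IP address class (IPv4 or IPv6) matching the cluster's
# primary address family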
605 ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
606 except errors.ProgrammerError:
607 raise errors.OpPrereqError("Invalid primary ip family: %s." %
608 ip_family, errors.ECODE_INVAL)
609 if not ipcls.ValidateNetmask(netmask):
610 raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
611 (netmask), errors.ECODE_INVAL)
614 def CheckFileBasedStoragePathVsEnabledDiskTemplates(
615 logging_warn_fn, file_storage_dir, enabled_disk_templates,
617 """Checks whether the given file-based storage directory is acceptable.
619 Note: This function is public, because it is also used in bootstrap.py.
621 @type logging_warn_fn: function
622 @param logging_warn_fn: function which accepts a string and logs it
623 @type file_storage_dir: string
624 @param file_storage_dir: the directory to be used for file-based instances
625 @type enabled_disk_templates: list of string
626 @param enabled_disk_templates: the list of enabled disk templates
627 @type file_disk_template: string
628 @param file_disk_template: the file-based disk template for which the
629 path should be checked
632 assert (file_disk_template in
633 utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
634 file_storage_enabled = file_disk_template in enabled_disk_templates
635 if file_storage_dir is not None:
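# an empty string requests unsetting the directory; that is only
# allowed while the corresponding disk template is disabled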
636 if file_storage_dir == "":
637 if file_storage_enabled:
638 raise errors.OpPrereqError(
639 "Unsetting the '%s' storage directory while having '%s' storage"
640 " enabled is not permitted." %
641 (file_disk_template, file_disk_template))
643 if not file_storage_enabled:
645 "Specified a %s storage directory, although %s storage is not"
646 " enabled." % (file_disk_template, file_disk_template))
648 raise errors.ProgrammerError("Received %s storage dir with value"
649 " 'None'." % file_disk_template)
652 def CheckFileStoragePathVsEnabledDiskTemplates(
653 logging_warn_fn, file_storage_dir, enabled_disk_templates):
654 """Checks whether the given file storage directory is acceptable.
656 @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
659 CheckFileBasedStoragePathVsEnabledDiskTemplates(
660 logging_warn_fn, file_storage_dir, enabled_disk_templates,
664 def CheckSharedFileStoragePathVsEnabledDiskTemplates(
665 logging_warn_fn, file_storage_dir, enabled_disk_templates):
666 """Checks whether the given shared file storage directory is acceptable.
668 @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
671 CheckFileBasedStoragePathVsEnabledDiskTemplates(
672 logging_warn_fn, file_storage_dir, enabled_disk_templates,
673 constants.DT_SHARED_FILE)
676 class LUClusterSetParams(LogicalUnit):
677 """Change the parameters of the cluster.
680 HPATH = "cluster-modify"
681 HTYPE = constants.HTYPE_CLUSTER
684 def CheckArguments(self):
689 uidpool.CheckUidPool(self.op.uid_pool)
692 uidpool.CheckUidPool(self.op.add_uids)
694 if self.op.remove_uids:
695 uidpool.CheckUidPool(self.op.remove_uids)
697 if self.op.master_netmask is not None:
698 _ValidateNetmask(self.cfg, self.op.master_netmask)
700 if self.op.diskparams:
701 for dt_params in self.op.diskparams.values():
702 utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
704 utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
705 except errors.OpPrereqError, err:
706 raise errors.OpPrereqError("While verifying diskparams options: %s" % err,
709 def ExpandNames(self):
710 # FIXME: in the future maybe other cluster params won't require checking on
711 # all nodes to be modified.
712 # FIXME: This opcode changes cluster-wide settings. Is acquiring all
713 # resource locks the right thing, shouldn't it be the BGL instead?
714 self.needed_locks = {
715 locking.LEVEL_NODE: locking.ALL_SET,
716 locking.LEVEL_INSTANCE: locking.ALL_SET,
717 locking.LEVEL_NODEGROUP: locking.ALL_SET,
718 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
720 self.share_locks = ShareAll()
722 def BuildHooksEnv(self):
727 "OP_TARGET": self.cfg.GetClusterName(),
728 "NEW_VG_NAME": self.op.vg_name,
731 def BuildHooksNodes(self):
732 """Build hooks nodes.
735 mn = self.cfg.GetMasterNode()
738 def _CheckVgName(self, node_uuids, enabled_disk_templates,
739 new_enabled_disk_templates):
740 """Check the consistency of the vg name on all nodes and in case it gets
741 unset whether there are instances still using it.
744 lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
745 lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
746 new_enabled_disk_templates)
747 current_vg_name = self.cfg.GetVGName()
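# an empty vg_name unsets the volume group; refuse this while any
# LVM-based disk template is or becomes enabled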
749 if self.op.vg_name == '':
751 raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
752 " disk templates are or get enabled.")
754 if self.op.vg_name is None:
755 if current_vg_name is None and lvm_is_enabled:
756 raise errors.OpPrereqError("Please specify a volume group when"
757 " enabling lvm-based disk-templates.")
759 if self.op.vg_name is not None and not self.op.vg_name:
760 if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
761 raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
762 " instances exist", errors.ECODE_INVAL)
764 if (self.op.vg_name is not None and lvm_is_enabled) or \
765 (self.cfg.GetVGName() is not None and lvm_gets_enabled):
766 self._CheckVgNameOnNodes(node_uuids)
768 def _CheckVgNameOnNodes(self, node_uuids):
769 """Check the status of the volume group on each node.
772 vglist = self.rpc.call_vg_list(node_uuids)
773 for node_uuid in node_uuids:
774 msg = vglist[node_uuid].fail_msg
777 self.LogWarning("Error while gathering data on node %s"
778 " (ignoring node): %s",
779 self.cfg.GetNodeName(node_uuid), msg)
781 vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
783 constants.MIN_VG_SIZE)
785 raise errors.OpPrereqError("Error on node '%s': %s" %
786 (self.cfg.GetNodeName(node_uuid), vgstatus),
787 errors.ECODE_ENVIRON)
790 def _GetEnabledDiskTemplatesInner(op_enabled_disk_templates,
791 old_enabled_disk_templates):
792 """Determines the enabled disk templates and the subset of disk templates
793 that are newly enabled by this operation.
796 enabled_disk_templates = None
797 new_enabled_disk_templates = []
798 if op_enabled_disk_templates:
799 enabled_disk_templates = op_enabled_disk_templates
800 new_enabled_disk_templates = \
801 list(set(enabled_disk_templates)
802 - set(old_enabled_disk_templates))
804 enabled_disk_templates = old_enabled_disk_templates
805 return (enabled_disk_templates, new_enabled_disk_templates)
807 def _GetEnabledDiskTemplates(self, cluster):
808 """Determines the enabled disk templates and the subset of disk templates
809 that are newly enabled by this operation.
812 return self._GetEnabledDiskTemplatesInner(self.op.enabled_disk_templates,
813 cluster.enabled_disk_templates)
815 def _CheckIpolicy(self, cluster, enabled_disk_templates):
816 """Checks the ipolicy.
818 @type cluster: C{objects.Cluster}
819 @param cluster: the cluster's configuration
820 @type enabled_disk_templates: list of string
821 @param enabled_disk_templates: list of (possibly newly) enabled disk templates
825 # FIXME: write unit tests for this
827 self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
830 CheckIpolicyVsDiskTemplates(self.new_ipolicy,
831 enabled_disk_templates)
833 all_instances = self.cfg.GetAllInstancesInfo().values()
835 for group in self.cfg.GetAllNodeGroupsInfo().values():
836 instances = frozenset([inst for inst in all_instances
837 if compat.any(nuuid in group.members
838 for nuuid in inst.all_nodes)])
839 new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
840 ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
841 new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
844 violations.update(new)
847 self.LogWarning("After the ipolicy change the following instances"
849 utils.CommaJoin(utils.NiceSort(violations)))
851 CheckIpolicyVsDiskTemplates(cluster.ipolicy,
852 enabled_disk_templates)
854 def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
855 """Checks whether the set DRBD helper actually exists on the nodes.
857 @type drbd_helper: string
858 @param drbd_helper: path of the drbd usermode helper binary
859 @type node_uuids: list of strings
860 @param node_uuids: list of node UUIDs to check for the helper
863 # checks given drbd helper on all nodes
864 helpers = self.rpc.call_drbd_helper(node_uuids)
865 for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
867 self.LogInfo("Not checking drbd helper on offline node %s",
870 msg = helpers[ninfo.uuid].fail_msg
872 raise errors.OpPrereqError("Error checking drbd helper on node"
873 " '%s': %s" % (ninfo.name, msg),
874 errors.ECODE_ENVIRON)
875 node_helper = helpers[ninfo.uuid].payload
876 if node_helper != drbd_helper:
877 raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
878 (ninfo.name, node_helper),
879 errors.ECODE_ENVIRON)
881 def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
882 """Check the DRBD usermode helper.
884 @type node_uuids: list of strings
885 @param node_uuids: a list of nodes' UUIDs
886 @type drbd_enabled: boolean
887 @param drbd_enabled: whether DRBD will be enabled after this operation
888 (no matter if it was disabled before or not)
889 @type drbd_gets_enabled: boolean
890 @param drbd_gets_enabled: true if DRBD was disabled before this
891 operation, but will be enabled afterwards
894 if self.op.drbd_helper == '':
896 raise errors.OpPrereqError("Cannot disable drbd helper while"
898 if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
899 raise errors.OpPrereqError("Cannot disable drbd helper while"
900 " drbd-based instances exist",
904 if self.op.drbd_helper is not None and drbd_enabled:
905 self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
907 if drbd_gets_enabled:
908 current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
909 if current_drbd_helper is not None:
910 self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
912 raise errors.OpPrereqError("Cannot enable DRBD without a"
913 " DRBD usermode helper set.")
915 def CheckPrereq(self):
916 """Check prerequisites.
918 This checks whether the given params don't conflict and
919 if the given volume group is valid.
922 node_uuids = self.owned_locks(locking.LEVEL_NODE)
923 self.cluster = cluster = self.cfg.GetClusterInfo()
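# restrict the storage and hypervisor checks below to locked nodes that
# are marked vm_capable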
925 vm_capable_node_uuids = [node.uuid
926 for node in self.cfg.GetAllNodesInfo().values()
927 if node.uuid in node_uuids and node.vm_capable]
929 (enabled_disk_templates, new_enabled_disk_templates) = \
930 self._GetEnabledDiskTemplates(cluster)
932 self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
933 new_enabled_disk_templates)
935 if self.op.file_storage_dir is not None:
936 CheckFileStoragePathVsEnabledDiskTemplates(
937 self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
939 if self.op.shared_file_storage_dir is not None:
940 CheckSharedFileStoragePathVsEnabledDiskTemplates(
941 self.LogWarning, self.op.shared_file_storage_dir,
942 enabled_disk_templates)
944 drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
945 drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
946 self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)
948 # validate params changes
950 objects.UpgradeBeParams(self.op.beparams)
951 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
952 self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
955 utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
956 self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
958 # TODO: we need a more general way to handle resetting
959 # cluster-level parameters to default values
960 if self.new_ndparams["oob_program"] == "":
961 self.new_ndparams["oob_program"] = \
962 constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
965 new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
966 self.cluster.hv_state_static)
967 self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
968 for hv, values in new_hv_state.items())
970 if self.op.disk_state:
971 new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
972 self.cluster.disk_state_static)
973 self.new_disk_state = \
974 dict((storage, dict((name, cluster.SimpleFillDiskState(values))
975 for name, values in svalues.items()))
976 for storage, svalues in new_disk_state.items())
978 self._CheckIpolicy(cluster, enabled_disk_templates)
980 if self.op.nicparams:
981 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
982 self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
983 objects.NIC.CheckParameterSyntax(self.new_nicparams)
986 # check all instances for consistency
987 for instance in self.cfg.GetAllInstancesInfo().values():
988 for nic_idx, nic in enumerate(instance.nics):
989 params_copy = copy.deepcopy(nic.nicparams)
990 params_filled = objects.FillDict(self.new_nicparams, params_copy)
992 # check parameter syntax
994 objects.NIC.CheckParameterSyntax(params_filled)
995 except errors.ConfigurationError, err:
996 nic_errors.append("Instance %s, nic/%d: %s" %
997 (instance.name, nic_idx, err))
999 # if we're moving instances to routed, check that they have an ip
1000 target_mode = params_filled[constants.NIC_MODE]
1001 if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
1002 nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
1003 " address" % (instance.name, nic_idx))
1005 raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
1006 "\n".join(nic_errors), errors.ECODE_INVAL)
1008 # hypervisor list/parameters
1009 self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
1010 if self.op.hvparams:
1011 for hv_name, hv_dict in self.op.hvparams.items():
1012 if hv_name not in self.new_hvparams:
1013 self.new_hvparams[hv_name] = hv_dict
1015 self.new_hvparams[hv_name].update(hv_dict)
1017 # disk template parameters
1018 self.new_diskparams = objects.FillDict(cluster.diskparams, {})
1019 if self.op.diskparams:
1020 for dt_name, dt_params in self.op.diskparams.items():
1021 if dt_name not in self.new_diskparams:
1022 self.new_diskparams[dt_name] = dt_params
1024 self.new_diskparams[dt_name].update(dt_params)
1026 # os hypervisor parameters
1027 self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
1029 for os_name, hvs in self.op.os_hvp.items():
1030 if os_name not in self.new_os_hvp:
1031 self.new_os_hvp[os_name] = hvs
1033 for hv_name, hv_dict in hvs.items():
1035 # Delete if it exists
1036 self.new_os_hvp[os_name].pop(hv_name, None)
1037 elif hv_name not in self.new_os_hvp[os_name]:
1038 self.new_os_hvp[os_name][hv_name] = hv_dict
1040 self.new_os_hvp[os_name][hv_name].update(hv_dict)
1043 self.new_osp = objects.FillDict(cluster.osparams, {})
1044 if self.op.osparams:
1045 for os_name, osp in self.op.osparams.items():
1046 if os_name not in self.new_osp:
1047 self.new_osp[os_name] = {}
1049 self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
1052 if not self.new_osp[os_name]:
1053 # we removed all parameters
1054 del self.new_osp[os_name]
1056 # check the parameter validity (remote check)
1057 CheckOSParams(self, False, [self.cfg.GetMasterNode()],
1058 os_name, self.new_osp[os_name])
1060 # changes to the hypervisor list
1061 if self.op.enabled_hypervisors is not None:
1062 self.hv_list = self.op.enabled_hypervisors
1063 for hv in self.hv_list:
1064 # if the hypervisor doesn't already exist in the cluster
1065 # hvparams, we initialize it to empty, and then (in both
1066 # cases) we make sure to fill the defaults, as we might not
1067 # have a complete defaults list if the hypervisor wasn't enabled before
1069 if hv not in new_hvp:
1071 new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1072 utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1074 self.hv_list = cluster.enabled_hypervisors
1076 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1077 # either the enabled list has changed, or the parameters have, validate
1078 for hv_name, hv_params in self.new_hvparams.items():
1079 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1080 (self.op.enabled_hypervisors and
1081 hv_name in self.op.enabled_hypervisors)):
1082 # either this is a new hypervisor, or its parameters have changed
1083 hv_class = hypervisor.GetHypervisorClass(hv_name)
1084 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1085 hv_class.CheckParameterSyntax(hv_params)
1086 CheckHVParams(self, node_uuids, hv_name, hv_params)
1088 self._CheckDiskTemplateConsistency()
1091 # no need to check any newly-enabled hypervisors, since the
1092 # defaults have already been checked in the above code-block
1093 for os_name, os_hvp in self.new_os_hvp.items():
1094 for hv_name, hv_params in os_hvp.items():
1095 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1096 # we need to fill in the new os_hvp on top of the actual hv_p
1097 cluster_defaults = self.new_hvparams.get(hv_name, {})
1098 new_osp = objects.FillDict(cluster_defaults, hv_params)
1099 hv_class = hypervisor.GetHypervisorClass(hv_name)
1100 hv_class.CheckParameterSyntax(new_osp)
1101 CheckHVParams(self, node_uuids, hv_name, new_osp)
1103 if self.op.default_iallocator:
1104 alloc_script = utils.FindFile(self.op.default_iallocator,
1105 constants.IALLOCATOR_SEARCH_PATH,
1107 if alloc_script is None:
1108 raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1109 " specified" % self.op.default_iallocator,
1112 def _CheckDiskTemplateConsistency(self):
1113 """Check whether the disk templates that are going to be disabled
1114 are still in use by some instances.
1117 if self.op.enabled_disk_templates:
1118 cluster = self.cfg.GetClusterInfo()
1119 instances = self.cfg.GetAllInstancesInfo()
1121 disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1122 - set(self.op.enabled_disk_templates)
1123 for instance in instances.itervalues():
1124 if instance.disk_template in disk_templates_to_remove:
1125 raise errors.OpPrereqError("Cannot disable disk template '%s',"
1126 " because instance '%s' is using it." %
1127 (instance.disk_template, instance.name))
1129 def _SetVgName(self, feedback_fn):
1130 """Determines and sets the new volume group name.
1133 if self.op.vg_name is not None:
1134 new_volume = self.op.vg_name
1137 if new_volume != self.cfg.GetVGName():
1138 self.cfg.SetVGName(new_volume)
1140 feedback_fn("Cluster LVM configuration already in desired"
1141 " state, not changing")
1143 def _SetFileStorageDir(self, feedback_fn):
1144 """Set the file storage directory.
1147 if self.op.file_storage_dir is not None:
1148 if self.cluster.file_storage_dir == self.op.file_storage_dir:
1149 feedback_fn("Global file storage dir already set to value '%s'"
1150 % self.cluster.file_storage_dir)
1152 self.cluster.file_storage_dir = self.op.file_storage_dir
1154 def _SetDrbdHelper(self, feedback_fn):
1155 """Set the DRBD usermode helper.
1158 if self.op.drbd_helper is not None:
1159 if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1160 feedback_fn("Note that you specified a drbd user helper, but did not"
1161 " enable the drbd disk template.")
1162 new_helper = self.op.drbd_helper
1165 if new_helper != self.cfg.GetDRBDHelper():
1166 self.cfg.SetDRBDHelper(new_helper)
1168 feedback_fn("Cluster DRBD helper already in desired state,"
1171 def Exec(self, feedback_fn):
1172 """Change the parameters of the cluster.
1175 if self.op.enabled_disk_templates:
1176 self.cluster.enabled_disk_templates = \
1177 list(set(self.op.enabled_disk_templates))
1179 self._SetVgName(feedback_fn)
1180 self._SetFileStorageDir(feedback_fn)
1181 self._SetDrbdHelper(feedback_fn)
1183 if self.op.hvparams:
1184 self.cluster.hvparams = self.new_hvparams
1186 self.cluster.os_hvp = self.new_os_hvp
1187 if self.op.enabled_hypervisors is not None:
1188 self.cluster.hvparams = self.new_hvparams
1189 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1190 if self.op.beparams:
1191 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1192 if self.op.nicparams:
1193 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1195 self.cluster.ipolicy = self.new_ipolicy
1196 if self.op.osparams:
1197 self.cluster.osparams = self.new_osp
1198 if self.op.ndparams:
1199 self.cluster.ndparams = self.new_ndparams
1200 if self.op.diskparams:
1201 self.cluster.diskparams = self.new_diskparams
1202 if self.op.hv_state:
1203 self.cluster.hv_state_static = self.new_hv_state
1204 if self.op.disk_state:
1205 self.cluster.disk_state_static = self.new_disk_state
1207 if self.op.candidate_pool_size is not None:
1208 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1209 # we need to update the pool size here, otherwise the save will fail
1210 AdjustCandidatePool(self, [])
1212 if self.op.maintain_node_health is not None:
1213 if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1214 feedback_fn("Note: CONFD was disabled at build time, node health"
1215 " maintenance is not useful (still enabling it)")
1216 self.cluster.maintain_node_health = self.op.maintain_node_health
1218 if self.op.modify_etc_hosts is not None:
1219 self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1221 if self.op.prealloc_wipe_disks is not None:
1222 self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1224 if self.op.add_uids is not None:
1225 uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1227 if self.op.remove_uids is not None:
1228 uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1230 if self.op.uid_pool is not None:
1231 self.cluster.uid_pool = self.op.uid_pool
1233 if self.op.default_iallocator is not None:
1234 self.cluster.default_iallocator = self.op.default_iallocator
1236 if self.op.reserved_lvs is not None:
1237 self.cluster.reserved_lvs = self.op.reserved_lvs
1239 if self.op.use_external_mip_script is not None:
1240 self.cluster.use_external_mip_script = self.op.use_external_mip_script
1242 def helper_os(aname, mods, desc):
1244 lst = getattr(self.cluster, aname)
1245 for key, val in mods:
1246 if key == constants.DDM_ADD:
1248 feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1251 elif key == constants.DDM_REMOVE:
1255 feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1257 raise errors.ProgrammerError("Invalid modification '%s'" % key)
1259 if self.op.hidden_os:
1260 helper_os("hidden_os", self.op.hidden_os, "hidden")
1262 if self.op.blacklisted_os:
1263 helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1265 if self.op.master_netdev:
1266 master_params = self.cfg.GetMasterNetworkParameters()
1267 ems = self.cfg.GetUseExternalMipScript()
1268 feedback_fn("Shutting down master ip on the current netdev (%s)" %
1269 self.cluster.master_netdev)
1270 result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1272 if not self.op.force:
1273 result.Raise("Could not disable the master ip")
1276 msg = ("Could not disable the master ip (continuing anyway): %s" %
1279 feedback_fn("Changing master_netdev from %s to %s" %
1280 (master_params.netdev, self.op.master_netdev))
1281 self.cluster.master_netdev = self.op.master_netdev
1283 if self.op.master_netmask:
1284 master_params = self.cfg.GetMasterNetworkParameters()
1285 feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1286 result = self.rpc.call_node_change_master_netmask(
1287 master_params.uuid, master_params.netmask,
1288 self.op.master_netmask, master_params.ip,
1289 master_params.netdev)
1290 result.Warn("Could not change the master IP netmask", feedback_fn)
1291 self.cluster.master_netmask = self.op.master_netmask
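# persist all accumulated cluster changes before bringing the master IP
# back up on the (possibly new) netdev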
1293 self.cfg.Update(self.cluster, feedback_fn)
1295 if self.op.master_netdev:
1296 master_params = self.cfg.GetMasterNetworkParameters()
1297 feedback_fn("Starting the master ip on the new master netdev (%s)" %
1298 self.op.master_netdev)
1299 ems = self.cfg.GetUseExternalMipScript()
1300 result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1302 result.Warn("Could not re-enable the master ip on the master,"
1303 " please restart manually", self.LogWarning)
1306 class LUClusterVerify(NoHooksLU):
1307 """Submits all jobs necessary to verify the cluster.
1312 def ExpandNames(self):
1313 self.needed_locks = {}
1315 def Exec(self, feedback_fn):
1318 if self.op.group_name:
1319 groups = [self.op.group_name]
1320 depends_fn = lambda: None
1322 groups = self.cfg.GetNodeGroupList()
1324 # Verify global configuration
1326 opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1329 # Always depend on global verification
1330 depends_fn = lambda: [(-len(jobs), [])]
1333 [opcodes.OpClusterVerifyGroup(group_name=group,
1334 ignore_errors=self.op.ignore_errors,
1335 depends=depends_fn())]
1336 for group in groups)
1338 # Fix up all parameters
1339 for op in itertools.chain(*jobs): # pylint: disable=W0142
1340 op.debug_simulate_errors = self.op.debug_simulate_errors
1341 op.verbose = self.op.verbose
1342 op.error_codes = self.op.error_codes
1344 op.skip_checks = self.op.skip_checks
1345 except AttributeError:
1346 assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1348 return ResultWithJobs(jobs)
1351 class _VerifyErrors(object):
1352 """Mix-in for cluster/group verify LUs.
1354 It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1355 self.op and self._feedback_fn to be available.)
1359 ETYPE_FIELD = "code"
1360 ETYPE_ERROR = "ERROR"
1361 ETYPE_WARNING = "WARNING"
1363 def _Error(self, ecode, item, msg, *args, **kwargs):
1364 """Format an error message.
1366 Based on the opcode's error_codes parameter, either format a
1367 parseable error code, or a simpler error string.
1369 This must be called only from Exec and functions called from Exec.
1372 ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1373 itype, etxt, _ = ecode
1374 # If the error code is in the list of ignored errors, demote the error to a warning
1376 if etxt in self.op.ignore_errors: # pylint: disable=E1101
1377 ltype = self.ETYPE_WARNING
1378 # first complete the msg
1381 # then format the whole message
1382 if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1383 msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1389 msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1390 # and finally report it via the feedback_fn
1391 self._feedback_fn(" - %s" % msg) # Mix-in. pylint: disable=E1101
1392 # only ERROR-level entries mark the operation as failed; warnings do not
1393 if ltype == self.ETYPE_ERROR:
1396 def _ErrorIf(self, cond, *args, **kwargs):
1397 """Log an error message if the passed condition is True.
1401 or self.op.debug_simulate_errors): # pylint: disable=E1101
1402 self._Error(*args, **kwargs)
1405 def _VerifyCertificate(filename):
1406 """Verifies a certificate for L{LUClusterVerifyConfig}.
1408 @type filename: string
1409 @param filename: Path to PEM file
1413 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1414 utils.ReadFile(filename))
1415 except Exception, err: # pylint: disable=W0703
1416 return (LUClusterVerifyConfig.ETYPE_ERROR,
1417 "Failed to load X509 certificate %s: %s" % (filename, err))
1420 utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1421 constants.SSL_CERT_EXPIRATION_ERROR)
1424 fnamemsg = "While verifying %s: %s" % (filename, msg)
1429 return (None, fnamemsg)
1430 elif errcode == utils.CERT_WARNING:
1431 return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1432 elif errcode == utils.CERT_ERROR:
1433 return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1435 raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1438 def _GetAllHypervisorParameters(cluster, instances):
1439 """Compute the set of all hypervisor parameters.
1441 @type cluster: L{objects.Cluster}
1442 @param cluster: the cluster object
1443 @type instances: list of L{objects.Instance}
1444 @param instances: additional instances from which to obtain parameters
1445 @rtype: list of (origin, hypervisor, parameters)
1446 @return: a list with all parameters found, indicating the hypervisor they
1447 apply to, and the origin (can be "cluster", "os X", or "instance Y")
1452 for hv_name in cluster.enabled_hypervisors:
1453 hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1455 for os_name, os_hvp in cluster.os_hvp.items():
1456 for hv_name, hv_params in os_hvp.items():
1458 full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1459 hvp_data.append(("os %s" % os_name, hv_name, full_params))
1461 # TODO: collapse identical parameter values in a single one
1462 for instance in instances:
1463 if instance.hvparams:
1464 hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1465 cluster.FillHV(instance)))
1470 class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1471 """Verifies the cluster config.
1476 def _VerifyHVP(self, hvp_data):
1477 """Verifies locally the syntax of the hypervisor parameters.
1480 for item, hv_name, hv_params in hvp_data:
1481 msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1484 hv_class = hypervisor.GetHypervisorClass(hv_name)
1485 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1486 hv_class.CheckParameterSyntax(hv_params)
1487 except errors.GenericError, err:
1488 self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1490 def ExpandNames(self):
1491 self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1492 self.share_locks = ShareAll()
1494 def CheckPrereq(self):
1495 """Check prerequisites.
1498 # Retrieve all information
1499 self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1500 self.all_node_info = self.cfg.GetAllNodesInfo()
1501 self.all_inst_info = self.cfg.GetAllInstancesInfo()
1503 def Exec(self, feedback_fn):
1504 """Verify integrity of cluster, performing various test on nodes.
1508 self._feedback_fn = feedback_fn
1510 feedback_fn("* Verifying cluster config")
1512 for msg in self.cfg.VerifyConfig():
1513 self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1515 feedback_fn("* Verifying cluster certificate files")
1517 for cert_filename in pathutils.ALL_CERT_FILES:
1518 (errcode, msg) = _VerifyCertificate(cert_filename)
1519 self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1521 self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
1522 pathutils.NODED_CERT_FILE),
1523 constants.CV_ECLUSTERCERT,
1525 pathutils.NODED_CERT_FILE + " must be accessible by the " +
1526 constants.LUXID_USER + " user")
1528 feedback_fn("* Verifying hypervisor parameters")
1530 self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1531 self.all_inst_info.values()))
1533 feedback_fn("* Verifying all nodes belong to an existing group")
1535 # We do this verification here because, should this bogus circumstance
1536 # occur, it would never be caught by VerifyGroup, which only acts on
1537 # nodes/instances reachable from existing node groups.
1539 dangling_nodes = set(node for node in self.all_node_info.values()
1540 if node.group not in self.all_group_info)
1542 dangling_instances = {}
1543 no_node_instances = []
1545 for inst in self.all_inst_info.values():
1546 if inst.primary_node in [node.uuid for node in dangling_nodes]:
1547 dangling_instances.setdefault(inst.primary_node, []).append(inst)
1548 elif inst.primary_node not in self.all_node_info:
1549 no_node_instances.append(inst)
1554 utils.CommaJoin(inst.name for
1555 inst in dangling_instances.get(node.uuid, [])))
1556 for node in dangling_nodes]
1558 self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1560 "the following nodes (and their instances) belong to a non"
1561 " existing group: %s", utils.CommaJoin(pretty_dangling))
1563 self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1565 "the following instances have a non-existing primary-node:"
1566 " %s", utils.CommaJoin(inst.name for
1567 inst in no_node_instances))
1572 class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1573 """Verifies the status of a node group.
1576 HPATH = "cluster-verify"
1577 HTYPE = constants.HTYPE_CLUSTER
1580 _HOOKS_INDENT_RE = re.compile("^", re.M)
1582 class NodeImage(object):
1583 """A class representing the logical and physical status of a node.
1586 @ivar uuid: the node UUID to which this object refers
1587 @ivar volumes: a structure as returned from
1588 L{ganeti.backend.GetVolumeList} (runtime)
1589 @ivar instances: a list of running instances (runtime)
1590 @ivar pinst: list of configured primary instances (config)
1591 @ivar sinst: list of configured secondary instances (config)
1592 @ivar sbp: dictionary of {primary-node: list of instances} for all
1593 instances for which this node is secondary (config)
1594 @ivar mfree: free memory, as reported by hypervisor (runtime)
1595 @ivar dfree: free disk, as reported by the node (runtime)
1596 @ivar offline: the offline status (config)
1597 @type rpc_fail: boolean
1598 @ivar rpc_fail: whether the RPC verify call was successful (overall,
1599 not whether the individual keys were correct) (runtime)
1600 @type lvm_fail: boolean
1601 @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1602 @type hyp_fail: boolean
1603 @ivar hyp_fail: whether the RPC call didn't return the instance list
1604 @type ghost: boolean
1605 @ivar ghost: whether this is a known node or not (config)
1606 @type os_fail: boolean
1607 @ivar os_fail: whether the RPC call didn't return valid OS data
1609 @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1610 @type vm_capable: boolean
1611 @ivar vm_capable: whether the node can host instances
1613 @ivar pv_min: size in MiB of the smallest PVs
1615 @ivar pv_max: size in MiB of the biggest PVs
1618 def __init__(self, offline=False, uuid=None, vm_capable=True):
1627 self.offline = offline
1628 self.vm_capable = vm_capable
1629 self.rpc_fail = False
1630 self.lvm_fail = False
1631 self.hyp_fail = False
1633 self.os_fail = False
1638 def ExpandNames(self):
1639 # This raises errors.OpPrereqError on its own:
1640 self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1642 # Get instances in node group; this is unsafe and needs verification later
1644 self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1646 self.needed_locks = {
1647 locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1648 locking.LEVEL_NODEGROUP: [self.group_uuid],
1649 locking.LEVEL_NODE: [],
1651 # This opcode is run by watcher every five minutes and acquires all nodes
1652 # for a group. It doesn't run for a long time, so it's better to acquire
1653 # the node allocation lock as well.
1654 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1657 self.share_locks = ShareAll()
1659 def DeclareLocks(self, level):
1660 if level == locking.LEVEL_NODE:
1661 # Get members of node group; this is unsafe and needs verification later
1662 nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1664 # In Exec(), we warn about mirrored instances that have primary and
1665 # secondary living in separate node groups. To fully verify that
1666 # volumes for these instances are healthy, we will need to do an
1667 # extra call to their secondaries. We ensure here those nodes will be locked.
1669 for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1670 # Important: access only the instances whose lock is owned
1671 instance = self.cfg.GetInstanceInfoByName(inst_name)
1672 if instance.disk_template in constants.DTS_INT_MIRROR:
1673 nodes.update(instance.secondary_nodes)
1675 self.needed_locks[locking.LEVEL_NODE] = nodes
1677 def CheckPrereq(self):
1678 assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1679 self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1681 group_node_uuids = set(self.group_info.members)
1682 group_inst_uuids = \
1683 self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1685 unlocked_node_uuids = \
1686 group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1688 unlocked_inst_uuids = \
1689 group_inst_uuids.difference(
1690 [self.cfg.GetInstanceInfoByName(name).uuid
1691 for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1693 if unlocked_node_uuids:
1694 raise errors.OpPrereqError(
1695 "Missing lock for nodes: %s" %
1696 utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1699 if unlocked_inst_uuids:
1700 raise errors.OpPrereqError(
1701 "Missing lock for instances: %s" %
1702 utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1705 self.all_node_info = self.cfg.GetAllNodesInfo()
1706 self.all_inst_info = self.cfg.GetAllInstancesInfo()
1708 self.my_node_uuids = group_node_uuids
1709 self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1710 for node_uuid in group_node_uuids)
1712 self.my_inst_uuids = group_inst_uuids
1713 self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1714 for inst_uuid in group_inst_uuids)
1716 # We detect here the nodes that will need the extra RPC calls for verifying
1717 # split LV volumes; they should be locked.
1718 extra_lv_nodes = set()
1720 for inst in self.my_inst_info.values():
1721 if inst.disk_template in constants.DTS_INT_MIRROR:
1722 for nuuid in inst.all_nodes:
1723 if self.all_node_info[nuuid].group != self.group_uuid:
1724 extra_lv_nodes.add(nuuid)
1726 unlocked_lv_nodes = \
1727 extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1729 if unlocked_lv_nodes:
1730 raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1731 utils.CommaJoin(unlocked_lv_nodes),
1733 self.extra_lv_nodes = list(extra_lv_nodes)
1735 def _VerifyNode(self, ninfo, nresult):
1736 """Perform some basic validation on data returned from a node.
1738 - check the result data structure is well formed and has all the expected keys
1740 - check ganeti version
1742 @type ninfo: L{objects.Node}
1743 @param ninfo: the node to check
1744 @param nresult: the results from the node
1746 @return: whether overall this call was successful (and we can expect
1747 reasonable values in the response)
1750 # main result, nresult should be a non-empty dict
1751 test = not nresult or not isinstance(nresult, dict)
1752 self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1753 "unable to verify node: no data returned")
1757 # compares ganeti version
1758 local_version = constants.PROTOCOL_VERSION
1759 remote_version = nresult.get("version", None)
1760 test = not (remote_version and
1761 isinstance(remote_version, (list, tuple)) and
1762 len(remote_version) == 2)
1763 self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1764 "connection to node returned invalid data")
1768 test = local_version != remote_version[0]
1769 self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1770 "incompatible protocol versions: master %s,"
1771 " node %s", local_version, remote_version[0])
1775 # node seems compatible, we can actually try to look into its results
1777 # full package version
1778 self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1779 constants.CV_ENODEVERSION, ninfo.name,
1780 "software version mismatch: master %s, node %s",
1781 constants.RELEASE_VERSION, remote_version[1],
1782 code=self.ETYPE_WARNING)
1784 hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1785 if ninfo.vm_capable and isinstance(hyp_result, dict):
1786 for hv_name, hv_result in hyp_result.iteritems():
1787 test = hv_result is not None
1788 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1789 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1791 hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1792 if ninfo.vm_capable and isinstance(hvp_result, list):
1793 for item, hv_name, hv_result in hvp_result:
1794 self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1795 "hypervisor %s parameter verify failure (source %s): %s",
1796 hv_name, item, hv_result)
1798 test = nresult.get(constants.NV_NODESETUP,
1799 ["Missing NODESETUP results"])
1800 self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1801 "node setup error: %s", "; ".join(test))
1805 def _VerifyNodeTime(self, ninfo, nresult,
1806 nvinfo_starttime, nvinfo_endtime):
1807 """Check the node time.
1809 @type ninfo: L{objects.Node}
1810 @param ninfo: the node to check
1811 @param nresult: the remote results for the node
1812 @param nvinfo_starttime: the start time of the RPC call
1813 @param nvinfo_endtime: the end time of the RPC call
1816 ntime = nresult.get(constants.NV_TIME, None)
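# merge the node-reported time tuple into a single timestamp before
# comparing it against the start/end times of the verify RPC call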
1818 ntime_merged = utils.MergeTime(ntime)
1819 except (ValueError, TypeError):
1820 self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1821 "Node returned invalid time")
1824 if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1825 ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1826 elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1827 ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1831 self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1832 "Node time diverges by at least %s from master node time",
1835 def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1836 """Check the node LVM results and update info for cross-node checks.
1838 @type ninfo: L{objects.Node}
1839 @param ninfo: the node to check
1840 @param nresult: the remote results for the node
1841 @param vg_name: the configured VG name
1842 @type nimg: L{NodeImage}
1843 @param nimg: node image
1849 # checks vg existence and size > 20G
1850 vglist = nresult.get(constants.NV_VGLIST, None)
1852 self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1853 "unable to check volume groups")
1855 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1856 constants.MIN_VG_SIZE)
1857 self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1860 (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1862 self._Error(constants.CV_ENODELVM, ninfo.name, em)
1863 if pvminmax is not None:
1864 (nimg.pv_min, nimg.pv_max) = pvminmax
1866 def _VerifyGroupDRBDVersion(self, node_verify_infos):
1867 """Check cross-node DRBD version consistency.
1869 @type node_verify_infos: dict
1870 @param node_verify_infos: infos about nodes as returned from the
1875 for node_uuid, ndata in node_verify_infos.items():
1876 nresult = ndata.payload
1877 version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1878 node_versions[node_uuid] = version
1880 if len(set(node_versions.values())) > 1:
1881 for node_uuid, version in sorted(node_versions.items()):
1882 msg = "DRBD version mismatch: %s" % version
1883 self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1884 code=self.ETYPE_WARNING)
1886 def _VerifyGroupLVM(self, node_image, vg_name):
1887 """Check cross-node consistency in LVM.
1889 @type node_image: dict
1890 @param node_image: info about nodes, mapping from node to names to
1891 L{NodeImage} objects
1892 @param vg_name: the configured VG name
1898 # Only exclusive storage needs this kind of checks
1899 if not self._exclusive_storage:
1902 # exclusive_storage wants all PVs to have the same size (approximately),
1903 # if the smallest and the biggest ones are okay, everything is fine.
1904 # pv_min is None iff pv_max is None
1905 vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
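# Comparing only the two extreme PV sizes is enough: if the smallest and the
# biggest PV in the group are close enough, every PV in between is too.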
1908 (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1909 (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1910 bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1911 self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1912 "PV sizes differ too much in the group; smallest (%s MB) is"
1913 " on %s, biggest (%s MB) is on %s",
1914 pvmin, self.cfg.GetNodeName(minnode_uuid),
1915 pvmax, self.cfg.GetNodeName(maxnode_uuid))
1917 def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1918 """Check the node bridges.
1920 @type ninfo: L{objects.Node}
1921 @param ninfo: the node to check
1922 @param nresult: the remote results for the node
1923 @param bridges: the expected list of bridges
1929 missing = nresult.get(constants.NV_BRIDGES, None)
1930 test = not isinstance(missing, list)
1931 self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1932 "did not return valid bridge information")
1934 self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1935 "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1937 def _VerifyNodeUserScripts(self, ninfo, nresult):
1938 """Check the results of user scripts presence and executability on the node
1940 @type ninfo: L{objects.Node}
1941 @param ninfo: the node to check
1942 @param nresult: the remote results for the node
1945 test = not constants.NV_USERSCRIPTS in nresult
1946 self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1947 "did not return user scripts information")
1949 broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1951 self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1952 "user scripts not present or not executable: %s" %
1953 utils.CommaJoin(sorted(broken_scripts)))
1955 def _VerifyNodeNetwork(self, ninfo, nresult):
1956 """Check the node network connectivity results.
1958 @type ninfo: L{objects.Node}
1959 @param ninfo: the node to check
1960 @param nresult: the remote results for the node
1963 test = constants.NV_NODELIST not in nresult
1964 self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
1965 "node hasn't returned node ssh connectivity data")
1967 if nresult[constants.NV_NODELIST]:
1968 for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1969 self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
1970 "ssh communication with node '%s': %s", a_node, a_msg)
1972 test = constants.NV_NODENETTEST not in nresult
1973 self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1974 "node hasn't returned node tcp connectivity data")
1976 if nresult[constants.NV_NODENETTEST]:
1977 nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1979 self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
1980 "tcp communication with node '%s': %s",
1981 anode, nresult[constants.NV_NODENETTEST][anode])
1983 test = constants.NV_MASTERIP not in nresult
1984 self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1985 "node hasn't returned node master IP reachability data")
1987 if not nresult[constants.NV_MASTERIP]:
1988 if ninfo.uuid == self.master_node:
1989 msg = "the master node cannot reach the master IP (not configured?)"
1991 msg = "cannot reach the master IP"
1992 self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
1994 def _VerifyInstance(self, instance, node_image, diskstatus):
1995 """Verify an instance.
1997 This function checks to see if the required block devices are
1998 available on the instance's node, and that the nodes are in the correct state.
2002 pnode_uuid = instance.primary_node
2003 pnode_img = node_image[pnode_uuid]
2004 groupinfo = self.cfg.GetAllNodeGroupsInfo()
2006 node_vol_should = {}
2007 instance.MapLVsByNode(node_vol_should)
2009 cluster = self.cfg.GetClusterInfo()
2010 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2012 err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
2013 self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
2014 utils.CommaJoin(err), code=self.ETYPE_WARNING)
2016 for node_uuid in node_vol_should:
2017 n_img = node_image[node_uuid]
2018 if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2019 # ignore missing volumes on offline or broken nodes
2021 for volume in node_vol_should[node_uuid]:
2022 test = volume not in n_img.volumes
2023 self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
2024 "volume %s missing on node %s", volume,
2025 self.cfg.GetNodeName(node_uuid))
2027 if instance.admin_state == constants.ADMINST_UP:
2028 test = instance.uuid not in pnode_img.instances and not pnode_img.offline
2029 self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
2030 "instance not running on its primary node %s",
2031 self.cfg.GetNodeName(pnode_uuid))
2032 self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
2033 instance.name, "instance is marked as running and lives on"
2034 " offline node %s", self.cfg.GetNodeName(pnode_uuid))
2036 diskdata = [(nname, success, status, idx)
2037 for (nname, disks) in diskstatus.items()
2038 for idx, (success, status) in enumerate(disks)]
2040 for nname, success, bdev_status, idx in diskdata:
2041 # the 'ghost node' construction in Exec() ensures that we have a
2043 snode = node_image[nname]
2044 bad_snode = snode.ghost or snode.offline
2045 self._ErrorIf(instance.disks_active and
2046 not success and not bad_snode,
2047 constants.CV_EINSTANCEFAULTYDISK, instance.name,
2048 "couldn't retrieve status for disk/%s on %s: %s",
2049 idx, self.cfg.GetNodeName(nname), bdev_status)
2051 if instance.disks_active and success and \
2052 (bdev_status.is_degraded or
2053 bdev_status.ldisk_status != constants.LDS_OKAY):
2054 msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2055 if bdev_status.is_degraded:
2056 msg += " is degraded"
2057 if bdev_status.ldisk_status != constants.LDS_OKAY:
2058 msg += "; state is '%s'" % \
2059 constants.LDS_NAMES[bdev_status.ldisk_status]
2061 self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2063 self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2064 constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2065 "instance %s, connection to primary node failed",
2068 self._ErrorIf(len(instance.secondary_nodes) > 1,
2069 constants.CV_EINSTANCELAYOUT, instance.name,
2070 "instance has multiple secondary nodes: %s",
2071 utils.CommaJoin(instance.secondary_nodes),
2072 code=self.ETYPE_WARNING)
2074 es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2075 if any(es_flags.values()):
2076 if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2077 # Disk template not compatible with exclusive_storage: no instance
2078 # node should have the flag set
2080 for (n, es) in es_flags.items()
2082 self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2083 "instance has template %s, which is not supported on nodes"
2084 " that have exclusive storage set: %s",
2085 instance.disk_template,
2086 utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2087 for (idx, disk) in enumerate(instance.disks):
2088 self._ErrorIf(disk.spindles is None,
2089 constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2090 "number of spindles not configured for disk %s while"
2091 " exclusive storage is enabled, try running"
2092 " gnt-cluster repair-disk-sizes", idx)
2094 if instance.disk_template in constants.DTS_INT_MIRROR:
2095 instance_nodes = utils.NiceSort(instance.all_nodes)
2096 instance_groups = {}
2098 for node_uuid in instance_nodes:
2099 instance_groups.setdefault(self.all_node_info[node_uuid].group,
2100 []).append(node_uuid)
2103 "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2104 groupinfo[group].name)
2105 # Sort so that we always list the primary node first.
2106 for group, nodes in sorted(instance_groups.items(),
2107 key=lambda (_, nodes): pnode_uuid in nodes,
2110 self._ErrorIf(len(instance_groups) > 1,
2111 constants.CV_EINSTANCESPLITGROUPS,
2112 instance.name, "instance has primary and secondary nodes in"
2113 " different groups: %s", utils.CommaJoin(pretty_list),
2114 code=self.ETYPE_WARNING)
2116 inst_nodes_offline = []
2117 for snode in instance.secondary_nodes:
2118 s_img = node_image[snode]
2119 self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2120 self.cfg.GetNodeName(snode),
2121 "instance %s, connection to secondary node failed",
2125 inst_nodes_offline.append(snode)
2127 # warn that the instance lives on offline nodes
2128 self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2129 instance.name, "instance has offline secondary node(s) %s",
2130 utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2131 # ... or ghost/non-vm_capable nodes
2132 for node_uuid in instance.all_nodes:
2133 self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2134 instance.name, "instance lives on ghost node %s",
2135 self.cfg.GetNodeName(node_uuid))
2136 self._ErrorIf(not node_image[node_uuid].vm_capable,
2137 constants.CV_EINSTANCEBADNODE, instance.name,
2138 "instance lives on non-vm_capable node %s",
2139 self.cfg.GetNodeName(node_uuid))
2141 def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2142 """Verify if there are any unknown volumes in the cluster.
2144 The .os, .swap and backup volumes are ignored. All other volumes are
2145 reported as unknown.
2147 @type reserved: L{ganeti.utils.FieldSet}
2148 @param reserved: a FieldSet of reserved volume names
2151 for node_uuid, n_img in node_image.items():
2152 if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2153 self.all_node_info[node_uuid].group != self.group_uuid):
2154 # skip non-healthy nodes
2156 for volume in n_img.volumes:
2157 test = ((node_uuid not in node_vol_should or
2158 volume not in node_vol_should[node_uuid]) and
2159 not reserved.Matches(volume))
2160 self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2161 self.cfg.GetNodeName(node_uuid),
2162 "volume %s is unknown", volume)
2164 def _VerifyNPlusOneMemory(self, node_image, all_insts):
2165 """Verify N+1 Memory Resilience.
2167 Check that if one single node dies we can still start all the
2168 instances it was primary for.
2171 cluster_info = self.cfg.GetClusterInfo()
2172 for node_uuid, n_img in node_image.items():
2173 # This code checks that every node which is now listed as
2174 # secondary has enough memory to host all instances it is
2175 # supposed to host, should a single other node in the cluster fail.
2176 # FIXME: not ready for failover to an arbitrary node
2177 # FIXME: does not support file-backed instances
2178 # WARNING: we currently take into account down instances as well
2179 # as up ones, considering that even if they're down someone
2180 # might want to start them even in the event of a node failure.
2181 if n_img.offline or \
2182 self.all_node_info[node_uuid].group != self.group_uuid:
2183 # we're skipping nodes marked offline and nodes in other groups from
2184 # the N+1 warning, since most likely we don't have good memory
2185 # information from them; we already list instances living on such
2186 # nodes, and that's enough warning
2188 #TODO(dynmem): also consider ballooning out other instances
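# For each primary node whose instances use this node as secondary, sum up
# the minimum memory of the auto-balanced instances and check that it fits
# into this node's reported free memory.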
2189 for prinode, inst_uuids in n_img.sbp.items():
2191 for inst_uuid in inst_uuids:
2192 bep = cluster_info.FillBE(all_insts[inst_uuid])
2193 if bep[constants.BE_AUTO_BALANCE]:
2194 needed_mem += bep[constants.BE_MINMEM]
2195 test = n_img.mfree < needed_mem
2196 self._ErrorIf(test, constants.CV_ENODEN1,
2197 self.cfg.GetNodeName(node_uuid),
2198 "not enough memory to accomodate instance failovers"
2199 " should node %s fail (%dMiB needed, %dMiB available)",
2200 self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2202 def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2203 (files_all, files_opt, files_mc, files_vm)):
2204 """Verifies file checksums collected from all nodes.
2206 @param nodes: List of L{objects.Node} objects
2207 @param master_node_uuid: UUID of master node
2208 @param all_nvinfo: RPC results
2211 # Define functions determining which nodes to consider for a file
2214 (files_mc, lambda node: (node.master_candidate or
2215 node.uuid == master_node_uuid)),
2216 (files_vm, lambda node: node.vm_capable),
2219 # Build mapping from filename to list of nodes which should have the file
2221 for (files, fn) in files2nodefn:
2225 filenodes = filter(fn, nodes)
2226 nodefiles.update((filename,
2227 frozenset(map(operator.attrgetter("uuid"), filenodes)))
2228 for filename in files)
2230 assert set(nodefiles) == (files_all | files_mc | files_vm)
2232 fileinfo = dict((filename, {}) for filename in nodefiles)
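# fileinfo maps filename -> checksum -> set of node UUIDs reporting that
# checksum; nodes that returned no usable file data are collected in
# ignore_nodes and excluded from the comparison.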
2233 ignore_nodes = set()
2237 ignore_nodes.add(node.uuid)
2240 nresult = all_nvinfo[node.uuid]
2242 if nresult.fail_msg or not nresult.payload:
2245 fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2246 node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2247 for (key, value) in fingerprints.items())
2250 test = not (node_files and isinstance(node_files, dict))
2251 self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2252 "Node did not return file checksum data")
2254 ignore_nodes.add(node.uuid)
2257 # Build per-checksum mapping from filename to nodes having it
2258 for (filename, checksum) in node_files.items():
2259 assert filename in nodefiles
2260 fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2262 for (filename, checksums) in fileinfo.items():
2263 assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2265 # Nodes having the file
2266 with_file = frozenset(node_uuid
2267 for node_uuids in fileinfo[filename].values()
2268 for node_uuid in node_uuids) - ignore_nodes
2270 expected_nodes = nodefiles[filename] - ignore_nodes
2272 # Nodes missing file
2273 missing_file = expected_nodes - with_file
2275 if filename in files_opt:
2277 self._ErrorIf(missing_file and missing_file != expected_nodes,
2278 constants.CV_ECLUSTERFILECHECK, None,
2279 "File %s is optional, but it must exist on all or no"
2280 " nodes (not found on %s)",
2284 map(self.cfg.GetNodeName, missing_file))))
2286 self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2287 "File %s is missing from node(s) %s", filename,
2290 map(self.cfg.GetNodeName, missing_file))))
2292 # Warn if a node has a file it shouldn't
2293 unexpected = with_file - expected_nodes
2294 self._ErrorIf(unexpected,
2295 constants.CV_ECLUSTERFILECHECK, None,
2296 "File %s should not exist on node(s) %s",
2297 filename, utils.CommaJoin(
2298 utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2300 # See if there are multiple versions of the file
2301 test = len(checksums) > 1
2303 variants = ["variant %s on %s" %
2305 utils.CommaJoin(utils.NiceSort(
2306 map(self.cfg.GetNodeName, node_uuids))))
2307 for (idx, (checksum, node_uuids)) in
2308 enumerate(sorted(checksums.items()))]
2312 self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2313 "File %s found with %s different checksums (%s)",
2314 filename, len(checksums), "; ".join(variants))
2316 def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
2317 """Verify the drbd helper.
2321 helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2322 test = (helper_result is None)
2323 self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2324 "no drbd usermode helper returned")
2326 status, payload = helper_result
2328 self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2329 "drbd usermode helper check unsuccessful: %s", payload)
2330 test = status and (payload != drbd_helper)
2331 self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2332 "wrong drbd usermode helper: %s", payload)
2334 def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2336 """Verifies and the node DRBD status.
2338 @type ninfo: L{objects.Node}
2339 @param ninfo: the node to check
2340 @param nresult: the remote results for the node
2341 @param instanceinfo: the dict of instances
2342 @param drbd_helper: the configured DRBD usermode helper
2343 @param drbd_map: the DRBD map as returned by
2344 L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2347 self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)
2349 # compute the DRBD minors
2351 for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2352 test = inst_uuid not in instanceinfo
2353 self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2354 "ghost instance '%s' in temporary DRBD map", inst_uuid)
2355 # ghost instance should not be running, but otherwise we
2356 # don't give double warnings (both ghost instance and
2357 # unallocated minor in use)
2359 node_drbd[minor] = (inst_uuid, False)
2361 instance = instanceinfo[inst_uuid]
2362 node_drbd[minor] = (inst_uuid, instance.disks_active)
2364 # and now check them
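# node_drbd maps each configured DRBD minor on this node to a tuple of
# (instance UUID, whether the instance's disks should be active); compare it
# against the minors the node actually reports as in use.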
2365 used_minors = nresult.get(constants.NV_DRBDLIST, [])
2366 test = not isinstance(used_minors, (tuple, list))
2367 self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2368 "cannot parse drbd status file: %s", str(used_minors))
2370 # we cannot check drbd status
2373 for minor, (inst_uuid, must_exist) in node_drbd.items():
2374 test = minor not in used_minors and must_exist
2375 self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2376 "drbd minor %d of instance %s is not active", minor,
2377 self.cfg.GetInstanceName(inst_uuid))
2378 for minor in used_minors:
2379 test = minor not in node_drbd
2380 self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2381 "unallocated drbd minor %d is in use", minor)
2383 def _UpdateNodeOS(self, ninfo, nresult, nimg):
2384 """Builds the node OS structures.
2386 @type ninfo: L{objects.Node}
2387 @param ninfo: the node to check
2388 @param nresult: the remote results for the node
2389 @param nimg: the node image object
2392 remote_os = nresult.get(constants.NV_OSLIST, None)
2393 test = (not isinstance(remote_os, list) or
2394 not compat.all(isinstance(v, list) and len(v) == 7
2395 for v in remote_os))
2397 self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2398 "node hasn't returned valid OS data")
2407 for (name, os_path, status, diagnose,
2408 variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2410 if name not in os_dict:
2413 # parameters is a list of lists instead of list of tuples due to
2414 # JSON lacking a real tuple type, fix it:
2415 parameters = [tuple(v) for v in parameters]
2416 os_dict[name].append((os_path, status, diagnose,
2417 set(variants), set(parameters), set(api_ver)))
2419 nimg.oslist = os_dict
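# nimg.oslist maps each OS name to the list of (path, status, diagnose,
# variants, parameters, api_versions) tuples found on the node; only the
# first entry per name is considered authoritative by _VerifyNodeOS.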
2421 def _VerifyNodeOS(self, ninfo, nimg, base):
2422 """Verifies the node OS list.
2424 @type ninfo: L{objects.Node}
2425 @param ninfo: the node to check
2426 @param nimg: the node image object
2427 @param base: the 'template' node we match against (e.g. from the master)
2430 assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2432 beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2433 for os_name, os_data in nimg.oslist.items():
2434 assert os_data, "Empty OS status for OS %s?!" % os_name
2435 f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2436 self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2437 "Invalid OS %s (located at %s): %s",
2438 os_name, f_path, f_diag)
2439 self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2440 "OS '%s' has multiple entries"
2441 " (first one shadows the rest): %s",
2442 os_name, utils.CommaJoin([v[0] for v in os_data]))
2443 # comparisons with the 'base' image
2444 test = os_name not in base.oslist
2445 self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2446 "Extra OS %s not present on reference node (%s)",
2447 os_name, self.cfg.GetNodeName(base.uuid))
2450 assert base.oslist[os_name], "Base node has empty OS status?"
2451 _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2453 # base OS is invalid, skipping
2455 for kind, a, b in [("API version", f_api, b_api),
2456 ("variants list", f_var, b_var),
2457 ("parameters", beautify_params(f_param),
2458 beautify_params(b_param))]:
2459 self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2460 "OS %s for %s differs from reference node %s:"
2461 " [%s] vs. [%s]", kind, os_name,
2462 self.cfg.GetNodeName(base.uuid),
2463 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2465 # check any missing OSes
2466 missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2467 self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2468 "OSes present on reference node %s"
2469 " but missing on this node: %s",
2470 self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2472 def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2473 """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2475 @type ninfo: L{objects.Node}
2476 @param ninfo: the node to check
2477 @param nresult: the remote results for the node
2478 @type is_master: bool
2479 @param is_master: Whether node is the master node
2482 cluster = self.cfg.GetClusterInfo()
2484 (cluster.IsFileStorageEnabled() or
2485 cluster.IsSharedFileStorageEnabled())):
2487 fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2489 # This should never happen
2490 self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2491 "Node did not return forbidden file storage paths")
2493 self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2494 "Found forbidden file storage paths: %s",
2495 utils.CommaJoin(fspaths))
2497 self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2498 constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2499 "Node should not have returned forbidden file storage"
2502 def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2503 verify_key, error_key):
2504 """Verifies (file) storage paths.
2506 @type ninfo: L{objects.Node}
2507 @param ninfo: the node to check
2508 @param nresult: the remote results for the node
2509 @type file_disk_template: string
2510 @param file_disk_template: file-based disk template, whose directory
2511 is supposed to be verified
2512 @type verify_key: string
2513 @param verify_key: key for the verification map of this file
2515 @param error_key: error key to be added to the verification results
2516 in case something goes wrong in this verification step
2519 assert (file_disk_template in
2520 utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2521 cluster = self.cfg.GetClusterInfo()
2522 if cluster.IsDiskTemplateEnabled(file_disk_template):
2524 verify_key in nresult,
2525 error_key, ninfo.name,
2526 "The configured %s storage path is unusable: %s" %
2527 (file_disk_template, nresult.get(verify_key)))
2529 def _VerifyFileStoragePaths(self, ninfo, nresult):
2530 """Verifies (file) storage paths.
2532 @see: C{_VerifyStoragePaths}
2535 self._VerifyStoragePaths(
2536 ninfo, nresult, constants.DT_FILE,
2537 constants.NV_FILE_STORAGE_PATH,
2538 constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2540 def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2541 """Verifies (file) storage paths.
2543 @see: C{_VerifyStoragePaths}
2546 self._VerifyStoragePaths(
2547 ninfo, nresult, constants.DT_SHARED_FILE,
2548 constants.NV_SHARED_FILE_STORAGE_PATH,
2549 constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2551 def _VerifyOob(self, ninfo, nresult):
2552 """Verifies out of band functionality of a node.
2554 @type ninfo: L{objects.Node}
2555 @param ninfo: the node to check
2556 @param nresult: the remote results for the node
2559 # We just have to verify the paths on master and/or master candidates
2560 # as the oob helper is invoked on the master
2561 if ((ninfo.master_candidate or ninfo.master_capable) and
2562 constants.NV_OOB_PATHS in nresult):
2563 for path_result in nresult[constants.NV_OOB_PATHS]:
2564 self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2565 ninfo.name, path_result)
2567 def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2568 """Verifies and updates the node volume data.
2570 This function will update a L{NodeImage}'s internal structures
2571 with data from the remote call.
2573 @type ninfo: L{objects.Node}
2574 @param ninfo: the node to check
2575 @param nresult: the remote results for the node
2576 @param nimg: the node image object
2577 @param vg_name: the configured VG name
2580 nimg.lvm_fail = True
2581 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2584 elif isinstance(lvdata, basestring):
2585 self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2586 "LVM problem on node: %s", utils.SafeEncode(lvdata))
2587 elif not isinstance(lvdata, dict):
2588 self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2589 "rpc call to node failed (lvlist)")
2591 nimg.volumes = lvdata
2592 nimg.lvm_fail = False
2594 def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2595 """Verifies and updates the node instance list.
2597 If the listing was successful, then updates this node's instance
2598 list. Otherwise, it marks the RPC call as failed for the instance
2601 @type ninfo: L{objects.Node}
2602 @param ninfo: the node to check
2603 @param nresult: the remote results for the node
2604 @param nimg: the node image object
2607 idata = nresult.get(constants.NV_INSTANCELIST, None)
2608 test = not isinstance(idata, list)
2609 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2610 "rpc call to node failed (instancelist): %s",
2611 utils.SafeEncode(str(idata)))
2613 nimg.hyp_fail = True
2615 nimg.instances = [inst.uuid for (_, inst) in
2616 self.cfg.GetMultiInstanceInfoByName(idata)]
2618 def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2619 """Verifies and computes a node information map
2621 @type ninfo: L{objects.Node}
2622 @param ninfo: the node to check
2623 @param nresult: the remote results for the node
2624 @param nimg: the node image object
2625 @param vg_name: the configured VG name
2628 # try to read free memory (from the hypervisor)
2629 hv_info = nresult.get(constants.NV_HVINFO, None)
2630 test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2631 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2632 "rpc call to node failed (hvinfo)")
2635 nimg.mfree = int(hv_info["memory_free"])
2636 except (ValueError, TypeError):
2637 self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2638 "node returned invalid nodeinfo, check hypervisor")
2640 # FIXME: devise a free space model for file based instances as well
2641 if vg_name is not None:
2642 test = (constants.NV_VGLIST not in nresult or
2643 vg_name not in nresult[constants.NV_VGLIST])
2644 self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2645 "node didn't return data for the volume group '%s'"
2646 " - it is either missing or broken", vg_name)
2649 nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2650 except (ValueError, TypeError):
2651 self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2652 "node returned invalid LVM info, check LVM status")
2654 def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2655 """Gets per-disk status information for all instances.
2657 @type node_uuids: list of strings
2658 @param node_uuids: Node UUIDs
2659 @type node_image: dict of (UUID, L{objects.Node})
2660 @param node_image: Node objects
2661 @type instanceinfo: dict of (UUID, L{objects.Instance})
2662 @param instanceinfo: Instance objects
2663 @rtype: {instance: {node: [(success, payload)]}}
2664 @return: a dictionary of per-instance dictionaries with nodes as
2665 keys and disk information as values; the disk information is a
2666 list of tuples (success, payload)
2670 node_disks_devonly = {}
2671 diskless_instances = set()
2672 diskless = constants.DT_DISKLESS
2674 for nuuid in node_uuids:
2675 node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
2676 node_image[nuuid].sinst))
2677 diskless_instances.update(uuid for uuid in node_inst_uuids
2678 if instanceinfo[uuid].disk_template == diskless)
2679 disks = [(inst_uuid, disk)
2680 for inst_uuid in node_inst_uuids
2681 for disk in instanceinfo[inst_uuid].disks]
2684 # No need to collect data
2687 node_disks[nuuid] = disks
2689 # _AnnotateDiskParams makes already copies of the disks
2691 for (inst_uuid, dev) in disks:
2692 (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
2694 self.cfg.SetDiskID(anno_disk, nuuid)
2695 devonly.append(anno_disk)
2697 node_disks_devonly[nuuid] = devonly
2699 assert len(node_disks) == len(node_disks_devonly)
2701 # Collect data from all nodes with disks
2702 result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2705 assert len(result) == len(node_disks)
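# Fold the per-node RPC answers into instdisk: instance UUID -> node UUID ->
# list of (success, payload) tuples, one per disk index; offline or failed
# nodes get synthetic (False, message) entries instead. For example,
# instdisk[inst_uuid][node_uuid] could look like [(True, status), (False, "node offline")].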
2709 for (nuuid, nres) in result.items():
2710 node = self.cfg.GetNodeInfo(nuuid)
2711 disks = node_disks[node.uuid]
2714 # No data from this node
2715 data = len(disks) * [(False, "node offline")]
2718 self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
2719 "while getting disk information: %s", msg)
2721 # No data from this node
2722 data = len(disks) * [(False, msg)]
2725 for idx, i in enumerate(nres.payload):
2726 if isinstance(i, (tuple, list)) and len(i) == 2:
2729 logging.warning("Invalid result from node %s, entry %d: %s",
2731 data.append((False, "Invalid result from the remote node"))
2733 for ((inst_uuid, _), status) in zip(disks, data):
2734 instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
2737 # Add empty entries for diskless instances.
2738 for inst_uuid in diskless_instances:
2739 assert inst_uuid not in instdisk
2740 instdisk[inst_uuid] = {}
2742 assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2743 len(nuuids) <= len(instanceinfo[inst].all_nodes) and
2744 compat.all(isinstance(s, (tuple, list)) and
2745 len(s) == 2 for s in statuses)
2746 for inst, nuuids in instdisk.items()
2747 for nuuid, statuses in nuuids.items())
2749 instdisk_keys = set(instdisk)
2750 instanceinfo_keys = set(instanceinfo)
2751 assert instdisk_keys == instanceinfo_keys, \
2752 ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2753 (instdisk_keys, instanceinfo_keys))
2758 def _SshNodeSelector(group_uuid, all_nodes):
2759 """Create endless iterators for all potential SSH check hosts.
2762 nodes = [node for node in all_nodes
2763 if (node.group != group_uuid and
2765 keyfunc = operator.attrgetter("group")
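# Build one endless (cycling) iterator per foreign node group, each yielding
# that group's node names in sorted order.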
2767 return map(itertools.cycle,
2768 [sorted(map(operator.attrgetter("name"), names))
2769 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2773 def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2774 """Choose which nodes should talk to which other nodes.
2776 We will make nodes contact all nodes in their group, and one node from every other group.
2779 @warning: This algorithm has a known issue if one node group is much
2780 smaller than others (e.g. just one node). In such a case all other
2781 nodes will talk to the single node.
2784 online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2785 sel = cls._SshNodeSelector(group_uuid, all_nodes)
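# Assign every online node of this group one node from each other group to
# contact, drawing the next name from each per-group iterator.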
2787 return (online_nodes,
2788 dict((name, sorted([i.next() for i in sel]))
2789 for name in online_nodes))
2791 def BuildHooksEnv(self):
2794 Cluster-Verify hooks are run only in the post phase; a failing hook makes
2795 its output appear in the verify output and causes the verification to fail.
2799 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2802 env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2803 for node in self.my_node_info.values())
2807 def BuildHooksNodes(self):
2808 """Build hooks nodes.
2811 return ([], list(self.my_node_info.keys()))
2813 def Exec(self, feedback_fn):
2814 """Verify integrity of the node group, performing various test on nodes.
2817 # This method has too many local variables. pylint: disable=R0914
2818 feedback_fn("* Verifying group '%s'" % self.group_info.name)
2820 if not self.my_node_uuids:
2822 feedback_fn("* Empty node group, skipping verification")
2826 verbose = self.op.verbose
2827 self._feedback_fn = feedback_fn
2829 vg_name = self.cfg.GetVGName()
2830 drbd_helper = self.cfg.GetDRBDHelper()
2831 cluster = self.cfg.GetClusterInfo()
2832 hypervisors = cluster.enabled_hypervisors
2833 node_data_list = self.my_node_info.values()
2835 i_non_redundant = [] # Non redundant instances
2836 i_non_a_balanced = [] # Non auto-balanced instances
2837 i_offline = 0 # Count of offline instances
2838 n_offline = 0 # Count of offline nodes
2839 n_drained = 0 # Count of nodes being drained
2840 node_vol_should = {}
2842 # FIXME: verify OS list
2845 filemap = ComputeAncillaryFiles(cluster, False)
2847 # do local checksums
2848 master_node_uuid = self.master_node = self.cfg.GetMasterNode()
2849 master_ip = self.cfg.GetMasterIP()
2851 feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))
2854 if self.cfg.GetUseExternalMipScript():
2855 user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2857 node_verify_param = {
2858 constants.NV_FILELIST:
2859 map(vcluster.MakeVirtualPath,
2860 utils.UniqueSequence(filename
2861 for files in filemap
2862 for filename in files)),
2863 constants.NV_NODELIST:
2864 self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2865 self.all_node_info.values()),
2866 constants.NV_HYPERVISOR: hypervisors,
2867 constants.NV_HVPARAMS:
2868 _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2869 constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2870 for node in node_data_list
2871 if not node.offline],
2872 constants.NV_INSTANCELIST: hypervisors,
2873 constants.NV_VERSION: None,
2874 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2875 constants.NV_NODESETUP: None,
2876 constants.NV_TIME: None,
2877 constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
2878 constants.NV_OSLIST: None,
2879 constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2880 constants.NV_USERSCRIPTS: user_scripts,
2883 if vg_name is not None:
2884 node_verify_param[constants.NV_VGLIST] = None
2885 node_verify_param[constants.NV_LVLIST] = vg_name
2886 node_verify_param[constants.NV_PVLIST] = [vg_name]
2888 if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
2890 node_verify_param[constants.NV_DRBDVERSION] = None
2891 node_verify_param[constants.NV_DRBDLIST] = None
2892 node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2894 if cluster.IsFileStorageEnabled() or \
2895 cluster.IsSharedFileStorageEnabled():
2896 # Load file storage paths only from master node
2897 node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
2898 self.cfg.GetMasterNodeName()
2899 if cluster.IsFileStorageEnabled():
2900 node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
2901 cluster.file_storage_dir
2904 # FIXME: this needs to be changed per node-group, not cluster-wide
2906 default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2907 if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2908 bridges.add(default_nicpp[constants.NIC_LINK])
2909 for instance in self.my_inst_info.values():
2910 for nic in instance.nics:
2911 full_nic = cluster.SimpleFillNIC(nic.nicparams)
2912 if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2913 bridges.add(full_nic[constants.NIC_LINK])
2916 node_verify_param[constants.NV_BRIDGES] = list(bridges)
2918 # Build our expected cluster state
2919 node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
2921 vm_capable=node.vm_capable))
2922 for node in node_data_list)
2926 for node in self.all_node_info.values():
2927 path = SupportsOob(self.cfg, node)
2928 if path and path not in oob_paths:
2929 oob_paths.append(path)
2932 node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2934 for inst_uuid in self.my_inst_uuids:
2935 instance = self.my_inst_info[inst_uuid]
2936 if instance.admin_state == constants.ADMINST_OFFLINE:
2939 for nuuid in instance.all_nodes:
2940 if nuuid not in node_image:
2941 gnode = self.NodeImage(uuid=nuuid)
2942 gnode.ghost = (nuuid not in self.all_node_info)
2943 node_image[nuuid] = gnode
2945 instance.MapLVsByNode(node_vol_should)
2947 pnode = instance.primary_node
2948 node_image[pnode].pinst.append(instance.uuid)
2950 for snode in instance.secondary_nodes:
2951 nimg = node_image[snode]
2952 nimg.sinst.append(instance.uuid)
2953 if pnode not in nimg.sbp:
2954 nimg.sbp[pnode] = []
2955 nimg.sbp[pnode].append(instance.uuid)
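# nimg.sbp ("secondary by primary") maps a primary node UUID to the
# instances for which this node acts as secondary; the N+1 memory check
# relies on this mapping.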
2957 es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
2958 self.my_node_info.keys())
2959 # The value of exclusive_storage should be the same across the group, so if
2960 # it's True for at least one node, we act as if it were set for all the nodes
2961 self._exclusive_storage = compat.any(es_flags.values())
2962 if self._exclusive_storage:
2963 node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2965 # At this point, we have the in-memory data structures complete,
2966 # except for the runtime information, which we'll gather next
2968 # Due to the way our RPC system works, exact response times cannot be
2969 # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2970 # time before and after executing the request, we can at least have a time window.
2972 nvinfo_starttime = time.time()
2973 all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
2975 self.cfg.GetClusterName(),
2976 self.cfg.GetClusterInfo().hvparams)
2977 nvinfo_endtime = time.time()
2979 if self.extra_lv_nodes and vg_name is not None:
2981 self.rpc.call_node_verify(self.extra_lv_nodes,
2982 {constants.NV_LVLIST: vg_name},
2983 self.cfg.GetClusterName(),
2984 self.cfg.GetClusterInfo().hvparams)
2986 extra_lv_nvinfo = {}
2988 all_drbd_map = self.cfg.ComputeDRBDMap()
2990 feedback_fn("* Gathering disk information (%s nodes)" %
2991 len(self.my_node_uuids))
2992 instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
2995 feedback_fn("* Verifying configuration file consistency")
2997 # If not all nodes are being checked, we need to make sure the master node
2998 # and a non-checked vm_capable node are in the list.
2999 absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
3000 if absent_node_uuids:
3001 vf_nvinfo = all_nvinfo.copy()
3002 vf_node_info = list(self.my_node_info.values())
3003 additional_node_uuids = []
3004 if master_node_uuid not in self.my_node_info:
3005 additional_node_uuids.append(master_node_uuid)
3006 vf_node_info.append(self.all_node_info[master_node_uuid])
3007 # Add the first vm_capable node we find which is not included,
3008 # excluding the master node (which we already have)
3009 for node_uuid in absent_node_uuids:
3010 nodeinfo = self.all_node_info[node_uuid]
3011 if (nodeinfo.vm_capable and not nodeinfo.offline and
3012 node_uuid != master_node_uuid):
3013 additional_node_uuids.append(node_uuid)
3014 vf_node_info.append(self.all_node_info[node_uuid])
3016 key = constants.NV_FILELIST
3017 vf_nvinfo.update(self.rpc.call_node_verify(
3018 additional_node_uuids, {key: node_verify_param[key]},
3019 self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
3021 vf_nvinfo = all_nvinfo
3022 vf_node_info = self.my_node_info.values()
3024 self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)
3026 feedback_fn("* Verifying node status")
3030 for node_i in node_data_list:
3031 nimg = node_image[node_i.uuid]
3035 feedback_fn("* Skipping offline node %s" % (node_i.name,))
3039 if node_i.uuid == master_node_uuid:
3041 elif node_i.master_candidate:
3042 ntype = "master candidate"
3043 elif node_i.drained:
3049 feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))
3051 msg = all_nvinfo[node_i.uuid].fail_msg
3052 self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
3053 "while contacting node: %s", msg)
3055 nimg.rpc_fail = True
3058 nresult = all_nvinfo[node_i.uuid].payload
3060 nimg.call_ok = self._VerifyNode(node_i, nresult)
3061 self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
3062 self._VerifyNodeNetwork(node_i, nresult)
3063 self._VerifyNodeUserScripts(node_i, nresult)
3064 self._VerifyOob(node_i, nresult)
3065 self._VerifyAcceptedFileStoragePaths(node_i, nresult,
3066 node_i.uuid == master_node_uuid)
3067 self._VerifyFileStoragePaths(node_i, nresult)
3068 self._VerifySharedFileStoragePaths(node_i, nresult)
3071 self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
3072 self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
3075 self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
3076 self._UpdateNodeInstances(node_i, nresult, nimg)
3077 self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
3078 self._UpdateNodeOS(node_i, nresult, nimg)
3080 if not nimg.os_fail:
3081 if refos_img is None:
3083 self._VerifyNodeOS(node_i, nimg, refos_img)
3084 self._VerifyNodeBridges(node_i, nresult, bridges)
3086 # Check whether all running instances are primary for the node. (This
3087 # can no longer be done from _VerifyInstance below, since some of the
3088 # wrong instances could be from other node groups.)
3089 non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)
3091 for inst_uuid in non_primary_inst_uuids:
3092 test = inst_uuid in self.all_inst_info
3093 self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
3094 self.cfg.GetInstanceName(inst_uuid),
3095 "instance should not run on node %s", node_i.name)
3096 self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
3097 "node is running unknown instance %s", inst_uuid)
3099 self._VerifyGroupDRBDVersion(all_nvinfo)
3100 self._VerifyGroupLVM(node_image, vg_name)
3102 for node_uuid, result in extra_lv_nvinfo.items():
3103 self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
3104 node_image[node_uuid], vg_name)
3106 feedback_fn("* Verifying instance status")
3107 for inst_uuid in self.my_inst_uuids:
3108 instance = self.my_inst_info[inst_uuid]
3110 feedback_fn("* Verifying instance %s" % instance.name)
3111 self._VerifyInstance(instance, node_image, instdisk[inst_uuid])
3113 # If the instance is non-redundant we cannot survive losing its primary
3114 # node, so we are not N+1 compliant.
3115 if instance.disk_template not in constants.DTS_MIRRORED:
3116 i_non_redundant.append(instance)
3118 if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
3119 i_non_a_balanced.append(instance)
3121 feedback_fn("* Verifying orphan volumes")
3122 reserved = utils.FieldSet(*cluster.reserved_lvs)
3124 # We will get spurious "unknown volume" warnings if any node of this group
3125 # is secondary for an instance whose primary is in another group. To avoid
3126 # them, we find these instances and add their volumes to node_vol_should.
3127 for instance in self.all_inst_info.values():
3128 for secondary in instance.secondary_nodes:
3129 if (secondary in self.my_node_info
3130 and instance.name not in self.my_inst_info):
3131 instance.MapLVsByNode(node_vol_should)
3134 self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
3136 if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
3137 feedback_fn("* Verifying N+1 Memory redundancy")
3138 self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
3140 feedback_fn("* Other Notes")
3142 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
3143 % len(i_non_redundant))
3145 if i_non_a_balanced:
3146 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
3147 % len(i_non_a_balanced))
3150 feedback_fn(" - NOTICE: %d offline instance(s) found." % i_offline)
3153 feedback_fn(" - NOTICE: %d offline node(s) found." % n_offline)
3156 feedback_fn(" - NOTICE: %d drained node(s) found." % n_drained)
3160 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
3161 """Analyze the post-hooks' result
3163 This method analyses the hook result, handles it, and sends some
3164 nicely-formatted feedback back to the user.
3166 @param phase: one of L{constants.HOOKS_PHASE_POST} or
3167 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
3168 @param hooks_results: the results of the multi-node hooks rpc call
3169 @param feedback_fn: function used to send feedback back to the caller
3170 @param lu_result: previous Exec result
3171 @return: the new Exec result, based on the previous result
3175 # We only really run POST phase hooks, only for non-empty groups,
3176 # and are only interested in their results
3177 if not self.my_node_uuids:
3180 elif phase == constants.HOOKS_PHASE_POST:
3181 # Used to change hooks' output to proper indentation
3182 feedback_fn("* Hooks Results")
3183 assert hooks_results, "invalid result from hooks"
3185 for node_name in hooks_results:
3186 res = hooks_results[node_name]
3188 test = msg and not res.offline
3189 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3190 "Communication failure in hooks execution: %s", msg)
3191 if res.offline or msg:
3192 # No need to investigate payload if node is offline or gave an error
3195 for script, hkr, output in res.payload:
3196 test = hkr == constants.HKR_FAIL
3197 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3198 "Script %s failed, output:", script)
3200 output = self._HOOKS_INDENT_RE.sub(" ", output)
3201 feedback_fn("%s" % output)
3207 class LUClusterVerifyDisks(NoHooksLU):
3208 """Verifies the cluster disks status.
3213 def ExpandNames(self):
3214 self.share_locks = ShareAll()
3215 self.needed_locks = {
3216 locking.LEVEL_NODEGROUP: locking.ALL_SET,
3219 def Exec(self, feedback_fn):
3220 group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
3222 # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
3223 return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
3224 for group in group_names])