# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import os
import re

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
  CheckDiskAccessModeConsistency

import ganeti.masterd.instance


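# The two small LUs below toggle the cluster's master IP on the master node
# through a single RPC each; they are typically reached via the
# "gnt-cluster activate-master-ip" and "gnt-cluster deactivate-master-ip"
# commands.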
class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Cannot use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)


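# LUClusterQuery backs "gnt-cluster info": it takes no locks and simply
# renders the cluster configuration into a plain dict of values.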
class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "vcs_version": constants.VCS_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


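# Renaming a cluster ("gnt-cluster rename") has to juggle the master IP: the
# old IP is shut down first, the configuration and the ssh known_hosts file
# are updated, and then the (possibly new) IP is brought up again, even if
# one of the intermediate steps fails.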
class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
      map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.DT_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False
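
  # For illustration of the method above: a DRBD8 disk recorded at 10240 MiB
  # whose first child (the data volume) only has 10200 MiB gets that child
  # grown to 10240 MiB; the metadata child is intentionally left alone.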

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      if not dskl:
        # no disks on the node
        continue

      newl = [([v[2].Copy()], v[0]) for v in dskl]
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed


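# _ValidateNetmask checks the master netmask, which is given as a prefix
# length (e.g. 24 for an IPv4 /24); validation is delegated to the IP class
# matching the cluster's primary address family.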
def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)


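# The file-storage path check below distinguishes three cases: unsetting the
# directory while the matching template is enabled is an error, setting a
# directory while the template is disabled only warns, and None (no value at
# all) is a programmer error, since callers are expected to filter that out.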
def CheckFileBasedStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
    file_disk_template):
  """Checks whether the given file-based storage directory is acceptable.

  Note: This function is public, because it is also used in bootstrap.py.

  @type logging_warn_fn: function
  @param logging_warn_fn: function which accepts a string and logs it
  @type file_storage_dir: string
  @param file_storage_dir: the directory to be used for file-based instances
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of enabled disk templates
  @type file_disk_template: string
  @param file_disk_template: the file-based disk template for which the
      path should be checked

  """
  assert (file_disk_template in
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_dir is not None:
    if file_storage_dir == "":
      if file_storage_enabled:
        raise errors.OpPrereqError(
            "Unsetting the '%s' storage directory while having '%s' storage"
            " enabled is not permitted." %
            (file_disk_template, file_disk_template))
    else:
      if not file_storage_enabled:
        logging_warn_fn(
            "Specified a %s storage directory, although %s storage is not"
            " enabled." % (file_disk_template, file_disk_template))
  else:
    raise errors.ProgrammerError("Received %s storage dir with value"
                                 " 'None'." % file_disk_template)


def CheckFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_FILE)


def CheckSharedFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given shared file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_SHARED_FILE)


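# LUClusterSetParams implements "gnt-cluster modify": CheckArguments and
# CheckPrereq validate the requested changes against the current
# configuration and the nodes, and Exec then applies them, restarting the
# master IP when the master netdev changes.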
class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
        CheckDiskAccessModeValidity(self.op.diskparams)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" %
                                   err, errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def _CheckVgName(self, node_uuids, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the vg name on all nodes and in case it gets
       unset whether there are instances still using it.

    """
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
                                            new_enabled_disk_templates)
    current_vg_name = self.cfg.GetVGName()

    if self.op.vg_name == '':
      if lvm_is_enabled:
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                   " disk templates are or get enabled.")

    if self.op.vg_name is None:
      if current_vg_name is None and lvm_is_enabled:
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and lvm_is_enabled) or \
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
      self._CheckVgNameOnNodes(node_uuids)

  def _CheckVgNameOnNodes(self, node_uuids):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_uuids)
    for node_uuid in node_uuids:
      msg = vglist[node_uuid].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
                                   errors.ECODE_ENVIRON)

  @staticmethod
  def _GetEnabledDiskTemplatesInner(op_enabled_disk_templates,
                                    old_enabled_disk_templates):
    """Determines the enabled disk templates and the subset of disk templates
       that are newly enabled by this operation.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    if op_enabled_disk_templates:
      enabled_disk_templates = op_enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(old_enabled_disk_templates))
    else:
      enabled_disk_templates = old_enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates)

  def _GetEnabledDiskTemplates(self, cluster):
    """Determines the enabled disk templates and the subset of disk templates
       that are newly enabled by this operation.

    """
    return self._GetEnabledDiskTemplatesInner(self.op.enabled_disk_templates,
                                              cluster.enabled_disk_templates)

  def _CheckIpolicy(self, cluster, enabled_disk_templates):
    """Checks the ipolicy.

    @type cluster: C{objects.Cluster}
    @param cluster: the cluster's configuration
    @type enabled_disk_templates: list of string
    @param enabled_disk_templates: list of (possibly newly) enabled disk
      templates

    """
    # FIXME: write unit tests for this
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
                                  enabled_disk_templates)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(nuuid in group.members
                                             for nuuid in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
                                           self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))
    else:
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
                                  enabled_disk_templates)

  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
    """Checks whether the set DRBD helper actually exists on the nodes.

    @type drbd_helper: string
    @param drbd_helper: path of the drbd usermode helper binary
    @type node_uuids: list of strings
    @param node_uuids: list of node UUIDs to check for the helper

    """
    # checks given drbd helper on all nodes
    helpers = self.rpc.call_drbd_helper(node_uuids)
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
      if ninfo.offline:
        self.LogInfo("Not checking drbd helper on offline node %s",
                     ninfo.name)
        continue
      msg = helpers[ninfo.uuid].fail_msg
      if msg:
        raise errors.OpPrereqError("Error checking drbd helper on node"
                                   " '%s': %s" % (ninfo.name, msg),
                                   errors.ECODE_ENVIRON)
      node_helper = helpers[ninfo.uuid].payload
      if node_helper != drbd_helper:
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                   (ninfo.name, node_helper),
                                   errors.ECODE_ENVIRON)

  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
    """Check the DRBD usermode helper.

    @type node_uuids: list of strings
    @param node_uuids: a list of nodes' UUIDs
    @type drbd_enabled: boolean
    @param drbd_enabled: whether DRBD will be enabled after this operation
      (no matter if it was disabled before or not)
    @type drbd_gets_enabled: boolean
    @param drbd_gets_enabled: true if DRBD was disabled before this
      operation, but will be enabled afterwards

    """
    if self.op.drbd_helper == '':
      if drbd_enabled:
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " DRBD is enabled.")
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    else:
      if self.op.drbd_helper is not None and drbd_enabled:
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
      else:
        if drbd_gets_enabled:
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
          if current_drbd_helper is not None:
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
          else:
            raise errors.OpPrereqError("Cannot enable DRBD without a"
                                       " DRBD usermode helper set.")

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    self.cluster = cluster = self.cfg.GetClusterInfo()

    vm_capable_node_uuids = [node.uuid
                             for node in self.cfg.GetAllNodesInfo().values()
                             if node.uuid in node_uuids and node.vm_capable]

    (enabled_disk_templates, new_enabled_disk_templates) = \
      self._GetEnabledDiskTemplates(cluster)

    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
                      new_enabled_disk_templates)

    if self.op.file_storage_dir is not None:
      CheckFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)

    if self.op.shared_file_storage_dir is not None:
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.shared_file_storage_dir,
          enabled_disk_templates)

    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)

    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
          constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    self._CheckIpolicy(cluster, enabled_disk_templates)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_uuids, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_uuids, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def _SetVgName(self, feedback_fn):
    """Determines and sets the new volume group name.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")

  def _SetFileStorageDir(self, feedback_fn):
    """Set the file storage directory.

    """
    if self.op.file_storage_dir is not None:
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
        feedback_fn("Global file storage dir already set to value '%s'"
                    % self.cluster.file_storage_dir)
      else:
        self.cluster.file_storage_dir = self.op.file_storage_dir

  def _SetDrbdHelper(self, feedback_fn):
    """Set the DRBD usermode helper.

    """
    if self.op.drbd_helper is not None:
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did not"
                    " enable the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(set(self.op.enabled_disk_templates))

    self._SetVgName(feedback_fn)
    self._SetFileStorageDir(feedback_fn)
    self._SetDrbdHelper(feedback_fn)

    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
                 master_params.uuid, master_params.netmask,
                 self.op.master_netmask, master_params.ip,
                 master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


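# With the error_codes option set, _VerifyErrors._Error below emits
# machine-parseable lines such as
#   ERROR:ENODEVERSION:node:node1.example.com:&lt;message&gt;
# (example only; the exact fields come from the error-code tuples in
# constants); otherwise a shorter human-readable form is used.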
class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors: # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
      apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data
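
# The list returned by _GetAllHypervisorParameters above looks roughly like
#   [("cluster", "kvm", {...}), ("os debian", "kvm", {...}),
#    ("instance web1", "kvm", {...})]
# with hypervisor names and parameter dicts depending on the cluster.

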
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                  constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(inst.name for
                         inst in dangling_instances.get(node.uuid, [])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(inst.name for
                                         inst in no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        instance = self.cfg.GetInstanceInfoByName(inst_name)
        if instance.disk_template in constants.DTS_INT_MIRROR:
          nodes.update(instance.secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_node_uuids = set(self.group_info.members)
    group_inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_node_uuids = \
      group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_inst_uuids = \
      group_inst_uuids.difference(
        [self.cfg.GetInstanceInfoByName(name).uuid
         for name in self.owned_locks(locking.LEVEL_INSTANCE)])

    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_inst_uuids:
      raise errors.OpPrereqError(
        "Missing lock for instances: %s" %
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
        errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_uuids = group_node_uuids
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
                             for node_uuid in group_node_uuids)

    self.my_inst_uuids = group_inst_uuids
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
                             for inst_uuid in group_inst_uuids)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nuuid in inst.all_nodes:
          if self.all_node_info[nuuid].group != self.group_uuid:
            extra_lv_nodes.add(nuuid)

    unlocked_lv_nodes = \
      extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, ninfo.name,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s parameter verify failure (source %s): %s",
                      hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True
1810 def _VerifyNodeTime(self, ninfo, nresult,
1811 nvinfo_starttime, nvinfo_endtime):
1812 """Check the node time.
1814 @type ninfo: L{objects.Node}
1815 @param ninfo: the node to check
1816 @param nresult: the remote results for the node
1817 @param nvinfo_starttime: the start time of the RPC call
1818 @param nvinfo_endtime: the end time of the RPC call
1821 ntime = nresult.get(constants.NV_TIME, None)
1823 ntime_merged = utils.MergeTime(ntime)
1824 except (ValueError, TypeError):
1825 self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1826 "Node returned invalid time")
1829 if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1830 ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1831 elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1832 ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1833 else:
1834 ntime_diff = None
1836 self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1837 "Node time diverges by at least %s from master node time",
1840 def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1841 """Check the node LVM results and update info for cross-node checks.
1843 @type ninfo: L{objects.Node}
1844 @param ninfo: the node to check
1845 @param nresult: the remote results for the node
1846 @param vg_name: the configured VG name
1847 @type nimg: L{NodeImage}
1848 @param nimg: node image
1851 if vg_name is None:
1852 return
1854 # checks vg existence and size > 20G
1855 vglist = nresult.get(constants.NV_VGLIST, None)
1856 test = not vglist
1857 self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1858 "unable to check volume groups")
1860 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1861 constants.MIN_VG_SIZE)
1862 self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1865 (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1866 for em in errmsgs:
1867 self._Error(constants.CV_ENODELVM, ninfo.name, em)
1868 if pvminmax is not None:
1869 (nimg.pv_min, nimg.pv_max) = pvminmax
1871 def _VerifyGroupDRBDVersion(self, node_verify_infos):
1872 """Check cross-node DRBD version consistency.
1874 @type node_verify_infos: dict
1875 @param node_verify_infos: infos about nodes as returned from the
1876 node_verify rpc call
1879 node_versions = {}
1880 for node_uuid, ndata in node_verify_infos.items():
1881 nresult = ndata.payload
1882 version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1883 node_versions[node_uuid] = version
1885 if len(set(node_versions.values())) > 1:
1886 for node_uuid, version in sorted(node_versions.items()):
1887 msg = "DRBD version mismatch: %s" % version
1888 self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1889 code=self.ETYPE_WARNING)
1891 def _VerifyGroupLVM(self, node_image, vg_name):
1892 """Check cross-node consistency in LVM.
1894 @type node_image: dict
1895 @param node_image: info about nodes, mapping from node names to
1896 L{NodeImage} objects
1897 @param vg_name: the configured VG name
1903 # Only exclusive storage needs this kind of check
1904 if not self._exclusive_storage:
1905 return
1907 # exclusive_storage wants all PVs to have the same size (approximately),
1908 # if the smallest and the biggest ones are okay, everything is fine.
1909 # pv_min is None iff pv_max is None
1910 vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1911 if not vals:
1912 return
1913 (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1914 (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1915 bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1916 self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1917 "PV sizes differ too much in the group; smallest (%s MB) is"
1918 " on %s, biggest (%s MB) is on %s",
1919 pvmin, self.cfg.GetNodeName(minnode_uuid),
1920 pvmax, self.cfg.GetNodeName(maxnode_uuid))
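# A minimal sketch of the (value, uuid) tuple trick used above: Python
# compares tuples element by element, so min()/max() return both the extreme
# PV size and the node reporting it in a single pass. With hypothetical
# values:
#   >>> min((1024.0, "uuid-a"), (512.0, "uuid-b"))
#   (512.0, 'uuid-b')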
1922 def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1923 """Check the node bridges.
1925 @type ninfo: L{objects.Node}
1926 @param ninfo: the node to check
1927 @param nresult: the remote results for the node
1928 @param bridges: the expected list of bridges
1931 if not bridges:
1932 return
1934 missing = nresult.get(constants.NV_BRIDGES, None)
1935 test = not isinstance(missing, list)
1936 self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1937 "did not return valid bridge information")
1939 self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1940 "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1942 def _VerifyNodeUserScripts(self, ninfo, nresult):
1943 """Check the results of user scripts presence and executability on the node
1945 @type ninfo: L{objects.Node}
1946 @param ninfo: the node to check
1947 @param nresult: the remote results for the node
1950 test = constants.NV_USERSCRIPTS not in nresult
1951 self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1952 "did not return user scripts information")
1954 broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1955 if broken_scripts:
1956 self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1957 "user scripts not present or not executable: %s" %
1958 utils.CommaJoin(sorted(broken_scripts)))
1960 def _VerifyNodeNetwork(self, ninfo, nresult):
1961 """Check the node network connectivity results.
1963 @type ninfo: L{objects.Node}
1964 @param ninfo: the node to check
1965 @param nresult: the remote results for the node
1968 test = constants.NV_NODELIST not in nresult
1969 self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
1970 "node hasn't returned node ssh connectivity data")
1972 if nresult[constants.NV_NODELIST]:
1973 for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1974 self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
1975 "ssh communication with node '%s': %s", a_node, a_msg)
1977 test = constants.NV_NODENETTEST not in nresult
1978 self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1979 "node hasn't returned node tcp connectivity data")
1981 if nresult[constants.NV_NODENETTEST]:
1982 nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1983 for anode in nlist:
1984 self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
1985 "tcp communication with node '%s': %s",
1986 anode, nresult[constants.NV_NODENETTEST][anode])
1988 test = constants.NV_MASTERIP not in nresult
1989 self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1990 "node hasn't returned node master IP reachability data")
1992 if not nresult[constants.NV_MASTERIP]:
1993 if ninfo.uuid == self.master_node:
1994 msg = "the master node cannot reach the master IP (not configured?)"
1996 msg = "cannot reach the master IP"
1997 self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
1999 def _VerifyInstance(self, instance, node_image, diskstatus):
2000 """Verify an instance.
2002 This function checks to see if the required block devices are
2003 available on the instance's node, and that the nodes are in the correct
2004 state.
2007 pnode_uuid = instance.primary_node
2008 pnode_img = node_image[pnode_uuid]
2009 groupinfo = self.cfg.GetAllNodeGroupsInfo()
2011 node_vol_should = {}
2012 instance.MapLVsByNode(node_vol_should)
2014 cluster = self.cfg.GetClusterInfo()
2015 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2016 self.group_info)
2017 err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
2018 self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
2019 utils.CommaJoin(err), code=self.ETYPE_WARNING)
2021 for node_uuid in node_vol_should:
2022 n_img = node_image[node_uuid]
2023 if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2024 # ignore missing volumes on offline or broken nodes
2025 continue
2026 for volume in node_vol_should[node_uuid]:
2027 test = volume not in n_img.volumes
2028 self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
2029 "volume %s missing on node %s", volume,
2030 self.cfg.GetNodeName(node_uuid))
2032 if instance.admin_state == constants.ADMINST_UP:
2033 test = instance.uuid not in pnode_img.instances and not pnode_img.offline
2034 self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
2035 "instance not running on its primary node %s",
2036 self.cfg.GetNodeName(pnode_uuid))
2037 self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
2038 instance.name, "instance is marked as running and lives on"
2039 " offline node %s", self.cfg.GetNodeName(pnode_uuid))
2041 diskdata = [(nname, success, status, idx)
2042 for (nname, disks) in diskstatus.items()
2043 for idx, (success, status) in enumerate(disks)]
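# Illustrative sketch (hypothetical data): the comprehension above flattens
# the per-node diskstatus mapping, e.g.
#   {"node-uuid-1": [(True, st0), (False, st1)]}
# into per-disk tuples
#   [("node-uuid-1", True, st0, 0), ("node-uuid-1", False, st1, 1)]
# so each disk can be checked individually below.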
2045 for nname, success, bdev_status, idx in diskdata:
2046 # the 'ghost node' construction in Exec() ensures that we have a
2047 # node here
2048 snode = node_image[nname]
2049 bad_snode = snode.ghost or snode.offline
2050 self._ErrorIf(instance.disks_active and
2051 not success and not bad_snode,
2052 constants.CV_EINSTANCEFAULTYDISK, instance.name,
2053 "couldn't retrieve status for disk/%s on %s: %s",
2054 idx, self.cfg.GetNodeName(nname), bdev_status)
2056 if instance.disks_active and success and \
2057 (bdev_status.is_degraded or
2058 bdev_status.ldisk_status != constants.LDS_OKAY):
2059 msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2060 if bdev_status.is_degraded:
2061 msg += " is degraded"
2062 if bdev_status.ldisk_status != constants.LDS_OKAY:
2063 msg += "; state is '%s'" % \
2064 constants.LDS_NAMES[bdev_status.ldisk_status]
2066 self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2068 self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2069 constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2070 "instance %s, connection to primary node failed",
2073 self._ErrorIf(len(instance.secondary_nodes) > 1,
2074 constants.CV_EINSTANCELAYOUT, instance.name,
2075 "instance has multiple secondary nodes: %s",
2076 utils.CommaJoin(instance.secondary_nodes),
2077 code=self.ETYPE_WARNING)
2079 es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2080 if any(es_flags.values()):
2081 if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2082 # Disk template not compatible with exclusive_storage: no instance
2083 # node should have the flag set
2084 es_nodes = [n
2085 for (n, es) in es_flags.items()
2086 if es]
2087 self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2088 "instance has template %s, which is not supported on nodes"
2089 " that have exclusive storage set: %s",
2090 instance.disk_template,
2091 utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2092 for (idx, disk) in enumerate(instance.disks):
2093 self._ErrorIf(disk.spindles is None,
2094 constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2095 "number of spindles not configured for disk %s while"
2096 " exclusive storage is enabled, try running"
2097 " gnt-cluster repair-disk-sizes", idx)
2099 if instance.disk_template in constants.DTS_INT_MIRROR:
2100 instance_nodes = utils.NiceSort(instance.all_nodes)
2101 instance_groups = {}
2103 for node_uuid in instance_nodes:
2104 instance_groups.setdefault(self.all_node_info[node_uuid].group,
2105 []).append(node_uuid)
2107 pretty_list = [
2108 "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2109 groupinfo[group].name)
2110 # Sort so that we always list the primary node first.
2111 for group, nodes in sorted(instance_groups.items(),
2112 key=lambda (_, nodes): pnode_uuid in nodes,
2113 reverse=True)]
2115 self._ErrorIf(len(instance_groups) > 1,
2116 constants.CV_EINSTANCESPLITGROUPS,
2117 instance.name, "instance has primary and secondary nodes in"
2118 " different groups: %s", utils.CommaJoin(pretty_list),
2119 code=self.ETYPE_WARNING)
2121 inst_nodes_offline = []
2122 for snode in instance.secondary_nodes:
2123 s_img = node_image[snode]
2124 self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2125 self.cfg.GetNodeName(snode),
2126 "instance %s, connection to secondary node failed",
2130 inst_nodes_offline.append(snode)
2132 # warn that the instance lives on offline nodes
2133 self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2134 instance.name, "instance has offline secondary node(s) %s",
2135 utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2136 # ... or ghost/non-vm_capable nodes
2137 for node_uuid in instance.all_nodes:
2138 self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2139 instance.name, "instance lives on ghost node %s",
2140 self.cfg.GetNodeName(node_uuid))
2141 self._ErrorIf(not node_image[node_uuid].vm_capable,
2142 constants.CV_EINSTANCEBADNODE, instance.name,
2143 "instance lives on non-vm_capable node %s",
2144 self.cfg.GetNodeName(node_uuid))
2146 def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2147 """Verify if there are any unknown volumes in the cluster.
2149 The .os, .swap and backup volumes are ignored. All other volumes are
2150 reported as unknown.
2152 @type reserved: L{ganeti.utils.FieldSet}
2153 @param reserved: a FieldSet of reserved volume names
2156 for node_uuid, n_img in node_image.items():
2157 if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2158 self.all_node_info[node_uuid].group != self.group_uuid):
2159 # skip non-healthy nodes
2160 continue
2161 for volume in n_img.volumes:
2162 test = ((node_uuid not in node_vol_should or
2163 volume not in node_vol_should[node_uuid]) and
2164 not reserved.Matches(volume))
2165 self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2166 self.cfg.GetNodeName(node_uuid),
2167 "volume %s is unknown", volume)
2169 def _VerifyNPlusOneMemory(self, node_image, all_insts):
2170 """Verify N+1 Memory Resilience.
2172 Check that if one single node dies we can still start all the
2173 instances it was primary for.
2176 cluster_info = self.cfg.GetClusterInfo()
2177 for node_uuid, n_img in node_image.items():
2178 # This code checks that every node which is now listed as
2179 # secondary has enough memory to host all instances it is
2180 # supposed to, should a single other node in the cluster fail.
2181 # FIXME: not ready for failover to an arbitrary node
2182 # FIXME: does not support file-backed instances
2183 # WARNING: we currently take into account down instances as well
2184 # as up ones, considering that even if they're down someone
2185 # might want to start them even in the event of a node failure.
2186 if n_img.offline or \
2187 self.all_node_info[node_uuid].group != self.group_uuid:
2188 # we're skipping nodes marked offline and nodes in other groups from
2189 # the N+1 warning, since most likely we don't have good memory
2190 # information from them; we already list instances living on such
2191 # nodes, and that's enough warning
2192 continue
2193 #TODO(dynmem): also consider ballooning out other instances
2194 for prinode, inst_uuids in n_img.sbp.items():
2195 needed_mem = 0
2196 for inst_uuid in inst_uuids:
2197 bep = cluster_info.FillBE(all_insts[inst_uuid])
2198 if bep[constants.BE_AUTO_BALANCE]:
2199 needed_mem += bep[constants.BE_MINMEM]
2200 test = n_img.mfree < needed_mem
2201 self._ErrorIf(test, constants.CV_ENODEN1,
2202 self.cfg.GetNodeName(node_uuid),
2203 "not enough memory to accomodate instance failovers"
2204 " should node %s fail (%dMiB needed, %dMiB available)",
2205 self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
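# Worked example with hypothetical numbers: if this node is secondary for two
# auto-balanced instances whose primary is node P, with BE_MINMEM of 512 MiB
# and 1024 MiB, needed_mem is 1536 MiB; the error above fires when mfree is
# below that, since failing P's instances over here would not fit in memory.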
2207 def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2208 (files_all, files_opt, files_mc, files_vm)):
2209 """Verifies file checksums collected from all nodes.
2211 @param nodes: List of L{objects.Node} objects
2212 @param master_node_uuid: UUID of master node
2213 @param all_nvinfo: RPC results
2216 # Define functions determining which nodes to consider for a file
2217 files2nodefn = [
2218 (files_all, None),
2219 (files_mc, lambda node: (node.master_candidate or
2220 node.uuid == master_node_uuid)),
2221 (files_vm, lambda node: node.vm_capable),
2222 ]
2224 # Build mapping from filename to list of nodes which should have the file
2225 nodefiles = {}
2226 for (files, fn) in files2nodefn:
2227 if fn is None:
2228 filenodes = nodes
2229 else:
2230 filenodes = filter(fn, nodes)
2231 nodefiles.update((filename,
2232 frozenset(map(operator.attrgetter("uuid"), filenodes)))
2233 for filename in files)
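# Illustrative sketch: nodefiles now maps each distributed file to the
# frozenset of node UUIDs expected to hold it, e.g. (hypothetical paths and
# UUIDs):
#   {"/var/lib/ganeti/known_hosts": frozenset(["uuid-1", "uuid-2"]),
#    "/var/lib/ganeti/config.data": frozenset(["uuid-mc-1"])}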
2235 assert set(nodefiles) == (files_all | files_mc | files_vm)
2237 fileinfo = dict((filename, {}) for filename in nodefiles)
2238 ignore_nodes = set()
2240 for node in nodes:
2241 if node.offline:
2242 ignore_nodes.add(node.uuid)
2243 continue
2245 nresult = all_nvinfo[node.uuid]
2247 if nresult.fail_msg or not nresult.payload:
2248 node_files = None
2249 else:
2250 fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2251 node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2252 for (key, value) in fingerprints.items())
2255 test = not (node_files and isinstance(node_files, dict))
2256 self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2257 "Node did not return file checksum data")
2259 ignore_nodes.add(node.uuid)
2260 continue
2262 # Build per-checksum mapping from filename to nodes having it
2263 for (filename, checksum) in node_files.items():
2264 assert filename in nodefiles
2265 fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2267 for (filename, checksums) in fileinfo.items():
2268 assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2270 # Nodes having the file
2271 with_file = frozenset(node_uuid
2272 for node_uuids in fileinfo[filename].values()
2273 for node_uuid in node_uuids) - ignore_nodes
2275 expected_nodes = nodefiles[filename] - ignore_nodes
2277 # Nodes missing file
2278 missing_file = expected_nodes - with_file
2280 if filename in files_opt:
2281 # All or no nodes
2282 self._ErrorIf(missing_file and missing_file != expected_nodes,
2283 constants.CV_ECLUSTERFILECHECK, None,
2284 "File %s is optional, but it must exist on all or no"
2285 " nodes (not found on %s)",
2289 map(self.cfg.GetNodeName, missing_file))))
2290 else:
2291 self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2292 "File %s is missing from node(s) %s", filename,
2295 map(self.cfg.GetNodeName, missing_file))))
2297 # Warn if a node has a file it shouldn't
2298 unexpected = with_file - expected_nodes
2299 self._ErrorIf(unexpected,
2300 constants.CV_ECLUSTERFILECHECK, None,
2301 "File %s should not exist on node(s) %s",
2302 filename, utils.CommaJoin(
2303 utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2305 # See if there are multiple versions of the file
2306 test = len(checksums) > 1
2307 if test:
2308 variants = ["variant %s on %s" %
2309 (idx + 1,
2310 utils.CommaJoin(utils.NiceSort(
2311 map(self.cfg.GetNodeName, node_uuids))))
2312 for (idx, (checksum, node_uuids)) in
2313 enumerate(sorted(checksums.items()))]
2314 else:
2315 variants = []
2317 self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2318 "File %s found with %s different checksums (%s)",
2319 filename, len(checksums), "; ".join(variants))
2321 def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
2322 """Verify the drbd helper.
2326 helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2327 test = (helper_result is None)
2328 self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2329 "no drbd usermode helper returned")
2331 status, payload = helper_result
2332 test = not status
2333 self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2334 "drbd usermode helper check unsuccessful: %s", payload)
2335 test = status and (payload != drbd_helper)
2336 self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2337 "wrong drbd usermode helper: %s", payload)
2339 def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2340 drbd_map):
2341 """Verifies and the node DRBD status.
2343 @type ninfo: L{objects.Node}
2344 @param ninfo: the node to check
2345 @param nresult: the remote results for the node
2346 @param instanceinfo: the dict of instances
2347 @param drbd_helper: the configured DRBD usermode helper
2348 @param drbd_map: the DRBD map as returned by
2349 L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2352 self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)
2354 # compute the DRBD minors
2355 node_drbd = {}
2356 for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2357 test = inst_uuid not in instanceinfo
2358 self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2359 "ghost instance '%s' in temporary DRBD map", inst_uuid)
2360 # ghost instance should not be running, but otherwise we
2361 # don't give double warnings (both ghost instance and
2362 # unallocated minor in use)
2363 if test:
2364 node_drbd[minor] = (inst_uuid, False)
2365 else:
2366 instance = instanceinfo[inst_uuid]
2367 node_drbd[minor] = (inst_uuid, instance.disks_active)
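# Illustrative sketch (hypothetical UUIDs): node_drbd now maps each DRBD
# minor reserved on this node to (instance uuid, whether its disks should be
# active), e.g. {0: ("inst-uuid-1", True), 1: ("ghost-uuid", False)}.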
2369 # and now check them
2370 used_minors = nresult.get(constants.NV_DRBDLIST, [])
2371 test = not isinstance(used_minors, (tuple, list))
2372 self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2373 "cannot parse drbd status file: %s", str(used_minors))
2375 # we cannot check drbd status
2376 return
2378 for minor, (inst_uuid, must_exist) in node_drbd.items():
2379 test = minor not in used_minors and must_exist
2380 self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2381 "drbd minor %d of instance %s is not active", minor,
2382 self.cfg.GetInstanceName(inst_uuid))
2383 for minor in used_minors:
2384 test = minor not in node_drbd
2385 self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2386 "unallocated drbd minor %d is in use", minor)
2388 def _UpdateNodeOS(self, ninfo, nresult, nimg):
2389 """Builds the node OS structures.
2391 @type ninfo: L{objects.Node}
2392 @param ninfo: the node to check
2393 @param nresult: the remote results for the node
2394 @param nimg: the node image object
2397 remote_os = nresult.get(constants.NV_OSLIST, None)
2398 test = (not isinstance(remote_os, list) or
2399 not compat.all(isinstance(v, list) and len(v) == 7
2400 for v in remote_os))
2402 self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2403 "node hasn't returned valid OS data")
2412 for (name, os_path, status, diagnose,
2413 variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2415 if name not in os_dict:
2416 os_dict[name] = []
2418 # parameters is a list of lists instead of list of tuples due to
2419 # JSON lacking a real tuple type, fix it:
2420 parameters = [tuple(v) for v in parameters]
2421 os_dict[name].append((os_path, status, diagnose,
2422 set(variants), set(parameters), set(api_ver)))
2424 nimg.oslist = os_dict
2426 def _VerifyNodeOS(self, ninfo, nimg, base):
2427 """Verifies the node OS list.
2429 @type ninfo: L{objects.Node}
2430 @param ninfo: the node to check
2431 @param nimg: the node image object
2432 @param base: the 'template' node we match against (e.g. from the master)
2435 assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2437 beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2438 for os_name, os_data in nimg.oslist.items():
2439 assert os_data, "Empty OS status for OS %s?!" % os_name
2440 f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2441 self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2442 "Invalid OS %s (located at %s): %s",
2443 os_name, f_path, f_diag)
2444 self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2445 "OS '%s' has multiple entries"
2446 " (first one shadows the rest): %s",
2447 os_name, utils.CommaJoin([v[0] for v in os_data]))
2448 # comparisons with the 'base' image
2449 test = os_name not in base.oslist
2450 self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2451 "Extra OS %s not present on reference node (%s)",
2452 os_name, self.cfg.GetNodeName(base.uuid))
2453 if test:
2454 continue
2455 assert base.oslist[os_name], "Base node has empty OS status?"
2456 _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2457 if not b_status:
2458 # base OS is invalid, skipping
2459 continue
2460 for kind, a, b in [("API version", f_api, b_api),
2461 ("variants list", f_var, b_var),
2462 ("parameters", beautify_params(f_param),
2463 beautify_params(b_param))]:
2464 self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2465 "OS %s for %s differs from reference node %s:"
2466 " [%s] vs. [%s]", kind, os_name,
2467 self.cfg.GetNodeName(base.uuid),
2468 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2470 # check any missing OSes
2471 missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2472 self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2473 "OSes present on reference node %s"
2474 " but missing on this node: %s",
2475 self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2477 def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2478 """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2480 @type ninfo: L{objects.Node}
2481 @param ninfo: the node to check
2482 @param nresult: the remote results for the node
2483 @type is_master: bool
2484 @param is_master: Whether node is the master node
2487 cluster = self.cfg.GetClusterInfo()
2488 if (is_master and
2489 (cluster.IsFileStorageEnabled() or
2490 cluster.IsSharedFileStorageEnabled())):
2491 try:
2492 fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2493 except KeyError:
2494 # This should never happen
2495 self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2496 "Node did not return forbidden file storage paths")
2498 self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2499 "Found forbidden file storage paths: %s",
2500 utils.CommaJoin(fspaths))
2501 else:
2502 self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2503 constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2504 "Node should not have returned forbidden file storage"
2507 def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2508 verify_key, error_key):
2509 """Verifies (file) storage paths.
2511 @type ninfo: L{objects.Node}
2512 @param ninfo: the node to check
2513 @param nresult: the remote results for the node
2514 @type file_disk_template: string
2515 @param file_disk_template: file-based disk template, whose directory
2516 is supposed to be verified
2517 @type verify_key: string
2518 @param verify_key: key for the verification map of this file
2519 @type error_key: string
2520 @param error_key: error key to be added to the verification results
2521 in case something goes wrong in this verification step
2524 assert (file_disk_template in
2525 utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2526 cluster = self.cfg.GetClusterInfo()
2527 if cluster.IsDiskTemplateEnabled(file_disk_template):
2528 self._ErrorIf(
2529 verify_key in nresult,
2530 error_key, ninfo.name,
2531 "The configured %s storage path is unusable: %s" %
2532 (file_disk_template, nresult.get(verify_key)))
2534 def _VerifyFileStoragePaths(self, ninfo, nresult):
2535 """Verifies (file) storage paths.
2537 @see: C{_VerifyStoragePaths}
2540 self._VerifyStoragePaths(
2541 ninfo, nresult, constants.DT_FILE,
2542 constants.NV_FILE_STORAGE_PATH,
2543 constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2545 def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2546 """Verifies (file) storage paths.
2548 @see: C{_VerifyStoragePaths}
2551 self._VerifyStoragePaths(
2552 ninfo, nresult, constants.DT_SHARED_FILE,
2553 constants.NV_SHARED_FILE_STORAGE_PATH,
2554 constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2556 def _VerifyOob(self, ninfo, nresult):
2557 """Verifies out of band functionality of a node.
2559 @type ninfo: L{objects.Node}
2560 @param ninfo: the node to check
2561 @param nresult: the remote results for the node
2564 # We just have to verify the paths on master and/or master candidates
2565 # as the oob helper is invoked on the master
2566 if ((ninfo.master_candidate or ninfo.master_capable) and
2567 constants.NV_OOB_PATHS in nresult):
2568 for path_result in nresult[constants.NV_OOB_PATHS]:
2569 self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2570 ninfo.name, path_result)
2572 def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2573 """Verifies and updates the node volume data.
2575 This function will update a L{NodeImage}'s internal structures
2576 with data from the remote call.
2578 @type ninfo: L{objects.Node}
2579 @param ninfo: the node to check
2580 @param nresult: the remote results for the node
2581 @param nimg: the node image object
2582 @param vg_name: the configured VG name
2585 nimg.lvm_fail = True
2586 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2587 if vg_name is None:
2588 pass
2589 elif isinstance(lvdata, basestring):
2590 self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2591 "LVM problem on node: %s", utils.SafeEncode(lvdata))
2592 elif not isinstance(lvdata, dict):
2593 self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2594 "rpc call to node failed (lvlist)")
2596 nimg.volumes = lvdata
2597 nimg.lvm_fail = False
2599 def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2600 """Verifies and updates the node instance list.
2602 If the listing was successful, then updates this node's instance
2603 list. Otherwise, it marks the RPC call as failed for the instance
2606 @type ninfo: L{objects.Node}
2607 @param ninfo: the node to check
2608 @param nresult: the remote results for the node
2609 @param nimg: the node image object
2612 idata = nresult.get(constants.NV_INSTANCELIST, None)
2613 test = not isinstance(idata, list)
2614 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2615 "rpc call to node failed (instancelist): %s",
2616 utils.SafeEncode(str(idata)))
2617 if test:
2618 nimg.hyp_fail = True
2619 else:
2620 nimg.instances = [inst.uuid for (_, inst) in
2621 self.cfg.GetMultiInstanceInfoByName(idata)]
2623 def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2624 """Verifies and computes a node information map
2626 @type ninfo: L{objects.Node}
2627 @param ninfo: the node to check
2628 @param nresult: the remote results for the node
2629 @param nimg: the node image object
2630 @param vg_name: the configured VG name
2633 # try to read free memory (from the hypervisor)
2634 hv_info = nresult.get(constants.NV_HVINFO, None)
2635 test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2636 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2637 "rpc call to node failed (hvinfo)")
2640 nimg.mfree = int(hv_info["memory_free"])
2641 except (ValueError, TypeError):
2642 self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2643 "node returned invalid nodeinfo, check hypervisor")
2645 # FIXME: devise a free space model for file based instances as well
2646 if vg_name is not None:
2647 test = (constants.NV_VGLIST not in nresult or
2648 vg_name not in nresult[constants.NV_VGLIST])
2649 self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2650 "node didn't return data for the volume group '%s'"
2651 " - it is either missing or broken", vg_name)
2654 nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2655 except (ValueError, TypeError):
2656 self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2657 "node returned invalid LVM info, check LVM status")
2659 def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2660 """Gets per-disk status information for all instances.
2662 @type node_uuids: list of strings
2663 @param node_uuids: Node UUIDs
2664 @type node_image: dict of (UUID, L{objects.Node})
2665 @param node_image: Node objects
2666 @type instanceinfo: dict of (UUID, L{objects.Instance})
2667 @param instanceinfo: Instance objects
2668 @rtype: {instance: {node: [(success, payload)]}}
2669 @return: a dictionary of per-instance dictionaries with nodes as
2670 keys and disk information as values; the disk information is a
2671 list of tuples (success, payload)
2674 node_disks = {}
2675 node_disks_dev_inst_only = {}
2676 diskless_instances = set()
2677 diskless = constants.DT_DISKLESS
2679 for nuuid in node_uuids:
2680 node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
2681 node_image[nuuid].sinst))
2682 diskless_instances.update(uuid for uuid in node_inst_uuids
2683 if instanceinfo[uuid].disk_template == diskless)
2684 disks = [(inst_uuid, disk)
2685 for inst_uuid in node_inst_uuids
2686 for disk in instanceinfo[inst_uuid].disks]
2688 if not disks:
2689 # No need to collect data
2690 continue
2692 node_disks[nuuid] = disks
2694 # _AnnotateDiskParams already makes copies of the disks
2695 dev_inst_only = []
2696 for (inst_uuid, dev) in disks:
2697 (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
2698 self.cfg)
2699 dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))
2701 node_disks_dev_inst_only[nuuid] = dev_inst_only
2703 assert len(node_disks) == len(node_disks_dev_inst_only)
2705 # Collect data from all nodes with disks
2706 result = self.rpc.call_blockdev_getmirrorstatus_multi(
2707 node_disks.keys(), node_disks_dev_inst_only)
2709 assert len(result) == len(node_disks)
2711 instdisk = {}
2713 for (nuuid, nres) in result.items():
2714 node = self.cfg.GetNodeInfo(nuuid)
2715 disks = node_disks[node.uuid]
2717 if nres.offline:
2718 # No data from this node
2719 data = len(disks) * [(False, "node offline")]
2720 else:
2721 msg = nres.fail_msg
2722 self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
2723 "while getting disk information: %s", msg)
2725 # No data from this node
2726 data = len(disks) * [(False, msg)]
2727 else:
2728 data = []
2729 for idx, i in enumerate(nres.payload):
2730 if isinstance(i, (tuple, list)) and len(i) == 2:
2731 data.append(i)
2732 else:
2733 logging.warning("Invalid result from node %s, entry %d: %s",
2734 node.name, idx, i)
2735 data.append((False, "Invalid result from the remote node"))
2737 for ((inst_uuid, _), status) in zip(disks, data):
2738 instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
2739 .append(status)
2741 # Add empty entries for diskless instances.
2742 for inst_uuid in diskless_instances:
2743 assert inst_uuid not in instdisk
2744 instdisk[inst_uuid] = {}
2746 assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2747 len(nuuids) <= len(instanceinfo[inst].all_nodes) and
2748 compat.all(isinstance(s, (tuple, list)) and
2749 len(s) == 2 for s in statuses)
2750 for inst, nuuids in instdisk.items()
2751 for nuuid, statuses in nuuids.items())
2753 instdisk_keys = set(instdisk)
2754 instanceinfo_keys = set(instanceinfo)
2755 assert instdisk_keys == instanceinfo_keys, \
2756 ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2757 (instdisk_keys, instanceinfo_keys))
2759 return instdisk
2761 @staticmethod
2762 def _SshNodeSelector(group_uuid, all_nodes):
2763 """Create endless iterators for all potential SSH check hosts.
2766 nodes = [node for node in all_nodes
2767 if (node.group != group_uuid and
2768 not node.offline)]
2769 keyfunc = operator.attrgetter("group")
2771 return map(itertools.cycle,
2772 [sorted(map(operator.attrgetter("name"), names))
2773 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2774 keyfunc)])
2776 @classmethod
2777 def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2778 """Choose which nodes should talk to which other nodes.
2780 We will make nodes contact all nodes in their group, and one node from
2781 every other group.
2783 @warning: This algorithm has a known issue if one node group is much
2784 smaller than others (e.g. just one node). In such a case all other
2785 nodes will talk to the single node.
2788 online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2789 sel = cls._SshNodeSelector(group_uuid, all_nodes)
2791 return (online_nodes,
2792 dict((name, sorted([i.next() for i in sel]))
2793 for name in online_nodes))
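# Illustrative sketch (hypothetical three-group cluster): for a group with
# online nodes "n1" and "n2", the result pairs each node with one peer per
# other group, the per-group iterators cycling independently, e.g.:
#   (["n1", "n2"], {"n1": ["g2-node1", "g3-node1"],
#                   "n2": ["g2-node2", "g3-node2"]})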
2795 def BuildHooksEnv(self):
2798 Cluster-Verify hooks are run in the post phase only; a hook failure is
2799 logged in the verify output and makes the verification fail.
2802 env = {
2803 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2806 env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2807 for node in self.my_node_info.values())
2809 return env
2811 def BuildHooksNodes(self):
2812 """Build hooks nodes.
2815 return ([], list(self.my_node_info.keys()))
2817 def Exec(self, feedback_fn):
2818 """Verify integrity of the node group, performing various test on nodes.
2821 # This method has too many local variables. pylint: disable=R0914
2822 feedback_fn("* Verifying group '%s'" % self.group_info.name)
2824 if not self.my_node_uuids:
2825 # empty node group
2826 feedback_fn("* Empty node group, skipping verification")
2830 verbose = self.op.verbose
2831 self._feedback_fn = feedback_fn
2833 vg_name = self.cfg.GetVGName()
2834 drbd_helper = self.cfg.GetDRBDHelper()
2835 cluster = self.cfg.GetClusterInfo()
2836 hypervisors = cluster.enabled_hypervisors
2837 node_data_list = self.my_node_info.values()
2839 i_non_redundant = [] # Non redundant instances
2840 i_non_a_balanced = [] # Non auto-balanced instances
2841 i_offline = 0 # Count of offline instances
2842 n_offline = 0 # Count of offline nodes
2843 n_drained = 0 # Count of nodes being drained
2844 node_vol_should = {}
2846 # FIXME: verify OS list
2849 filemap = ComputeAncillaryFiles(cluster, False)
2851 # do local checksums
2852 master_node_uuid = self.master_node = self.cfg.GetMasterNode()
2853 master_ip = self.cfg.GetMasterIP()
2855 feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))
2858 if self.cfg.GetUseExternalMipScript():
2859 user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2861 node_verify_param = {
2862 constants.NV_FILELIST:
2863 map(vcluster.MakeVirtualPath,
2864 utils.UniqueSequence(filename
2865 for files in filemap
2866 for filename in files)),
2867 constants.NV_NODELIST:
2868 self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2869 self.all_node_info.values()),
2870 constants.NV_HYPERVISOR: hypervisors,
2871 constants.NV_HVPARAMS:
2872 _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2873 constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2874 for node in node_data_list
2875 if not node.offline],
2876 constants.NV_INSTANCELIST: hypervisors,
2877 constants.NV_VERSION: None,
2878 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2879 constants.NV_NODESETUP: None,
2880 constants.NV_TIME: None,
2881 constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
2882 constants.NV_OSLIST: None,
2883 constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2884 constants.NV_USERSCRIPTS: user_scripts,
2885 }
2887 if vg_name is not None:
2888 node_verify_param[constants.NV_VGLIST] = None
2889 node_verify_param[constants.NV_LVLIST] = vg_name
2890 node_verify_param[constants.NV_PVLIST] = [vg_name]
2892 if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
2893 if drbd_helper:
2894 node_verify_param[constants.NV_DRBDVERSION] = None
2895 node_verify_param[constants.NV_DRBDLIST] = None
2896 node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2898 if cluster.IsFileStorageEnabled() or \
2899 cluster.IsSharedFileStorageEnabled():
2900 # Load file storage paths only from master node
2901 node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
2902 self.cfg.GetMasterNodeName()
2903 if cluster.IsFileStorageEnabled():
2904 node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
2905 cluster.file_storage_dir
2906 if cluster.IsSharedFileStorageEnabled():
2907 node_verify_param[constants.NV_SHARED_FILE_STORAGE_PATH] = cluster.shared_file_storage_dir
2908 # FIXME: this needs to be changed per node-group, not cluster-wide
2909 bridges = set()
2910 default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2911 if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2912 bridges.add(default_nicpp[constants.NIC_LINK])
2913 for instance in self.my_inst_info.values():
2914 for nic in instance.nics:
2915 full_nic = cluster.SimpleFillNIC(nic.nicparams)
2916 if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2917 bridges.add(full_nic[constants.NIC_LINK])
2919 if bridges:
2920 node_verify_param[constants.NV_BRIDGES] = list(bridges)
2922 # Build our expected cluster state
2923 node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
2924 uuid=node.uuid,
2925 vm_capable=node.vm_capable))
2926 for node in node_data_list)
2928 oob_paths = []
2930 for node in self.all_node_info.values():
2931 path = SupportsOob(self.cfg, node)
2932 if path and path not in oob_paths:
2933 oob_paths.append(path)
2935 if oob_paths:
2936 node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2938 for inst_uuid in self.my_inst_uuids:
2939 instance = self.my_inst_info[inst_uuid]
2940 if instance.admin_state == constants.ADMINST_OFFLINE:
2941 i_offline += 1
2943 for nuuid in instance.all_nodes:
2944 if nuuid not in node_image:
2945 gnode = self.NodeImage(uuid=nuuid)
2946 gnode.ghost = (nuuid not in self.all_node_info)
2947 node_image[nuuid] = gnode
2949 instance.MapLVsByNode(node_vol_should)
2951 pnode = instance.primary_node
2952 node_image[pnode].pinst.append(instance.uuid)
2954 for snode in instance.secondary_nodes:
2955 nimg = node_image[snode]
2956 nimg.sinst.append(instance.uuid)
2957 if pnode not in nimg.sbp:
2958 nimg.sbp[pnode] = []
2959 nimg.sbp[pnode].append(instance.uuid)
2961 es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
2962 self.my_node_info.keys())
2963 # The value of exclusive_storage should be the same across the group, so if
2964 # it's True for at least one node, we act as if it were set for all the nodes
2965 self._exclusive_storage = compat.any(es_flags.values())
2966 if self._exclusive_storage:
2967 node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2969 # At this point, we have the in-memory data structures complete,
2970 # except for the runtime information, which we'll gather next
2972 # Due to the way our RPC system works, exact response times cannot be
2973 # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2974 # time before and after executing the request, we can at least have a time
2975 # window.
2976 nvinfo_starttime = time.time()
2977 all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
2978 node_verify_param,
2979 self.cfg.GetClusterName(),
2980 self.cfg.GetClusterInfo().hvparams)
2981 nvinfo_endtime = time.time()
2983 if self.extra_lv_nodes and vg_name is not None:
2984 extra_lv_nvinfo = \
2985 self.rpc.call_node_verify(self.extra_lv_nodes,
2986 {constants.NV_LVLIST: vg_name},
2987 self.cfg.GetClusterName(),
2988 self.cfg.GetClusterInfo().hvparams)
2989 else:
2990 extra_lv_nvinfo = {}
2992 all_drbd_map = self.cfg.ComputeDRBDMap()
2994 feedback_fn("* Gathering disk information (%s nodes)" %
2995 len(self.my_node_uuids))
2996 instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
2997 self.my_inst_info)
2999 feedback_fn("* Verifying configuration file consistency")
3001 # If not all nodes are being checked, we need to make sure the master node
3002 # and a non-checked vm_capable node are in the list.
3003 absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
3004 if absent_node_uuids:
3005 vf_nvinfo = all_nvinfo.copy()
3006 vf_node_info = list(self.my_node_info.values())
3007 additional_node_uuids = []
3008 if master_node_uuid not in self.my_node_info:
3009 additional_node_uuids.append(master_node_uuid)
3010 vf_node_info.append(self.all_node_info[master_node_uuid])
3011 # Add the first vm_capable node we find which is not included,
3012 # excluding the master node (which we already have)
3013 for node_uuid in absent_node_uuids:
3014 nodeinfo = self.all_node_info[node_uuid]
3015 if (nodeinfo.vm_capable and not nodeinfo.offline and
3016 node_uuid != master_node_uuid):
3017 additional_node_uuids.append(node_uuid)
3018 vf_node_info.append(self.all_node_info[node_uuid])
3020 key = constants.NV_FILELIST
3021 vf_nvinfo.update(self.rpc.call_node_verify(
3022 additional_node_uuids, {key: node_verify_param[key]},
3023 self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
3024 else:
3025 vf_nvinfo = all_nvinfo
3026 vf_node_info = self.my_node_info.values()
3028 self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)
3030 feedback_fn("* Verifying node status")
3034 for node_i in node_data_list:
3035 nimg = node_image[node_i.uuid]
3037 if node_i.offline:
3038 if verbose:
3039 feedback_fn("* Skipping offline node %s" % (node_i.name,))
3043 if node_i.uuid == master_node_uuid:
3044 ntype = "master"
3045 elif node_i.master_candidate:
3046 ntype = "master candidate"
3047 elif node_i.drained:
3048 ntype = "drained"
3049 n_drained += 1
3050 else:
3051 ntype = "regular"
3052 if verbose:
3053 feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))
3055 msg = all_nvinfo[node_i.uuid].fail_msg
3056 self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
3057 "while contacting node: %s", msg)
3059 nimg.rpc_fail = True
3060 continue
3062 nresult = all_nvinfo[node_i.uuid].payload
3064 nimg.call_ok = self._VerifyNode(node_i, nresult)
3065 self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
3066 self._VerifyNodeNetwork(node_i, nresult)
3067 self._VerifyNodeUserScripts(node_i, nresult)
3068 self._VerifyOob(node_i, nresult)
3069 self._VerifyAcceptedFileStoragePaths(node_i, nresult,
3070 node_i.uuid == master_node_uuid)
3071 self._VerifyFileStoragePaths(node_i, nresult)
3072 self._VerifySharedFileStoragePaths(node_i, nresult)
3074 if nimg.vm_capable:
3075 self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
3076 self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
3077 all_drbd_map)
3079 self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
3080 self._UpdateNodeInstances(node_i, nresult, nimg)
3081 self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
3082 self._UpdateNodeOS(node_i, nresult, nimg)
3084 if not nimg.os_fail:
3085 if refos_img is None:
3086 refos_img = nimg
3087 self._VerifyNodeOS(node_i, nimg, refos_img)
3088 self._VerifyNodeBridges(node_i, nresult, bridges)
3090 # Check whether all running instances are primary for the node. (This
3091 # can no longer be done from _VerifyInstance below, since some of the
3092 # wrong instances could be from other node groups.)
3093 non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)
3095 for inst_uuid in non_primary_inst_uuids:
3096 test = inst_uuid in self.all_inst_info
3097 self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
3098 self.cfg.GetInstanceName(inst_uuid),
3099 "instance should not run on node %s", node_i.name)
3100 self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
3101 "node is running unknown instance %s", inst_uuid)
3103 self._VerifyGroupDRBDVersion(all_nvinfo)
3104 self._VerifyGroupLVM(node_image, vg_name)
3106 for node_uuid, result in extra_lv_nvinfo.items():
3107 self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
3108 node_image[node_uuid], vg_name)
3110 feedback_fn("* Verifying instance status")
3111 for inst_uuid in self.my_inst_uuids:
3112 instance = self.my_inst_info[inst_uuid]
3113 if verbose:
3114 feedback_fn("* Verifying instance %s" % instance.name)
3115 self._VerifyInstance(instance, node_image, instdisk[inst_uuid])
3117 # If the instance is non-redundant we cannot survive losing its primary
3118 # node, so we are not N+1 compliant.
3119 if instance.disk_template not in constants.DTS_MIRRORED:
3120 i_non_redundant.append(instance)
3122 if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
3123 i_non_a_balanced.append(instance)
3125 feedback_fn("* Verifying orphan volumes")
3126 reserved = utils.FieldSet(*cluster.reserved_lvs)
3128 # We will get spurious "unknown volume" warnings if any node of this group
3129 # is secondary for an instance whose primary is in another group. To avoid
3130 # them, we find these instances and add their volumes to node_vol_should.
3131 for instance in self.all_inst_info.values():
3132 for secondary in instance.secondary_nodes:
3133 if (secondary in self.my_node_info
3134 and instance.uuid not in self.my_inst_info):
3135 instance.MapLVsByNode(node_vol_should)
3136 break
3138 self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
3140 if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
3141 feedback_fn("* Verifying N+1 Memory redundancy")
3142 self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
3144 feedback_fn("* Other Notes")
3146 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
3147 % len(i_non_redundant))
3149 if i_non_a_balanced:
3150 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
3151 % len(i_non_a_balanced))
3153 if i_offline:
3154 feedback_fn(" - NOTICE: %d offline instance(s) found." % i_offline)
3157 feedback_fn(" - NOTICE: %d offline node(s) found." % n_offline)
3160 feedback_fn(" - NOTICE: %d drained node(s) found." % n_drained)
3164 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
3165 """Analyze the post-hooks' result
3167 This method analyses the hook result, handles it, and sends some
3168 nicely-formatted feedback back to the user.
3170 @param phase: one of L{constants.HOOKS_PHASE_POST} or
3171 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
3172 @param hooks_results: the results of the multi-node hooks rpc call
3173 @param feedback_fn: function used to send feedback back to the caller
3174 @param lu_result: previous Exec result
3175 @return: the new Exec result, based on the previous result
3176 and hook results
3179 # We only really run POST phase hooks, only for non-empty groups,
3180 # and are only interested in their results
3181 if not self.my_node_uuids:
3182 # empty node group
3183 pass
3184 elif phase == constants.HOOKS_PHASE_POST:
3185 # Used to change hooks' output to proper indentation
3186 feedback_fn("* Hooks Results")
3187 assert hooks_results, "invalid result from hooks"
3189 for node_name in hooks_results:
3190 res = hooks_results[node_name]
3191 msg = res.fail_msg
3192 test = msg and not res.offline
3193 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3194 "Communication failure in hooks execution: %s", msg)
3195 if res.offline or msg:
3196 # No need to investigate payload if node is offline or gave
3197 # an error.
3198 continue
3199 for script, hkr, output in res.payload:
3200 test = hkr == constants.HKR_FAIL
3201 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3202 "Script %s failed, output:", script)
3204 output = self._HOOKS_INDENT_RE.sub(" ", output)
3205 feedback_fn("%s" % output)
3211 class LUClusterVerifyDisks(NoHooksLU):
3212 """Verifies the cluster disks status.
3217 def ExpandNames(self):
3218 self.share_locks = ShareAll()
3219 self.needed_locks = {
3220 locking.LEVEL_NODEGROUP: locking.ALL_SET,
3221 }
3223 def Exec(self, feedback_fn):
3224 group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
3226 # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
3227 return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
3228 for group in group_names])
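# Illustrative sketch (hypothetical group names): with node groups "default"
# and "storage", Exec() submits one single-opcode job per group, roughly:
#   ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name="default")],
#                   [opcodes.OpGroupVerifyDisks(group_name="storage")]])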