# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Logical units dealing with the cluster."""

import copy
import itertools
import logging
import os
import re

import OpenSSL

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
  CheckIpolicyVsDiskTemplates

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
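
  # Illustrative mapping of the CQ_* request flags handled above; any field
  # that was not requested is returned as NotImplemented:
  #   CQ_CONFIG        -> cluster, nodes
  #   CQ_QUEUE_DRAINED -> drain_flag
  #   CQ_WATCHER_PAUSE -> watcher_pause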


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "vcs_version": constants.VCS_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
      map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.DT_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False
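
  # Sketch of the disk layout this assumes for DRBD8 (illustrative): a DRBD8
  # disk has two children, the data LV (children[0]) and the metadata LV;
  # only the data child has to match the parent disk's size.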

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      if not dskl:
        # no disks on the node
        continue

      newl = [(v[2].Copy(), v[0]) for v in dskl]
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))

    return changed
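
  # The list returned above contains one (instance_name, disk_index, key,
  # new_value) tuple per repaired value, e.g. (illustrative values only)
  # [("inst1.example.com", 0, "size", 10240)].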


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)
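

# Illustrative behaviour, assuming an IPv4 primary family: a CIDR prefix
# length such as 32 passes the check above, while an out-of-range value
# raises errors.OpPrereqError.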


def CheckFileBasedStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
    file_disk_template):
  """Checks whether the given file-based storage directory is acceptable.

  Note: This function is public, because it is also used in bootstrap.py.

  @type logging_warn_fn: function
  @param logging_warn_fn: function which accepts a string and logs it
  @type file_storage_dir: string
  @param file_storage_dir: the directory to be used for file-based instances
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of enabled disk templates
  @type file_disk_template: string
  @param file_disk_template: the file-based disk template for which the
      path should be checked

  """
  assert (file_disk_template in
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_dir is not None:
    if file_storage_dir == "":
      if file_storage_enabled:
        raise errors.OpPrereqError(
            "Unsetting the '%s' storage directory while having '%s' storage"
            " enabled is not permitted." %
            (file_disk_template, file_disk_template))
    else:
      if not file_storage_enabled:
        logging_warn_fn(
            "Specified a %s storage directory, although %s storage is not"
            " enabled." % (file_disk_template, file_disk_template))
  else:
    raise errors.ProgrammerError("Received %s storage dir with value"
                                 " 'None'." % file_disk_template)


def CheckFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_FILE)


def CheckSharedFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given shared file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_SHARED_FILE)


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" %
                                   err, errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def _CheckVgName(self, node_uuids, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the vg name on all nodes and in case it gets
       unset whether there are instances still using it.

    """
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
                                            new_enabled_disk_templates)
    current_vg_name = self.cfg.GetVGName()

    if self.op.vg_name == '':
      if lvm_is_enabled:
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                   " disk templates are or get enabled.")

    if self.op.vg_name is None:
      if current_vg_name is None and lvm_is_enabled:
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and lvm_is_enabled) or \
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
      self._CheckVgNameOnNodes(node_uuids)

  def _CheckVgNameOnNodes(self, node_uuids):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_uuids)
    for node_uuid in node_uuids:
      msg = vglist[node_uuid].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
                                   errors.ECODE_ENVIRON)

  @staticmethod
  def _GetEnabledDiskTemplatesInner(op_enabled_disk_templates,
                                    old_enabled_disk_templates):
    """Determines the enabled disk templates and the subset of disk templates
       that are newly enabled by this operation.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    if op_enabled_disk_templates:
      enabled_disk_templates = op_enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(old_enabled_disk_templates))
    else:
      enabled_disk_templates = old_enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates)
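
  # Illustrative example: with old_enabled_disk_templates == ["plain",
  # "drbd"] and op_enabled_disk_templates == ["drbd", "file"], this returns
  # (["drbd", "file"], ["file"]); only "file" is newly enabled.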

  def _GetEnabledDiskTemplates(self, cluster):
    """Determines the enabled disk templates and the subset of disk templates
       that are newly enabled by this operation.

    """
    return self._GetEnabledDiskTemplatesInner(self.op.enabled_disk_templates,
                                              cluster.enabled_disk_templates)

  def _CheckIpolicy(self, cluster, enabled_disk_templates):
    """Checks the ipolicy.

    @type cluster: C{objects.Cluster}
    @param cluster: the cluster's configuration
    @type enabled_disk_templates: list of string
    @param enabled_disk_templates: list of (possibly newly) enabled disk
      templates

    """
    # FIXME: write unit tests for this
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
                                  enabled_disk_templates)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(nuuid in group.members
                                             for nuuid in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
                                           self.cfg)

        violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))
    else:
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
                                  enabled_disk_templates)

  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
    """Checks whether the set DRBD helper actually exists on the nodes.

    @type drbd_helper: string
    @param drbd_helper: path of the drbd usermode helper binary
    @type node_uuids: list of strings
    @param node_uuids: list of node UUIDs to check for the helper

    """
    # checks given drbd helper on all nodes
    helpers = self.rpc.call_drbd_helper(node_uuids)
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
      if ninfo.offline:
        self.LogInfo("Not checking drbd helper on offline node %s",
                     ninfo.name)
        continue
      msg = helpers[ninfo.uuid].fail_msg
      if msg:
        raise errors.OpPrereqError("Error checking drbd helper on node"
                                   " '%s': %s" % (ninfo.name, msg),
                                   errors.ECODE_ENVIRON)
      node_helper = helpers[ninfo.uuid].payload
      if node_helper != drbd_helper:
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                   (ninfo.name, node_helper),
                                   errors.ECODE_ENVIRON)

  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
    """Check the DRBD usermode helper.

    @type node_uuids: list of strings
    @param node_uuids: a list of nodes' UUIDs
    @type drbd_enabled: boolean
    @param drbd_enabled: whether DRBD will be enabled after this operation
      (no matter if it was disabled before or not)
    @type drbd_gets_enabled: boolean
    @param drbd_gets_enabled: true if DRBD was disabled before this
      operation, but will be enabled afterwards

    """
    if self.op.drbd_helper == '':
      if drbd_enabled:
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " DRBD is enabled.")
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    else:
      if self.op.drbd_helper is not None and drbd_enabled:
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
      else:
        if drbd_gets_enabled:
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
          if current_drbd_helper is not None:
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
          else:
            raise errors.OpPrereqError("Cannot enable DRBD without a"
                                       " DRBD usermode helper set.")

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    self.cluster = cluster = self.cfg.GetClusterInfo()

    vm_capable_node_uuids = [node.uuid
                             for node in self.cfg.GetAllNodesInfo().values()
                             if node.uuid in node_uuids and node.vm_capable]

    (enabled_disk_templates, new_enabled_disk_templates) = \
      self._GetEnabledDiskTemplates(cluster)

    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
                      new_enabled_disk_templates)

    if self.op.file_storage_dir is not None:
      CheckFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)

    if self.op.shared_file_storage_dir is not None:
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.shared_file_storage_dir,
          enabled_disk_templates)

    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)

    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
          constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    self._CheckIpolicy(cluster, enabled_disk_templates)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_uuids, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_uuids, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def _SetVgName(self, feedback_fn):
    """Determines and sets the new volume group name.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")

  def _SetFileStorageDir(self, feedback_fn):
    """Set the file storage directory.

    """
    if self.op.file_storage_dir is not None:
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
        feedback_fn("Global file storage dir already set to value '%s'"
                    % self.cluster.file_storage_dir)
      else:
        self.cluster.file_storage_dir = self.op.file_storage_dir

  def _SetDrbdHelper(self, feedback_fn):
    """Set the DRBD usermode helper.

    """
    if self.op.drbd_helper is not None:
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did not"
                    " enable the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(set(self.op.enabled_disk_templates))

    self._SetVgName(feedback_fn)
    self._SetFileStorageDir(feedback_fn)
    self._SetDrbdHelper(feedback_fn)

    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
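
    # Illustrative call (hypothetical values): helper_os("hidden_os",
    # [(constants.DDM_ADD, "lenny-image")], "hidden") appends "lenny-image"
    # to cluster.hidden_os unless it is already listed.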

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
                 master_params.uuid, master_params.netmask,
                 self.op.master_netmask, master_params.ip,
                 master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)
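
    # Illustrative outcome: for groups ["default", "other"] this submits one
    # OpClusterVerifyConfig job plus one OpClusterVerifyGroup job per group;
    # the negative index produced by depends_fn is a dependency expressed
    # relative to the end of the current job list.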

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors: # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
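      # e.g. "ERROR:ENODEVERSION:node:node1.example.com:<message>"
      # (illustrative sample of the machine-parseable format)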
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
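

# Illustrative return values of _VerifyCertificate: (None, None) for a
# healthy certificate, or (LUClusterVerifyConfig.ETYPE_WARNING, "While
# verifying ...: ...") for one that is close to expiry.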


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data
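

# Sketch of the structure returned above (all names illustrative):
#   [("cluster", "xen-pvm", {...}),
#    ("os debian", "xen-pvm", {...}),
#    ("instance inst1.example.com", "xen-pvm", {...})]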


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                  constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(inst.name for
                         inst in dangling_instances.get(node.uuid, [])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(inst.name for
                                         inst in no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        instance = self.cfg.GetInstanceInfoByName(inst_name)
        if instance.disk_template in constants.DTS_INT_MIRROR:
          nodes.update(instance.secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_node_uuids = set(self.group_info.members)
    group_inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_node_uuids = \
      group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_inst_uuids = \
      group_inst_uuids.difference(
        [self.cfg.GetInstanceInfoByName(name).uuid
         for name in self.owned_locks(locking.LEVEL_INSTANCE)])

    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_inst_uuids:
      raise errors.OpPrereqError(
        "Missing lock for instances: %s" %
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
        errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_uuids = group_node_uuids
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
                             for node_uuid in group_node_uuids)

    self.my_inst_uuids = group_inst_uuids
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
                             for inst_uuid in group_inst_uuids)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nuuid in inst.all_nodes:
          if self.all_node_info[nuuid].group != self.group_uuid:
            extra_lv_nodes.add(nuuid)

    unlocked_lv_nodes = \
      extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, ninfo.name,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s parameter verify failure (source %s): %s",
                      hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True
1807 def _VerifyNodeTime(self, ninfo, nresult,
1808 nvinfo_starttime, nvinfo_endtime):
1809 """Check the node time.
1811 @type ninfo: L{objects.Node}
1812 @param ninfo: the node to check
1813 @param nresult: the remote results for the node
1814 @param nvinfo_starttime: the start time of the RPC call
1815 @param nvinfo_endtime: the end time of the RPC call
1818 ntime = nresult.get(constants.NV_TIME, None)
1820 ntime_merged = utils.MergeTime(ntime)
1821 except (ValueError, TypeError):
1822 self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1823 "Node returned invalid time")
1826 if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1827 ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1828 elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1829 ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1830 else:
1831 ntime_diff = None
1833 self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1834 "Node time diverges by at least %s from master node time",
1837 def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1838 """Check the node LVM results and update info for cross-node checks.
1840 @type ninfo: L{objects.Node}
1841 @param ninfo: the node to check
1842 @param nresult: the remote results for the node
1843 @param vg_name: the configured VG name
1844 @type nimg: L{NodeImage}
1845 @param nimg: node image
1847 """
1848 if vg_name is None:
1849 return
1851 # checks vg existence and size > 20G
1852 vglist = nresult.get(constants.NV_VGLIST, None)
1853 test = not vglist
1854 self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1855 "unable to check volume groups")
1857 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1858 constants.MIN_VG_SIZE)
1859 self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1862 (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1863 for em in errmsgs:
1864 self._Error(constants.CV_ENODELVM, ninfo.name, em)
1865 if pvminmax is not None:
1866 (nimg.pv_min, nimg.pv_max) = pvminmax
1868 def _VerifyGroupDRBDVersion(self, node_verify_infos):
1869 """Check cross-node DRBD version consistency.
1871 @type node_verify_infos: dict
1872 @param node_verify_infos: infos about nodes as returned from the
1873 node verification call
1875 """
1876 node_versions = {}
1877 for node_uuid, ndata in node_verify_infos.items():
1878 nresult = ndata.payload
1879 version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1880 node_versions[node_uuid] = version
1882 if len(set(node_versions.values())) > 1:
1883 for node_uuid, version in sorted(node_versions.items()):
1884 msg = "DRBD version mismatch: %s" % version
1885 self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1886 code=self.ETYPE_WARNING)
1888 def _VerifyGroupLVM(self, node_image, vg_name):
1889 """Check cross-node consistency in LVM.
1891 @type node_image: dict
1892 @param node_image: info about nodes, mapping from node to names to
1893 L{NodeImage} objects
1894 @param vg_name: the configured VG name
1896 """
1897 if vg_name is None:
1898 return
1900 # Only exclusive storage needs these checks
1901 if not self._exclusive_storage:
1902 return
1904 # exclusive_storage wants all PVs to have the same size (approximately),
1905 # if the smallest and the biggest ones are okay, everything is fine.
1906 # pv_min is None iff pv_max is None
1907 vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1908 if not vals:
1909 return
1910 (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1911 (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1912 bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1913 self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1914 "PV sizes differ too much in the group; smallest (%s MB) is"
1915 " on %s, biggest (%s MB) is on %s",
1916 pvmin, self.cfg.GetNodeName(minnode_uuid),
1917 pvmax, self.cfg.GetNodeName(maxnode_uuid))
1919 def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1920 """Check the node bridges.
1922 @type ninfo: L{objects.Node}
1923 @param ninfo: the node to check
1924 @param nresult: the remote results for the node
1925 @param bridges: the expected list of bridges
1927 """
1928 if not bridges:
1929 return
1931 missing = nresult.get(constants.NV_BRIDGES, None)
1932 test = not isinstance(missing, list)
1933 self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1934 "did not return valid bridge information")
1936 self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1937 "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1939 def _VerifyNodeUserScripts(self, ninfo, nresult):
1940 """Check the results of user scripts presence and executability on the node
1942 @type ninfo: L{objects.Node}
1943 @param ninfo: the node to check
1944 @param nresult: the remote results for the node
1946 """
1947 test = constants.NV_USERSCRIPTS not in nresult
1948 self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1949 "did not return user scripts information")
1951 broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1952 if broken_scripts:
1953 self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1954 "user scripts not present or not executable: %s" %
1955 utils.CommaJoin(sorted(broken_scripts)))
1957 def _VerifyNodeNetwork(self, ninfo, nresult):
1958 """Check the node network connectivity results.
1960 @type ninfo: L{objects.Node}
1961 @param ninfo: the node to check
1962 @param nresult: the remote results for the node
1964 """
1965 test = constants.NV_NODELIST not in nresult
1966 self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
1967 "node hasn't returned node ssh connectivity data")
1969 if nresult[constants.NV_NODELIST]:
1970 for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1971 self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
1972 "ssh communication with node '%s': %s", a_node, a_msg)
1974 test = constants.NV_NODENETTEST not in nresult
1975 self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1976 "node hasn't returned node tcp connectivity data")
1978 if nresult[constants.NV_NODENETTEST]:
1979 nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1980 for anode in nlist:
1981 self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
1982 "tcp communication with node '%s': %s",
1983 anode, nresult[constants.NV_NODENETTEST][anode])
1985 test = constants.NV_MASTERIP not in nresult
1986 self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1987 "node hasn't returned node master IP reachability data")
1989 if not nresult[constants.NV_MASTERIP]:
1990 if ninfo.uuid == self.master_node:
1991 msg = "the master node cannot reach the master IP (not configured?)"
1992 else:
1993 msg = "cannot reach the master IP"
1994 self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
1996 def _VerifyInstance(self, instance, node_image, diskstatus):
1997 """Verify an instance.
1999 This function checks to see if the required block devices are
2000 available on the instance's node, and that the nodes are in the correct
2001 state.
2003 """
2004 pnode_uuid = instance.primary_node
2005 pnode_img = node_image[pnode_uuid]
2006 groupinfo = self.cfg.GetAllNodeGroupsInfo()
2008 node_vol_should = {}
2009 instance.MapLVsByNode(node_vol_should)
2011 cluster = self.cfg.GetClusterInfo()
2012 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2013 self.group_info)
2014 err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
2015 self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
2016 utils.CommaJoin(err), code=self.ETYPE_WARNING)
2018 for node_uuid in node_vol_should:
2019 n_img = node_image[node_uuid]
2020 if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2021 # ignore missing volumes on offline or broken nodes
2022 continue
2023 for volume in node_vol_should[node_uuid]:
2024 test = volume not in n_img.volumes
2025 self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
2026 "volume %s missing on node %s", volume,
2027 self.cfg.GetNodeName(node_uuid))
2029 if instance.admin_state == constants.ADMINST_UP:
2030 test = instance.uuid not in pnode_img.instances and not pnode_img.offline
2031 self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
2032 "instance not running on its primary node %s",
2033 self.cfg.GetNodeName(pnode_uuid))
2034 self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
2035 instance.name, "instance is marked as running and lives on"
2036 " offline node %s", self.cfg.GetNodeName(pnode_uuid))
2038 diskdata = [(nname, success, status, idx)
2039 for (nname, disks) in diskstatus.items()
2040 for idx, (success, status) in enumerate(disks)]
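# diskstatus maps each node name to a list of per-disk (success, status)
# pairs; the comprehension above flattens it into one
# (node, success, status, disk_index) tuple per disk for easier checking.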
2042 for nname, success, bdev_status, idx in diskdata:
2043 # the 'ghost node' construction in Exec() ensures that we have a
2044 # node here
2045 snode = node_image[nname]
2046 bad_snode = snode.ghost or snode.offline
2047 self._ErrorIf(instance.disks_active and
2048 not success and not bad_snode,
2049 constants.CV_EINSTANCEFAULTYDISK, instance.name,
2050 "couldn't retrieve status for disk/%s on %s: %s",
2051 idx, self.cfg.GetNodeName(nname), bdev_status)
2053 if instance.disks_active and success and \
2054 (bdev_status.is_degraded or
2055 bdev_status.ldisk_status != constants.LDS_OKAY):
2056 msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2057 if bdev_status.is_degraded:
2058 msg += " is degraded"
2059 if bdev_status.ldisk_status != constants.LDS_OKAY:
2060 msg += "; state is '%s'" % \
2061 constants.LDS_NAMES[bdev_status.ldisk_status]
2063 self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2065 self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2066 constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2067 "instance %s, connection to primary node failed",
2070 self._ErrorIf(len(instance.secondary_nodes) > 1,
2071 constants.CV_EINSTANCELAYOUT, instance.name,
2072 "instance has multiple secondary nodes: %s",
2073 utils.CommaJoin(instance.secondary_nodes),
2074 code=self.ETYPE_WARNING)
2076 es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2077 if any(es_flags.values()):
2078 if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2079 # Disk template not compatible with exclusive_storage: no instance
2080 # node should have the flag set
2081 es_nodes = [n
2082 for (n, es) in es_flags.items()
2083 if es]
2084 self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2085 "instance has template %s, which is not supported on nodes"
2086 " that have exclusive storage set: %s",
2087 instance.disk_template,
2088 utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2089 for (idx, disk) in enumerate(instance.disks):
2090 self._ErrorIf(disk.spindles is None,
2091 constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2092 "number of spindles not configured for disk %s while"
2093 " exclusive storage is enabled, try running"
2094 " gnt-cluster repair-disk-sizes", idx)
2096 if instance.disk_template in constants.DTS_INT_MIRROR:
2097 instance_nodes = utils.NiceSort(instance.all_nodes)
2098 instance_groups = {}
2100 for node_uuid in instance_nodes:
2101 instance_groups.setdefault(self.all_node_info[node_uuid].group,
2102 []).append(node_uuid)
2104 pretty_list = [
2105 "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2106 groupinfo[group].name)
2107 # Sort so that we always list the primary node first.
2108 for group, nodes in sorted(instance_groups.items(),
2109 key=lambda (_, nodes): pnode_uuid in nodes,
2110 reverse=True)]
2112 self._ErrorIf(len(instance_groups) > 1,
2113 constants.CV_EINSTANCESPLITGROUPS,
2114 instance.name, "instance has primary and secondary nodes in"
2115 " different groups: %s", utils.CommaJoin(pretty_list),
2116 code=self.ETYPE_WARNING)
2118 inst_nodes_offline = []
2119 for snode in instance.secondary_nodes:
2120 s_img = node_image[snode]
2121 self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2122 self.cfg.GetNodeName(snode),
2123 "instance %s, connection to secondary node failed",
2127 inst_nodes_offline.append(snode)
2129 # warn that the instance lives on offline nodes
2130 self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2131 instance.name, "instance has offline secondary node(s) %s",
2132 utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2133 # ... or ghost/non-vm_capable nodes
2134 for node_uuid in instance.all_nodes:
2135 self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2136 instance.name, "instance lives on ghost node %s",
2137 self.cfg.GetNodeName(node_uuid))
2138 self._ErrorIf(not node_image[node_uuid].vm_capable,
2139 constants.CV_EINSTANCEBADNODE, instance.name,
2140 "instance lives on non-vm_capable node %s",
2141 self.cfg.GetNodeName(node_uuid))
2143 def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2144 """Verify if there are any unknown volumes in the cluster.
2146 The .os, .swap and backup volumes are ignored. All other volumes are
2147 reported as unknown.
2149 @type reserved: L{ganeti.utils.FieldSet}
2150 @param reserved: a FieldSet of reserved volume names
2152 """
2153 for node_uuid, n_img in node_image.items():
2154 if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2155 self.all_node_info[node_uuid].group != self.group_uuid):
2156 # skip non-healthy nodes
2157 continue
2158 for volume in n_img.volumes:
2159 test = ((node_uuid not in node_vol_should or
2160 volume not in node_vol_should[node_uuid]) and
2161 not reserved.Matches(volume))
2162 self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2163 self.cfg.GetNodeName(node_uuid),
2164 "volume %s is unknown", volume)
2166 def _VerifyNPlusOneMemory(self, node_image, all_insts):
2167 """Verify N+1 Memory Resilience.
2169 Check that if one single node dies we can still start all the
2170 instances it was primary for.
2172 """
2173 cluster_info = self.cfg.GetClusterInfo()
2174 for node_uuid, n_img in node_image.items():
2175 # This code checks that every node which is now listed as
2176 # secondary has enough memory to host all instances it is
2177 # supposed to, should a single other node in the cluster fail.
2178 # FIXME: not ready for failover to an arbitrary node
2179 # FIXME: does not support file-backed instances
2180 # WARNING: we currently take into account down instances as well
2181 # as up ones, considering that even if they're down someone
2182 # might want to start them even in the event of a node failure.
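# Illustrative example (made-up numbers): if node A is primary for
# instances X (BE_MINMEM 512 MiB) and Y (BE_MINMEM 1024 MiB), both with
# auto-balance set and node B as secondary, B shows up in n_img.sbp
# keyed by A, and the loop below requires B to have at least 1536 MiB
# free to absorb a failover of A.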
2183 if n_img.offline or \
2184 self.all_node_info[node_uuid].group != self.group_uuid:
2185 # we're skipping nodes marked offline and nodes in other groups from
2186 # the N+1 warning, since most likely we don't have good memory
2187 # information from them; we already list instances living on such
2188 # nodes, and that's enough warning
2189 continue
2190 #TODO(dynmem): also consider ballooning out other instances
2191 for prinode, inst_uuids in n_img.sbp.items():
2192 needed_mem = 0
2193 for inst_uuid in inst_uuids:
2194 bep = cluster_info.FillBE(all_insts[inst_uuid])
2195 if bep[constants.BE_AUTO_BALANCE]:
2196 needed_mem += bep[constants.BE_MINMEM]
2197 test = n_img.mfree < needed_mem
2198 self._ErrorIf(test, constants.CV_ENODEN1,
2199 self.cfg.GetNodeName(node_uuid),
2200 "not enough memory to accomodate instance failovers"
2201 " should node %s fail (%dMiB needed, %dMiB available)",
2202 self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2204 def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2205 (files_all, files_opt, files_mc, files_vm)):
2206 """Verifies file checksums collected from all nodes.
2208 @param nodes: List of L{objects.Node} objects
2209 @param master_node_uuid: UUID of master node
2210 @param all_nvinfo: RPC results
2212 """
2213 # Define functions determining which nodes to consider for a file
2214 files2nodefn = [
2215 (files_all, None),
2216 (files_mc, lambda node: (node.master_candidate or
2217 node.uuid == master_node_uuid)),
2218 (files_vm, lambda node: node.vm_capable),
2219 ]
2221 # Build mapping from filename to list of nodes which should have the file
2222 nodefiles = {}
2223 for (files, fn) in files2nodefn:
2224 if fn is None:
2225 filenodes = nodes
2226 else:
2227 filenodes = filter(fn, nodes)
2228 nodefiles.update((filename,
2229 frozenset(map(operator.attrgetter("uuid"), filenodes)))
2230 for filename in files)
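# nodefiles now maps every expected filename to the frozenset of UUIDs
# of the nodes that should hold a copy, as selected by the predicates in
# files2nodefn (None meaning "all nodes").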
2232 assert set(nodefiles) == (files_all | files_mc | files_vm)
2234 fileinfo = dict((filename, {}) for filename in nodefiles)
2235 ignore_nodes = set()
2237 for node in nodes:
2238 if node.offline:
2239 ignore_nodes.add(node.uuid)
2240 continue
2242 nresult = all_nvinfo[node.uuid]
2244 if nresult.fail_msg or not nresult.payload:
2245 node_files = None
2246 else:
2247 fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2248 node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2249 for (key, value) in fingerprints.items())
2252 test = not (node_files and isinstance(node_files, dict))
2253 self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2254 "Node did not return file checksum data")
2256 ignore_nodes.add(node.uuid)
2257 continue
2259 # Build per-checksum mapping from filename to nodes having it
2260 for (filename, checksum) in node_files.items():
2261 assert filename in nodefiles
2262 fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2264 for (filename, checksums) in fileinfo.items():
2265 assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2267 # Nodes having the file
2268 with_file = frozenset(node_uuid
2269 for node_uuids in fileinfo[filename].values()
2270 for node_uuid in node_uuids) - ignore_nodes
2272 expected_nodes = nodefiles[filename] - ignore_nodes
2274 # Nodes missing file
2275 missing_file = expected_nodes - with_file
2277 if filename in files_opt:
2278 # All or no nodes
2279 self._ErrorIf(missing_file and missing_file != expected_nodes,
2280 constants.CV_ECLUSTERFILECHECK, None,
2281 "File %s is optional, but it must exist on all or no"
2282 " nodes (not found on %s)",
2283 filename,
2284 utils.CommaJoin(
2285 utils.NiceSort(
2286 map(self.cfg.GetNodeName, missing_file))))
2287 else:
2288 self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2289 "File %s is missing from node(s) %s", filename,
2290 utils.CommaJoin(
2291 utils.NiceSort(
2292 map(self.cfg.GetNodeName, missing_file))))
2294 # Warn if a node has a file it shouldn't
2295 unexpected = with_file - expected_nodes
2296 self._ErrorIf(unexpected,
2297 constants.CV_ECLUSTERFILECHECK, None,
2298 "File %s should not exist on node(s) %s",
2299 filename, utils.CommaJoin(
2300 utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2302 # See if there are multiple versions of the file
2303 test = len(checksums) > 1
2304 if test:
2305 variants = ["variant %s on %s" %
2306 (idx + 1,
2307 utils.CommaJoin(utils.NiceSort(
2308 map(self.cfg.GetNodeName, node_uuids))))
2309 for (idx, (checksum, node_uuids)) in
2310 enumerate(sorted(checksums.items()))]
2311 else:
2312 variants = []
2314 self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2315 "File %s found with %s different checksums (%s)",
2316 filename, len(checksums), "; ".join(variants))
2318 def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
2319 """Verify the drbd helper.
2323 helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2324 test = (helper_result is None)
2325 self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2326 "no drbd usermode helper returned")
2328 status, payload = helper_result
2329 test = not status
2330 self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2331 "drbd usermode helper check unsuccessful: %s", payload)
2332 test = status and (payload != drbd_helper)
2333 self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2334 "wrong drbd usermode helper: %s", payload)
2336 def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2337 drbd_map):
2338 """Verifies the node DRBD status.
2340 @type ninfo: L{objects.Node}
2341 @param ninfo: the node to check
2342 @param nresult: the remote results for the node
2343 @param instanceinfo: the dict of instances
2344 @param drbd_helper: the configured DRBD usermode helper
2345 @param drbd_map: the DRBD map as returned by
2346 L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2348 """
2349 self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)
2351 # compute the DRBD minors
2352 node_drbd = {}
2353 for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2354 test = inst_uuid not in instanceinfo
2355 self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2356 "ghost instance '%s' in temporary DRBD map", inst_uuid)
2357 # ghost instance should not be running, but otherwise we
2358 # don't give double warnings (both ghost instance and
2359 # unallocated minor in use)
2360 if test:
2361 node_drbd[minor] = (inst_uuid, False)
2362 else:
2363 instance = instanceinfo[inst_uuid]
2364 node_drbd[minor] = (inst_uuid, instance.disks_active)
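# node_drbd now maps every DRBD minor reserved on this node to
# (instance UUID, whether the instance's disks should be active); it is
# compared below against the minors the node actually reports in use.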
2366 # and now check them
2367 used_minors = nresult.get(constants.NV_DRBDLIST, [])
2368 test = not isinstance(used_minors, (tuple, list))
2369 self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2370 "cannot parse drbd status file: %s", str(used_minors))
2371 if test:
2372 # we cannot check drbd status
2373 return
2375 for minor, (inst_uuid, must_exist) in node_drbd.items():
2376 test = minor not in used_minors and must_exist
2377 self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2378 "drbd minor %d of instance %s is not active", minor,
2379 self.cfg.GetInstanceName(inst_uuid))
2380 for minor in used_minors:
2381 test = minor not in node_drbd
2382 self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2383 "unallocated drbd minor %d is in use", minor)
2385 def _UpdateNodeOS(self, ninfo, nresult, nimg):
2386 """Builds the node OS structures.
2388 @type ninfo: L{objects.Node}
2389 @param ninfo: the node to check
2390 @param nresult: the remote results for the node
2391 @param nimg: the node image object
2393 """
2394 remote_os = nresult.get(constants.NV_OSLIST, None)
2395 test = (not isinstance(remote_os, list) or
2396 not compat.all(isinstance(v, list) and len(v) == 7
2397 for v in remote_os))
2399 self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2400 "node hasn't returned valid OS data")
2409 for (name, os_path, status, diagnose,
2410 variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2412 if name not in os_dict:
2413 os_dict[name] = []
2415 # parameters is a list of lists instead of list of tuples due to
2416 # JSON lacking a real tuple type, fix it:
2417 parameters = [tuple(v) for v in parameters]
2418 os_dict[name].append((os_path, status, diagnose,
2419 set(variants), set(parameters), set(api_ver)))
2421 nimg.oslist = os_dict
2423 def _VerifyNodeOS(self, ninfo, nimg, base):
2424 """Verifies the node OS list.
2426 @type ninfo: L{objects.Node}
2427 @param ninfo: the node to check
2428 @param nimg: the node image object
2429 @param base: the 'template' node we match against (e.g. from the master)
2431 """
2432 assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2434 beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2435 for os_name, os_data in nimg.oslist.items():
2436 assert os_data, "Empty OS status for OS %s?!" % os_name
2437 f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2438 self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2439 "Invalid OS %s (located at %s): %s",
2440 os_name, f_path, f_diag)
2441 self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2442 "OS '%s' has multiple entries"
2443 " (first one shadows the rest): %s",
2444 os_name, utils.CommaJoin([v[0] for v in os_data]))
2445 # comparisons with the 'base' image
2446 test = os_name not in base.oslist
2447 self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2448 "Extra OS %s not present on reference node (%s)",
2449 os_name, self.cfg.GetNodeName(base.uuid))
2450 if test:
2451 continue
2452 assert base.oslist[os_name], "Base node has empty OS status?"
2453 _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2454 if not b_status:
2455 # base OS is invalid, skipping
2456 continue
2457 for kind, a, b in [("API version", f_api, b_api),
2458 ("variants list", f_var, b_var),
2459 ("parameters", beautify_params(f_param),
2460 beautify_params(b_param))]:
2461 self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2462 "OS %s for %s differs from reference node %s:"
2463 " [%s] vs. [%s]", kind, os_name,
2464 self.cfg.GetNodeName(base.uuid),
2465 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2467 # check any missing OSes
2468 missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2469 self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2470 "OSes present on reference node %s"
2471 " but missing on this node: %s",
2472 self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2474 def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2475 """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2477 @type ninfo: L{objects.Node}
2478 @param ninfo: the node to check
2479 @param nresult: the remote results for the node
2480 @type is_master: bool
2481 @param is_master: Whether node is the master node
2483 """
2484 cluster = self.cfg.GetClusterInfo()
2485 if (is_master and
2486 (cluster.IsFileStorageEnabled() or
2487 cluster.IsSharedFileStorageEnabled())):
2488 try:
2489 fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2490 except KeyError:
2491 # This should never happen
2492 self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2493 "Node did not return forbidden file storage paths")
2494 else:
2495 self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2496 "Found forbidden file storage paths: %s",
2497 utils.CommaJoin(fspaths))
2498 else:
2499 self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2500 constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2501 "Node should not have returned forbidden file storage"
2504 def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2505 verify_key, error_key):
2506 """Verifies (file) storage paths.
2508 @type ninfo: L{objects.Node}
2509 @param ninfo: the node to check
2510 @param nresult: the remote results for the node
2511 @type file_disk_template: string
2512 @param file_disk_template: file-based disk template, whose directory
2513 is supposed to be verified
2514 @type verify_key: string
2515 @param verify_key: key for the verification map of this file
2516 verification step
2517 @param error_key: error key to be added to the verification results
2518 in case something goes wrong in this verification step
2520 """
2521 assert (file_disk_template in
2522 utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2523 cluster = self.cfg.GetClusterInfo()
2524 if cluster.IsDiskTemplateEnabled(file_disk_template):
2525 self._ErrorIf(
2526 verify_key in nresult,
2527 error_key, ninfo.name,
2528 "The configured %s storage path is unusable: %s" %
2529 (file_disk_template, nresult.get(verify_key)))
2531 def _VerifyFileStoragePaths(self, ninfo, nresult):
2532 """Verifies (file) storage paths.
2534 @see: C{_VerifyStoragePaths}
2536 """
2537 self._VerifyStoragePaths(
2538 ninfo, nresult, constants.DT_FILE,
2539 constants.NV_FILE_STORAGE_PATH,
2540 constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2542 def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2543 """Verifies (file) storage paths.
2545 @see: C{_VerifyStoragePaths}
2547 """
2548 self._VerifyStoragePaths(
2549 ninfo, nresult, constants.DT_SHARED_FILE,
2550 constants.NV_SHARED_FILE_STORAGE_PATH,
2551 constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2553 def _VerifyOob(self, ninfo, nresult):
2554 """Verifies out of band functionality of a node.
2556 @type ninfo: L{objects.Node}
2557 @param ninfo: the node to check
2558 @param nresult: the remote results for the node
2560 """
2561 # We just have to verify the paths on master and/or master candidates
2562 # as the oob helper is invoked on the master
2563 if ((ninfo.master_candidate or ninfo.master_capable) and
2564 constants.NV_OOB_PATHS in nresult):
2565 for path_result in nresult[constants.NV_OOB_PATHS]:
2566 self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2567 ninfo.name, path_result)
2569 def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2570 """Verifies and updates the node volume data.
2572 This function will update a L{NodeImage}'s internal structures
2573 with data from the remote call.
2575 @type ninfo: L{objects.Node}
2576 @param ninfo: the node to check
2577 @param nresult: the remote results for the node
2578 @param nimg: the node image object
2579 @param vg_name: the configured VG name
2581 """
2582 nimg.lvm_fail = True
2583 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2584 if vg_name is None:
2585 pass
2586 elif isinstance(lvdata, basestring):
2587 self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2588 "LVM problem on node: %s", utils.SafeEncode(lvdata))
2589 elif not isinstance(lvdata, dict):
2590 self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2591 "rpc call to node failed (lvlist)")
2592 else:
2593 nimg.volumes = lvdata
2594 nimg.lvm_fail = False
2596 def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2597 """Verifies and updates the node instance list.
2599 If the listing was successful, then updates this node's instance
2600 list. Otherwise, it marks the RPC call as failed for the instance
2601 list.
2603 @type ninfo: L{objects.Node}
2604 @param ninfo: the node to check
2605 @param nresult: the remote results for the node
2606 @param nimg: the node image object
2608 """
2609 idata = nresult.get(constants.NV_INSTANCELIST, None)
2610 test = not isinstance(idata, list)
2611 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2612 "rpc call to node failed (instancelist): %s",
2613 utils.SafeEncode(str(idata)))
2614 if test:
2615 nimg.hyp_fail = True
2616 else:
2617 nimg.instances = [inst.uuid for (_, inst) in
2618 self.cfg.GetMultiInstanceInfoByName(idata)]
2620 def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2621 """Verifies and computes a node information map
2623 @type ninfo: L{objects.Node}
2624 @param ninfo: the node to check
2625 @param nresult: the remote results for the node
2626 @param nimg: the node image object
2627 @param vg_name: the configured VG name
2629 """
2630 # try to read free memory (from the hypervisor)
2631 hv_info = nresult.get(constants.NV_HVINFO, None)
2632 test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2633 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2634 "rpc call to node failed (hvinfo)")
2635 if not test:
2636 try:
2637 nimg.mfree = int(hv_info["memory_free"])
2638 except (ValueError, TypeError):
2639 self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2640 "node returned invalid nodeinfo, check hypervisor")
2642 # FIXME: devise a free space model for file based instances as well
2643 if vg_name is not None:
2644 test = (constants.NV_VGLIST not in nresult or
2645 vg_name not in nresult[constants.NV_VGLIST])
2646 self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2647 "node didn't return data for the volume group '%s'"
2648 " - it is either missing or broken", vg_name)
2649 if not test:
2650 try:
2651 nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2652 except (ValueError, TypeError):
2653 self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2654 "node returned invalid LVM info, check LVM status")
2656 def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2657 """Gets per-disk status information for all instances.
2659 @type node_uuids: list of strings
2660 @param node_uuids: Node UUIDs
2661 @type node_image: dict of (UUID, L{objects.Node})
2662 @param node_image: Node objects
2663 @type instanceinfo: dict of (UUID, L{objects.Instance})
2664 @param instanceinfo: Instance objects
2665 @rtype: {instance: {node: [(success, payload)]}}
2666 @return: a dictionary of per-instance dictionaries with nodes as
2667 keys and disk information as values; the disk information is a
2668 list of tuples (success, payload)
2670 """
2671 node_disks = {}
2672 node_disks_dev_inst_only = {}
2673 diskless_instances = set()
2674 diskless = constants.DT_DISKLESS
2676 for nuuid in node_uuids:
2677 node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
2678 node_image[nuuid].sinst))
2679 diskless_instances.update(uuid for uuid in node_inst_uuids
2680 if instanceinfo[uuid].disk_template == diskless)
2681 disks = [(inst_uuid, disk)
2682 for inst_uuid in node_inst_uuids
2683 for disk in instanceinfo[inst_uuid].disks]
2685 if not disks:
2686 # No need to collect data
2687 continue
2689 node_disks[nuuid] = disks
2691 # _AnnotateDiskParams makes already copies of the disks
2692 dev_inst_only = []
2693 for (inst_uuid, dev) in disks:
2694 (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
2695 self.cfg)
2696 dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))
2698 node_disks_dev_inst_only[nuuid] = dev_inst_only
2700 assert len(node_disks) == len(node_disks_dev_inst_only)
2702 # Collect data from all nodes with disks
2703 result = self.rpc.call_blockdev_getmirrorstatus_multi(
2704 node_disks.keys(), node_disks_dev_inst_only)
2706 assert len(result) == len(node_disks)
2708 instdisk = {}
2710 for (nuuid, nres) in result.items():
2711 node = self.cfg.GetNodeInfo(nuuid)
2712 disks = node_disks[node.uuid]
2714 if nres.offline:
2715 # No data from this node
2716 data = len(disks) * [(False, "node offline")]
2717 else:
2718 msg = nres.fail_msg
2719 self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
2720 "while getting disk information: %s", msg)
2721 if msg:
2722 # No data from this node
2723 data = len(disks) * [(False, msg)]
2724 else:
2725 data = []
2726 for idx, i in enumerate(nres.payload):
2727 if isinstance(i, (tuple, list)) and len(i) == 2:
2728 data.append(i)
2729 else:
2730 logging.warning("Invalid result from node %s, entry %d: %s",
2731 node.name, idx, i)
2732 data.append((False, "Invalid result from the remote node"))
2734 for ((inst_uuid, _), status) in zip(disks, data):
2735 instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
2736 .append(status)
2738 # Add empty entries for diskless instances.
2739 for inst_uuid in diskless_instances:
2740 assert inst_uuid not in instdisk
2741 instdisk[inst_uuid] = {}
2743 assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2744 len(nuuids) <= len(instanceinfo[inst].all_nodes) and
2745 compat.all(isinstance(s, (tuple, list)) and
2746 len(s) == 2 for s in statuses)
2747 for inst, nuuids in instdisk.items()
2748 for nuuid, statuses in nuuids.items())
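# Illustrative shape (hypothetical UUIDs): instdisk might look like
# {"inst-uuid": {"node-uuid": [(True, status0), (True, status1)]}},
# i.e. one (success, payload) pair per disk, per node, per instance,
# matching the @rtype documented above.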
2750 instdisk_keys = set(instdisk)
2751 instanceinfo_keys = set(instanceinfo)
2752 assert instdisk_keys == instanceinfo_keys, \
2753 ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2754 (instdisk_keys, instanceinfo_keys))
2756 return instdisk
2758 @staticmethod
2759 def _SshNodeSelector(group_uuid, all_nodes):
2760 """Create endless iterators for all potential SSH check hosts.
2763 nodes = [node for node in all_nodes
2764 if (node.group != group_uuid and
2765 not node.offline)]
2766 keyfunc = operator.attrgetter("group")
2768 return map(itertools.cycle,
2769 [sorted(map(operator.attrgetter("name"), names))
2770 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2771 keyfunc)])
2773 @classmethod
2774 def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2775 """Choose which nodes should talk to which other nodes.
2777 We will make nodes contact all nodes in their group, and one node from
2778 every other group.
2780 @warning: This algorithm has a known issue if one node group is much
2781 smaller than others (e.g. just one node). In such a case all other
2782 nodes will talk to the single node.
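As an illustration (hypothetical names): when verifying a group A with
nodes a1 and a2 next to another group B with nodes b1 and b2, each of
a1 and a2 is told to contact all online nodes of A plus one node cycled
from B's sorted name list (a1 -> b1, a2 -> b2, wrapping around when A
is larger than B).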
2784 """
2785 online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2786 sel = cls._SshNodeSelector(group_uuid, all_nodes)
2788 return (online_nodes,
2789 dict((name, sorted([i.next() for i in sel]))
2790 for name in online_nodes))
2792 def BuildHooksEnv(self):
2793 """Build hooks env.
2795 Cluster-Verify hooks are run only in the post phase; if they fail, their
2796 output is logged in the verify output and the verification fails.
2800 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2803 env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2804 for node in self.my_node_info.values())
2806 return env
2808 def BuildHooksNodes(self):
2809 """Build hooks nodes.
2812 return ([], list(self.my_node_info.keys()))
2814 def Exec(self, feedback_fn):
2815 """Verify integrity of the node group, performing various test on nodes.
2818 # This method has too many local variables. pylint: disable=R0914
2819 feedback_fn("* Verifying group '%s'" % self.group_info.name)
2821 if not self.my_node_uuids:
2822 # empty node group
2823 feedback_fn("* Empty node group, skipping verification")
2824 return True
2826 self.bad = False
2827 verbose = self.op.verbose
2828 self._feedback_fn = feedback_fn
2830 vg_name = self.cfg.GetVGName()
2831 drbd_helper = self.cfg.GetDRBDHelper()
2832 cluster = self.cfg.GetClusterInfo()
2833 hypervisors = cluster.enabled_hypervisors
2834 node_data_list = self.my_node_info.values()
2836 i_non_redundant = [] # Non redundant instances
2837 i_non_a_balanced = [] # Non auto-balanced instances
2838 i_offline = 0 # Count of offline instances
2839 n_offline = 0 # Count of offline nodes
2840 n_drained = 0 # Count of nodes being drained
2841 node_vol_should = {}
2843 # FIXME: verify OS list
2845 # File verification
2846 filemap = ComputeAncillaryFiles(cluster, False)
2848 # do local checksums
2849 master_node_uuid = self.master_node = self.cfg.GetMasterNode()
2850 master_ip = self.cfg.GetMasterIP()
2852 feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))
2854 user_scripts = []
2855 if self.cfg.GetUseExternalMipScript():
2856 user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2858 node_verify_param = {
2859 constants.NV_FILELIST:
2860 map(vcluster.MakeVirtualPath,
2861 utils.UniqueSequence(filename
2862 for files in filemap
2863 for filename in files)),
2864 constants.NV_NODELIST:
2865 self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2866 self.all_node_info.values()),
2867 constants.NV_HYPERVISOR: hypervisors,
2868 constants.NV_HVPARAMS:
2869 _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2870 constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2871 for node in node_data_list
2872 if not node.offline],
2873 constants.NV_INSTANCELIST: hypervisors,
2874 constants.NV_VERSION: None,
2875 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2876 constants.NV_NODESETUP: None,
2877 constants.NV_TIME: None,
2878 constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
2879 constants.NV_OSLIST: None,
2880 constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2881 constants.NV_USERSCRIPTS: user_scripts,
2882 }
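# Every NV_* key in node_verify_param requests one class of check from
# the node verify RPC; the value carries the parameters for that check
# (None where none are needed). The keys added below depend on the
# cluster configuration (LVM, DRBD, file storage, bridges, OOB).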
2884 if vg_name is not None:
2885 node_verify_param[constants.NV_VGLIST] = None
2886 node_verify_param[constants.NV_LVLIST] = vg_name
2887 node_verify_param[constants.NV_PVLIST] = [vg_name]
2889 if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
2890 if drbd_helper:
2891 node_verify_param[constants.NV_DRBDVERSION] = None
2892 node_verify_param[constants.NV_DRBDLIST] = None
2893 node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2895 if cluster.IsFileStorageEnabled() or \
2896 cluster.IsSharedFileStorageEnabled():
2897 # Load file storage paths only from master node
2898 node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
2899 self.cfg.GetMasterNodeName()
2900 if cluster.IsFileStorageEnabled():
2901 node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
2902 cluster.file_storage_dir
2905 # FIXME: this needs to be changed per node-group, not cluster-wide
2906 bridges = set()
2907 default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2908 if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2909 bridges.add(default_nicpp[constants.NIC_LINK])
2910 for instance in self.my_inst_info.values():
2911 for nic in instance.nics:
2912 full_nic = cluster.SimpleFillNIC(nic.nicparams)
2913 if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2914 bridges.add(full_nic[constants.NIC_LINK])
2916 if bridges:
2917 node_verify_param[constants.NV_BRIDGES] = list(bridges)
2919 # Build our expected cluster state
2920 node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
2921 uuid=node.uuid,
2922 vm_capable=node.vm_capable))
2923 for node in node_data_list)
2925 # Gather OOB paths
2926 oob_paths = []
2927 for node in self.all_node_info.values():
2928 path = SupportsOob(self.cfg, node)
2929 if path and path not in oob_paths:
2930 oob_paths.append(path)
2932 if oob_paths:
2933 node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2935 for inst_uuid in self.my_inst_uuids:
2936 instance = self.my_inst_info[inst_uuid]
2937 if instance.admin_state == constants.ADMINST_OFFLINE:
2938 i_offline += 1
2940 for nuuid in instance.all_nodes:
2941 if nuuid not in node_image:
2942 gnode = self.NodeImage(uuid=nuuid)
2943 gnode.ghost = (nuuid not in self.all_node_info)
2944 node_image[nuuid] = gnode
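# Nodes referenced by an instance but missing from the configuration get
# a placeholder "ghost" NodeImage, so per-node lookups below never fail
# and _VerifyInstance can flag such instances explicitly.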
2946 instance.MapLVsByNode(node_vol_should)
2948 pnode = instance.primary_node
2949 node_image[pnode].pinst.append(instance.uuid)
2951 for snode in instance.secondary_nodes:
2952 nimg = node_image[snode]
2953 nimg.sinst.append(instance.uuid)
2954 if pnode not in nimg.sbp:
2955 nimg.sbp[pnode] = []
2956 nimg.sbp[pnode].append(instance.uuid)
2958 es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
2959 self.my_node_info.keys())
2960 # The value of exclusive_storage should be the same across the group, so if
2961 # it's True for at least a node, we act as if it were set for all the nodes
2962 self._exclusive_storage = compat.any(es_flags.values())
2963 if self._exclusive_storage:
2964 node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2966 # At this point, we have the in-memory data structures complete,
2967 # except for the runtime information, which we'll gather next
2969 # Due to the way our RPC system works, exact response times cannot be
2970 # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2971 # time before and after executing the request, we can at least have a time
2972 # window.
2973 nvinfo_starttime = time.time()
2974 all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
2975 node_verify_param,
2976 self.cfg.GetClusterName(),
2977 self.cfg.GetClusterInfo().hvparams)
2978 nvinfo_endtime = time.time()
2980 if self.extra_lv_nodes and vg_name is not None:
2981 extra_lv_nvinfo = \
2982 self.rpc.call_node_verify(self.extra_lv_nodes,
2983 {constants.NV_LVLIST: vg_name},
2984 self.cfg.GetClusterName(),
2985 self.cfg.GetClusterInfo().hvparams)
2986 else:
2987 extra_lv_nvinfo = {}
2989 all_drbd_map = self.cfg.ComputeDRBDMap()
2991 feedback_fn("* Gathering disk information (%s nodes)" %
2992 len(self.my_node_uuids))
2993 instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
2994 self.my_inst_info)
2996 feedback_fn("* Verifying configuration file consistency")
2998 # If not all nodes are being checked, we need to make sure the master node
2999 # and a non-checked vm_capable node are in the list.
3000 absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
3001 if absent_node_uuids:
3002 vf_nvinfo = all_nvinfo.copy()
3003 vf_node_info = list(self.my_node_info.values())
3004 additional_node_uuids = []
3005 if master_node_uuid not in self.my_node_info:
3006 additional_node_uuids.append(master_node_uuid)
3007 vf_node_info.append(self.all_node_info[master_node_uuid])
3008 # Add the first vm_capable node we find which is not included,
3009 # excluding the master node (which we already have)
3010 for node_uuid in absent_node_uuids:
3011 nodeinfo = self.all_node_info[node_uuid]
3012 if (nodeinfo.vm_capable and not nodeinfo.offline and
3013 node_uuid != master_node_uuid):
3014 additional_node_uuids.append(node_uuid)
3015 vf_node_info.append(self.all_node_info[node_uuid])
3016 break
3017 key = constants.NV_FILELIST
3018 vf_nvinfo.update(self.rpc.call_node_verify(
3019 additional_node_uuids, {key: node_verify_param[key]},
3020 self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
3021 else:
3022 vf_nvinfo = all_nvinfo
3023 vf_node_info = self.my_node_info.values()
3025 self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)
3027 feedback_fn("* Verifying node status")
3029 refos_img = None
3031 for node_i in node_data_list:
3032 nimg = node_image[node_i.uuid]
3034 if node_i.offline:
3035 if verbose:
3036 feedback_fn("* Skipping offline node %s" % (node_i.name,))
3037 n_offline += 1
3038 continue
3040 if node_i.uuid == master_node_uuid:
3041 ntype = "master"
3042 elif node_i.master_candidate:
3043 ntype = "master candidate"
3044 elif node_i.drained:
3045 ntype = "drained"
3046 n_drained += 1
3047 else:
3048 ntype = "regular"
3049 if verbose:
3050 feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))
3052 msg = all_nvinfo[node_i.uuid].fail_msg
3053 self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
3054 "while contacting node: %s", msg)
3056 nimg.rpc_fail = True
3057 continue
3059 nresult = all_nvinfo[node_i.uuid].payload
3061 nimg.call_ok = self._VerifyNode(node_i, nresult)
3062 self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
3063 self._VerifyNodeNetwork(node_i, nresult)
3064 self._VerifyNodeUserScripts(node_i, nresult)
3065 self._VerifyOob(node_i, nresult)
3066 self._VerifyAcceptedFileStoragePaths(node_i, nresult,
3067 node_i.uuid == master_node_uuid)
3068 self._VerifyFileStoragePaths(node_i, nresult)
3069 self._VerifySharedFileStoragePaths(node_i, nresult)
3071 if nimg.vm_capable:
3072 self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
3073 self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
3074 all_drbd_map)
3076 self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
3077 self._UpdateNodeInstances(node_i, nresult, nimg)
3078 self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
3079 self._UpdateNodeOS(node_i, nresult, nimg)
3081 if not nimg.os_fail:
3082 if refos_img is None:
3083 refos_img = nimg
3084 self._VerifyNodeOS(node_i, nimg, refos_img)
3085 self._VerifyNodeBridges(node_i, nresult, bridges)
3087 # Check whether all running instances are primary for the node. (This
3088 # can no longer be done from _VerifyInstance below, since some of the
3089 # wrong instances could be from other node groups.)
3090 non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)
3092 for inst_uuid in non_primary_inst_uuids:
3093 test = inst_uuid in self.all_inst_info
3094 self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
3095 self.cfg.GetInstanceName(inst_uuid),
3096 "instance should not run on node %s", node_i.name)
3097 self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
3098 "node is running unknown instance %s", inst_uuid)
3100 self._VerifyGroupDRBDVersion(all_nvinfo)
3101 self._VerifyGroupLVM(node_image, vg_name)
3103 for node_uuid, result in extra_lv_nvinfo.items():
3104 self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
3105 node_image[node_uuid], vg_name)
3107 feedback_fn("* Verifying instance status")
3108 for inst_uuid in self.my_inst_uuids:
3109 instance = self.my_inst_info[inst_uuid]
3110 if verbose:
3111 feedback_fn("* Verifying instance %s" % instance.name)
3112 self._VerifyInstance(instance, node_image, instdisk[inst_uuid])
3114 # If the instance is non-redundant we cannot survive losing its primary
3115 # node, so we are not N+1 compliant.
3116 if instance.disk_template not in constants.DTS_MIRRORED:
3117 i_non_redundant.append(instance)
3119 if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
3120 i_non_a_balanced.append(instance)
3122 feedback_fn("* Verifying orphan volumes")
3123 reserved = utils.FieldSet(*cluster.reserved_lvs)
3125 # We will get spurious "unknown volume" warnings if any node of this group
3126 # is secondary for an instance whose primary is in another group. To avoid
3127 # them, we find these instances and add their volumes to node_vol_should.
3128 for instance in self.all_inst_info.values():
3129 for secondary in instance.secondary_nodes:
3130 if (secondary in self.my_node_info
3131 and instance.name not in self.my_inst_info):
3132 instance.MapLVsByNode(node_vol_should)
3133 break
3135 self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
3137 if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
3138 feedback_fn("* Verifying N+1 Memory redundancy")
3139 self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
3141 feedback_fn("* Other Notes")
3142 if i_non_redundant:
3143 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
3144 % len(i_non_redundant))
3146 if i_non_a_balanced:
3147 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
3148 % len(i_non_a_balanced))
3150 if i_offline:
3151 feedback_fn(" - NOTICE: %d offline instance(s) found." % i_offline)
3153 if n_offline:
3154 feedback_fn(" - NOTICE: %d offline node(s) found." % n_offline)
3156 if n_drained:
3157 feedback_fn(" - NOTICE: %d drained node(s) found." % n_drained)
3159 return not self.bad
3161 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
3162 """Analyze the post-hooks' result
3164 This method analyses the hook result, handles it, and sends some
3165 nicely-formatted feedback back to the user.
3167 @param phase: one of L{constants.HOOKS_PHASE_POST} or
3168 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
3169 @param hooks_results: the results of the multi-node hooks rpc call
3170 @param feedback_fn: function used send feedback back to the caller
3171 @param lu_result: previous Exec result
3172 @return: the new Exec result, based on the previous result
3173 and hooks results
3175 """
3176 # We only really run POST phase hooks, only for non-empty groups,
3177 # and are only interested in their results
3178 if not self.my_node_uuids:
3179 # empty node group
3180 pass
3181 elif phase == constants.HOOKS_PHASE_POST:
3182 # Used to change hooks' output to proper indentation
3183 feedback_fn("* Hooks Results")
3184 assert hooks_results, "invalid result from hooks"
3186 for node_name in hooks_results:
3187 res = hooks_results[node_name]
3188 msg = res.fail_msg
3189 test = msg and not res.offline
3190 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3191 "Communication failure in hooks execution: %s", msg)
3192 if res.offline or msg:
3193 # No need to investigate payload if node is offline or gave
3194 # an error message
3195 continue
3196 for script, hkr, output in res.payload:
3197 test = hkr == constants.HKR_FAIL
3198 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3199 "Script %s failed, output:", script)
3201 output = self._HOOKS_INDENT_RE.sub(" ", output)
3202 feedback_fn("%s" % output)
3203 lu_result = False
3205 return lu_result
3208 class LUClusterVerifyDisks(NoHooksLU):
3209 """Verifies the cluster disks status.
3214 def ExpandNames(self):
3215 self.share_locks = ShareAll()
3216 self.needed_locks = {
3217 locking.LEVEL_NODEGROUP: locking.ALL_SET,
3218 }
3220 def Exec(self, feedback_fn):
3221 group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
3223 # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
3224 return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
3225 for group in group_names])