# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Logical units dealing with the cluster."""
34 from ganeti import compat
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import masterd
40 from ganeti import netutils
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 from ganeti import query
45 from ganeti import rpc
46 from ganeti import runtime
47 from ganeti import ssh
48 from ganeti import uidpool
49 from ganeti import utils
50 from ganeti import vcluster
52 from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
54 from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55 ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56 GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57 GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58 CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59 ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
60 CheckIpolicyVsDiskTemplates
62 import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)

    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)

    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
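

# The tuple built above becomes a query.ClusterQueryData object; fields that
# were not requested stay NotImplemented, which lets the query layer tell
# "not fetched" apart from a legitimate None value. Mapping, as used above:
#
#   CQ_CONFIG        -> the cluster object and the full node dict
#   CQ_QUEUE_DRAINED -> whether pathutils.JOB_QUEUE_DRAIN_FILE exists
#   CQ_WATCHER_PAUSE -> payload of the get_watcher_pause RPC on the master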


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "vcs_version": constants.VCS_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
      map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False
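
  # A sketch of the intended behaviour (illustrative, not executed): for a
  # DRBD8 disk whose first child was recorded smaller than the parent,
  # _EnsureChildSizes grows the child in place and returns True so that
  # Exec() knows the configuration must be written back:
  #
  #   child size 1000 < parent size 1024  ->  child.size becomes 1024,
  #   the method returns True, and Exec() calls self.cfg.Update(...)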

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node_uuid)
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))

    return changed


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)
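

# Behaviour sketch (values illustrative; assuming the master netmask is a
# CIDR prefix length, as the @type annotation above says): on an IPv4
# cluster _ValidateNetmask(cfg, 24) passes silently, while
# _ValidateNetmask(cfg, 33) raises OpPrereqError with ECODE_INVAL because
# IPv4 prefixes cannot exceed 32.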


def CheckFileBasedStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
    file_disk_template):
  """Checks whether the given file-based storage directory is acceptable.

  Note: This function is public, because it is also used in bootstrap.py.

  @type logging_warn_fn: function
  @param logging_warn_fn: function which accepts a string and logs it
  @type file_storage_dir: string
  @param file_storage_dir: the directory to be used for file-based instances
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of enabled disk templates
  @type file_disk_template: string
  @param file_disk_template: the file-based disk template for which the
      path should be checked

  """
  assert (file_disk_template in
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_dir is not None:
    if file_storage_dir == "":
      if file_storage_enabled:
        raise errors.OpPrereqError(
            "Unsetting the '%s' storage directory while having '%s' storage"
            " enabled is not permitted." %
            (file_disk_template, file_disk_template))
    else:
      if not file_storage_enabled:
        logging_warn_fn(
            "Specified a %s storage directory, although %s storage is not"
            " enabled." % (file_disk_template, file_disk_template))
  else:
    raise errors.ProgrammerError("Received %s storage dir with value"
                                 " 'None'." % file_disk_template)


def CheckFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_FILE)


def CheckSharedFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given shared file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_SHARED_FILE)
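

# Usage sketch: both wrappers are called the same way, from bootstrap.py at
# cluster-init time and from LUClusterSetParams.CheckPrereq below, e.g.:
#
#   CheckFileStoragePathVsEnabledDiskTemplates(
#     self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
#
# An empty string means "unset the directory" (an error if the template is
# enabled), a non-empty path merely warns if the template is disabled, and
# None is a programmer error since the opcode should never pass it.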


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters.

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s"
                                   % err, errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def _CheckVgName(self, node_uuids, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the vg name on all nodes and in case it gets
       unset whether there are instances still using it.

    """
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
                                            new_enabled_disk_templates)
    current_vg_name = self.cfg.GetVGName()

    if self.op.vg_name == '':
      if lvm_is_enabled:
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                   " disk templates are or get enabled.")

    if self.op.vg_name is None:
      if current_vg_name is None and lvm_is_enabled:
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and lvm_is_enabled) or \
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
      self._CheckVgNameOnNodes(node_uuids)

  def _CheckVgNameOnNodes(self, node_uuids):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_uuids)
    for node_uuid in node_uuids:
      msg = vglist[node_uuid].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
                                   errors.ECODE_ENVIRON)

  @staticmethod
  def _GetEnabledDiskTemplatesInner(op_enabled_disk_templates,
                                    old_enabled_disk_templates):
    """Determines the enabled disk templates and the subset of disk templates
       that are newly enabled by this operation.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    if op_enabled_disk_templates:
      enabled_disk_templates = op_enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(old_enabled_disk_templates))
    else:
      enabled_disk_templates = old_enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates)
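
  # Illustrative example (not executed): with ["drbd", "plain"] currently
  # enabled and an opcode requesting ["plain", "file"], this returns
  # (["plain", "file"], ["file"]): the full new list plus the subset that
  # this operation newly enables.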

  def _GetEnabledDiskTemplates(self, cluster):
    """Determines the enabled disk templates and the subset of disk templates
       that are newly enabled by this operation.

    """
    return self._GetEnabledDiskTemplatesInner(self.op.enabled_disk_templates,
                                              cluster.enabled_disk_templates)

  def _CheckIpolicy(self, cluster, enabled_disk_templates):
    """Checks the ipolicy.

    @type cluster: C{objects.Cluster}
    @param cluster: the cluster's configuration
    @type enabled_disk_templates: list of string
    @param enabled_disk_templates: list of (possibly newly) enabled disk
      templates

    """
    # FIXME: write unit tests for this
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
                                  enabled_disk_templates)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(nuuid in group.members
                                             for nuuid in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
                                           self.cfg)
        violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))
    else:
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
                                  enabled_disk_templates)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    self.cluster = cluster = self.cfg.GetClusterInfo()

    vm_capable_node_uuids = [node.uuid
                             for node in self.cfg.GetAllNodesInfo().values()
                             if node.uuid in node_uuids and node.vm_capable]

    (enabled_disk_templates, new_enabled_disk_templates) = \
      self._GetEnabledDiskTemplates(cluster)

    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
                      new_enabled_disk_templates)

    if self.op.file_storage_dir is not None:
      CheckFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)

    if self.op.shared_file_storage_dir is not None:
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.shared_file_storage_dir,
          enabled_disk_templates)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_uuids)
      for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s",
                       ninfo.name)
          continue
        msg = helpers[ninfo.uuid].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (ninfo.name, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[ninfo.uuid].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (ninfo.name, node_helper),
                                     errors.ECODE_ENVIRON)

    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
          constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    self._CheckIpolicy(cluster, enabled_disk_templates)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_uuids, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_uuids, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def _SetVgName(self, feedback_fn):
    """Determines and sets the new volume group name.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")

  def _SetFileStorageDir(self, feedback_fn):
    """Set the file storage directory.

    """
    if self.op.file_storage_dir is not None:
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
        feedback_fn("Global file storage dir already set to value '%s'"
                    % self.cluster.file_storage_dir)
      else:
        self.cluster.file_storage_dir = self.op.file_storage_dir

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(set(self.op.enabled_disk_templates))

    self._SetVgName(feedback_fn)
    self._SetFileStorageDir(feedback_fn)

    if self.op.drbd_helper is not None:
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did not"
                    " enable the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
        master_params.uuid, master_params.netmask,
        self.op.master_netmask, master_params.ip,
        master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors: # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn(" - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
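

# Return convention, as used by LUClusterVerifyConfig.Exec below: the first
# element of the returned pair is None for a healthy certificate,
# ETYPE_WARNING when the certificate is close to expiring, and ETYPE_ERROR
# for unloadable or expired certificates; the second element is a
# human-readable message (or None) that is fed straight into _ErrorIf.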


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
           apply to, and the origin (can be "cluster", "os X", or
           "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data
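

# Illustrative shape of the returned list (all names are examples only):
#
#   [("cluster", "xen-pvm", {... cluster-level defaults ...}),
#    ("os debian-image", "xen-pvm", {... defaults + OS overrides ...}),
#    ("instance web1", "xen-pvm", {... fully filled instance params ...})]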


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (hv_name, item))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                  constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.
    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(inst.name for
                         inst in dangling_instances.get(node.uuid, [])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(inst.name for
                                         inst in no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        instance = self.cfg.GetInstanceInfoByName(inst_name)
        if instance.disk_template in constants.DTS_INT_MIRROR:
          nodes.update(instance.secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_node_uuids = set(self.group_info.members)
    group_inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_node_uuids = \
      group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_inst_uuids = \
      group_inst_uuids.difference(
        [self.cfg.GetInstanceInfoByName(name).uuid
         for name in self.owned_locks(locking.LEVEL_INSTANCE)])

    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_inst_uuids:
      raise errors.OpPrereqError(
        "Missing lock for instances: %s" %
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
        errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_uuids = group_node_uuids
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
                             for node_uuid in group_node_uuids)

    self.my_inst_uuids = group_inst_uuids
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
                             for inst_uuid in group_inst_uuids)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nuuid in inst.all_nodes:
          if self.all_node_info[nuuid].group != self.group_uuid:
            extra_lv_nodes.add(nuuid)

    unlocked_lv_nodes = \
      extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, ninfo.name,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s parameter verify failure (source %s): %s",
                      hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
                    "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
                  "Node time diverges by at least %s from master node time",
                  ntime_diff)

  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                  "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupDRBDVersion(self, node_verify_infos):
    """Check cross-node DRBD version consistency.

    @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node_verify RPC call

    """
    node_versions = {}
    for node_uuid, ndata in node_verify_infos.items():
      nresult = ndata.payload
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
      node_versions[node_uuid] = version

    if len(set(node_versions.values())) > 1:
      for node_uuid, version in sorted(node_versions.items()):
        msg = "DRBD version mismatch: %s" % version
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
                    code=self.ETYPE_WARNING)
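
  # Illustration (hypothetical payloads): with node_versions of
  # {"n1": "8.4.11", "n2": "8.4.11"} the value set has one element and
  # nothing is reported; {"n1": "8.4.11", "n2": "8.9.0"} warns on every
  # node, and a node that omitted NV_DRBDVERSION contributes the string
  # "Missing DRBD version", which triggers the same mismatch path.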

  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping from node to names to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of check
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return

    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))
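
  # Illustration (invented sizes; the exact tolerance is defined by
  # utils.LvmExclusiveTestBadPvSizes): PVs of 10240 MB and 10260 MB across
  # the group are a small enough spread to pass, whereas 10240 MB against
  # 20480 MB would raise CV_EGROUPDIFFERENTPVSIZE, naming the nodes holding
  # the smallest and biggest PV.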

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "did not return valid bridge information")
    if not test:
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the presence and executability of user scripts on the node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = constants.NV_USERSCRIPTS not in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = constants.NV_NODELIST not in nresult
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
                  "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
                        "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
                        "tcp communication with node '%s': %s",
                        anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if ninfo.uuid == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)

  def _VerifyInstance(self, instance, node_image, diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the nodes are in the correct
    state.

    """
    pnode_uuid = instance.primary_node
    pnode_img = node_image[pnode_uuid]
    groupinfo = self.cfg.GetAllNodeGroupsInfo()

    node_vol_should = {}
    instance.MapLVsByNode(node_vol_should)

    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            self.group_info)
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)

    for node_uuid in node_vol_should:
      n_img = node_image[node_uuid]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node_uuid]:
        test = volume not in n_img.volumes
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
                      "volume %s missing on node %s", volume,
                      self.cfg.GetNodeName(node_uuid))

    if instance.admin_state == constants.ADMINST_UP:
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
                    "instance not running on its primary node %s",
                    self.cfg.GetNodeName(pnode_uuid))
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance is marked as running and lives on"
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # known node object
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      self._ErrorIf(instance.disks_active and
                    not success and not bad_snode,
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
                    "couldn't retrieve status for disk/%s on %s: %s",
                    idx, self.cfg.GetNodeName(nname), bdev_status)

      if instance.disks_active and success and \
         (bdev_status.is_degraded or
          bdev_status.ldisk_status != constants.LDS_OKAY):
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
        if bdev_status.is_degraded:
          msg += " is degraded"
        if bdev_status.ldisk_status != constants.LDS_OKAY:
          msg += "; state is '%s'" % \
                 constants.LDS_NAMES[bdev_status.ldisk_status]

        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)

    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
                  "instance %s, connection to primary node failed",
                  instance.name)

    self._ErrorIf(len(instance.secondary_nodes) > 1,
                  constants.CV_EINSTANCELAYOUT, instance.name,
                  "instance has multiple secondary nodes: %s",
                  utils.CommaJoin(instance.secondary_nodes),
                  code=self.ETYPE_WARNING)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
    if any(es_flags.values()):
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
        # Disk template not compatible with exclusive_storage: no instance
        # node should have the flag set
        es_nodes = [n
                    for (n, es) in es_flags.items()
                    if es]
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
                    "instance has template %s, which is not supported on nodes"
                    " that have exclusive storage set: %s",
                    instance.disk_template,
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
      for (idx, disk) in enumerate(instance.disks):
        self._ErrorIf(disk.spindles is None,
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
                      "number of spindles not configured for disk %s while"
                      " exclusive storage is enabled, try running"
                      " gnt-cluster repair-disk-sizes", idx)

    if instance.disk_template in constants.DTS_INT_MIRROR:
      instance_nodes = utils.NiceSort(instance.all_nodes)
      instance_groups = {}

      for node_uuid in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
                                   []).append(node_uuid)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
                           groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode_uuid in nodes,
                                   reverse=True)]

      self._ErrorIf(len(instance_groups) > 1,
                    constants.CV_EINSTANCESPLITGROUPS,
                    instance.name, "instance has primary and secondary nodes in"
                    " different groups: %s", utils.CommaJoin(pretty_list),
                    code=self.ETYPE_WARNING)

    inst_nodes_offline = []
    for snode in instance.secondary_nodes:
      s_img = node_image[snode]
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
                    self.cfg.GetNodeName(snode),
                    "instance %s, connection to secondary node failed",
                    instance.name)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
                  instance.name, "instance has offline secondary node(s) %s",
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
    # ... or ghost/non-vm_capable nodes
    for node_uuid in instance.all_nodes:
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance lives on ghost node %s",
                    self.cfg.GetNodeName(node_uuid))
      self._ErrorIf(not node_image[node_uuid].vm_capable,
                    constants.CV_EINSTANCEBADNODE, instance.name,
                    "instance lives on non-vm_capable node %s",
                    self.cfg.GetNodeName(node_uuid))
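
  # Illustration of the split-group check above (made-up names): for a DRBD
  # instance with its primary node in group "g1" and its secondary in "g2",
  # instance_groups comes out as {"g1": ["node-a"], "g2": ["node-b"]}, so
  # len(instance_groups) > 1 holds and the CV_EINSTANCESPLITGROUPS warning
  # is emitted with the primary node's group listed first.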

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node_uuid, n_img in node_image.items():
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
          self.all_node_info[node_uuid].group != self.group_uuid):
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node_uuid not in node_vol_should or
                 volume not in node_vol_should[node_uuid]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
                      self.cfg.GetNodeName(node_uuid),
                      "volume %s is unknown", volume)
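
  # Illustration (hypothetical names; FieldSet matches anchored regular
  # expressions): with cluster.reserved_lvs = ["xenvg/.*\\.backup"], a
  # volume "xenvg/inst1.backup" is skipped by reserved.Matches(), while an
  # LV absent from node_vol_should, e.g. "xenvg/stray-lv", is reported as
  # unknown.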

  def _VerifyNPlusOneMemory(self, node_image, all_insts):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node_uuid, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline or \
         self.all_node_info[node_uuid].group != self.group_uuid:
        # we're skipping nodes marked offline and nodes in other groups from
        # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      #TODO(dynmem): also consider ballooning out other instances
      for prinode, inst_uuids in n_img.sbp.items():
        needed_mem = 0
        for inst_uuid in inst_uuids:
          bep = cluster_info.FillBE(all_insts[inst_uuid])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MINMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1,
                      self.cfg.GetNodeName(node_uuid),
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
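
  # Worked example (invented numbers): if node B is secondary for two
  # auto-balanced instances whose primary is node A, with BE_MINMEM of 1024
  # and 2048 MiB, then needed_mem for the (A, B) pair is 3072 MiB; a node B
  # reporting mfree of 2048 MiB would be flagged CV_ENODEN1 for failovers
  # from A.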

  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param nodes: List of L{objects.Node} objects
    @param master_node_uuid: UUID of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.uuid == master_node_uuid)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodes
      else:
        filenodes = filter(fn, nodes)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodes:
      if node.offline:
        ignore_nodes.add(node.uuid)
        continue

      nresult = all_nvinfo[node.uuid]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
                    "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.uuid)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_uuid
                            for node_uuids in fileinfo[filename].values()
                            for node_uuid in node_uuids) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        self._ErrorIf(missing_file and missing_file != expected_nodes,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is optional, but it must exist on all or no"
                      " nodes (not found on %s)",
                      filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))
      else:
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is missing from node(s) %s", filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        self._ErrorIf(unexpected,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s should not exist on node(s) %s",
                      filename, utils.CommaJoin(
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1,
                     utils.CommaJoin(utils.NiceSort(
                       map(self.cfg.GetNodeName, node_uuids))))
                    for (idx, (checksum, node_uuids)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
                    "File %s found with %s different checksums (%s)",
                    filename, len(checksums), "; ".join(variants))
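
  # Shape sketch (hypothetical checksums): after the collection loop,
  # fileinfo maps filename -> {checksum: set(node_uuid)}, e.g.
  #
  #   {"/var/lib/ganeti/known_hosts": {"9f86d08...": set(["n1", "n2"]),
  #                                    "60303ae...": set(["n3"])}}
  #
  # Two checksum keys for one filename is exactly the "different checksums"
  # case reported above.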

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
      L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                    "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "wrong drbd usermode helper: %s", payload)

    # compute the DRBD minors
    node_drbd = {}
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
      test = inst_uuid not in instanceinfo
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
      if test:
        node_drbd[minor] = (inst_uuid, False)
      else:
        instance = instanceinfo[inst_uuid]
        node_drbd[minor] = (inst_uuid, instance.disks_active)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                  "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (inst_uuid, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "drbd minor %d of instance %s is not active", minor,
                    self.cfg.GetInstanceName(inst_uuid))
    for minor in used_minors:
      test = minor not in node_drbd
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "unallocated drbd minor %d is in use", minor)
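
  # Illustration (invented IDs): drbd_map[node_uuid] maps minor numbers to
  # instance UUIDs, e.g. {0: "uuid-a", 1: "uuid-b"}; node_drbd then pairs
  # each minor with whether that instance's disks should be active, and the
  # node's used_minors list is checked in both directions:
  # configured-but-inactive and in-use-but-unallocated.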

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                  "node hasn't returned valid OS data")

    nimg.os_fail = test
    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict
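
  # Shape sketch (hypothetical OS): nimg.oslist maps an OS name to a list of
  # (path, status, diagnose, variants, parameters, api_versions) tuples,
  # e.g.
  #
  #   {"debootstrap": [("/srv/ganeti/os/debootstrap", True, "",
  #                     set(["default"]), set(), set([20]))]}
  #
  # More than one tuple for the same name means the OS was found in several
  # search paths; _VerifyNodeOS below flags that as shadowing.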

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
                    "Invalid OS %s (located at %s): %s",
                    os_name, f_path, f_diag)
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
                    "OS '%s' has multiple entries"
                    " (first one shadows the rest): %s",
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                    "Extra OS %s not present on reference node (%s)",
                    os_name, self.cfg.GetNodeName(base.uuid))
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
                      "OS %s for %s differs from reference node %s:"
                      " [%s] vs. [%s]", kind, os_name,
                      self.cfg.GetNodeName(base.uuid),
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
                  "OSes present on reference node %s"
                  " but missing on this node: %s",
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))

  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    cluster = self.cfg.GetClusterInfo()
    if (is_master and
        (cluster.IsFileStorageEnabled() or
         cluster.IsSharedFileStorageEnabled())):
      try:
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
                          verify_key, error_key):
    """Verifies (file) storage paths.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type file_disk_template: string
    @param file_disk_template: file-based disk template, whose directory
      is supposed to be verified
    @type verify_key: string
    @param verify_key: key for the verification map of this file
      storage path
    @param error_key: error key to be added to the verification results
      in case something goes wrong in this verification step

    """
    assert (file_disk_template in
            utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
    cluster = self.cfg.GetClusterInfo()
    if cluster.IsDiskTemplateEnabled(file_disk_template):
      self._ErrorIf(
          verify_key in nresult,
          error_key, ninfo.name,
          "The configured %s storage path is unusable: %s" %
          (file_disk_template, nresult.get(verify_key)))

  def _VerifyFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_FILE,
        constants.NV_FILE_STORAGE_PATH,
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)

  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
    """Verifies (shared file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_SHARED_FILE,
        constants.NV_SHARED_FILE_STORAGE_PATH,
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
      keys and disk information as values; the disk information is a
      list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_devonly = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      devonly = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        self.cfg.SetDiskID(anno_disk, nuuid)
        devonly.append(anno_disk)

      node_disks_devonly[nuuid] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())

    instdisk_keys = set(instdisk)
    instanceinfo_keys = set(instanceinfo)
    assert instdisk_keys == instanceinfo_keys, \
      ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
       (instdisk_keys, instanceinfo_keys))

    return instdisk
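
  # Shape sketch (invented IDs): for a two-disk instance "i1" on primary
  # "n1" and secondary "n2", the returned map looks like
  #
  #   {"i1": {"n1": [(True, st0), (True, st1)],
  #           "n2": [(True, st0), (True, st1)]}}
  #
  # where each st is a block-device status payload; diskless instances map
  # to {}.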

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))
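
  # Illustration (hypothetical three-group cluster): _SshNodeSelector
  # returns one itertools.cycle per *other* group, so every online node in
  # this group draws the next name from each cycle: "a1" might get
  # ["b1", "c1"], "a2" gets ["b2", "c2"], and the cycles wrap around once a
  # small group is exhausted - which is how the single-node hotspot warned
  # about above arises.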

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase; a failing hook causes
    its output to be logged in the verify output and the verification to
    fail.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if drbd_helper:
      node_verify_param[constants.NV_DRBDVERSION] = None
      node_verify_param[constants.NV_DRBDLIST] = None
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least a node, we act as if it were set for all the nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
        additional_node_uuids, {key: node_verify_param[key]},
        self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None
    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.uuid not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])