ClusterSetParams: move vg-name checks from to CheckPrereq
[ganeti-local] / lib / cmdlib / cluster.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with the cluster."""
23
24 import OpenSSL
25
26 import copy
27 import itertools
28 import logging
29 import operator
30 import os
31 import re
32 import time
33
34 from ganeti import compat
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import masterd
40 from ganeti import netutils
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 from ganeti import query
45 from ganeti import rpc
46 from ganeti import runtime
47 from ganeti import ssh
48 from ganeti import uidpool
49 from ganeti import utils
50 from ganeti import vcluster
51
52 from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53   ResultWithJobs
54 from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55   ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56   GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57   GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58   CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59   ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
60   CheckIpolicyVsDiskTemplates
61
62 import ganeti.masterd.instance
63
64
65 class LUClusterActivateMasterIp(NoHooksLU):
66   """Activate the master IP on the master node.
67
68   """
69   def Exec(self, feedback_fn):
70     """Activate the master IP.
71
72     """
73     master_params = self.cfg.GetMasterNetworkParameters()
74     ems = self.cfg.GetUseExternalMipScript()
75     result = self.rpc.call_node_activate_master_ip(master_params.uuid,
76                                                    master_params, ems)
77     result.Raise("Could not activate the master IP")
78
79
80 class LUClusterDeactivateMasterIp(NoHooksLU):
81   """Deactivate the master IP on the master node.
82
83   """
84   def Exec(self, feedback_fn):
85     """Deactivate the master IP.
86
87     """
88     master_params = self.cfg.GetMasterNetworkParameters()
89     ems = self.cfg.GetUseExternalMipScript()
90     result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
91                                                      master_params, ems)
92     result.Raise("Could not deactivate the master IP")
93
94
95 class LUClusterConfigQuery(NoHooksLU):
96   """Return configuration values.
97
98   """
99   REQ_BGL = False
100
101   def CheckArguments(self):
102     self.cq = ClusterQuery(None, self.op.output_fields, False)
103
104   def ExpandNames(self):
105     self.cq.ExpandNames(self)
106
107   def DeclareLocks(self, level):
108     self.cq.DeclareLocks(self, level)
109
110   def Exec(self, feedback_fn):
111     result = self.cq.OldStyleQuery(self)
112
113     assert len(result) == 1
114
115     return result[0]
116
117
118 class LUClusterDestroy(LogicalUnit):
119   """Logical unit for destroying the cluster.
120
121   """
122   HPATH = "cluster-destroy"
123   HTYPE = constants.HTYPE_CLUSTER
124
125   def BuildHooksEnv(self):
126     """Build hooks env.
127
128     """
129     return {
130       "OP_TARGET": self.cfg.GetClusterName(),
131       }
132
133   def BuildHooksNodes(self):
134     """Build hooks nodes.
135
136     """
137     return ([], [])
138
139   def CheckPrereq(self):
140     """Check prerequisites.
141
142     This checks whether the cluster is empty.
143
144     Any errors are signaled by raising errors.OpPrereqError.
145
146     """
147     master = self.cfg.GetMasterNode()
148
149     nodelist = self.cfg.GetNodeList()
150     if len(nodelist) != 1 or nodelist[0] != master:
151       raise errors.OpPrereqError("There are still %d node(s) in"
152                                  " this cluster." % (len(nodelist) - 1),
153                                  errors.ECODE_INVAL)
154     instancelist = self.cfg.GetInstanceList()
155     if instancelist:
156       raise errors.OpPrereqError("There are still %d instance(s) in"
157                                  " this cluster." % len(instancelist),
158                                  errors.ECODE_INVAL)
159
160   def Exec(self, feedback_fn):
161     """Destroys the cluster.
162
163     """
164     master_params = self.cfg.GetMasterNetworkParameters()
165
166     # Run post hooks on master node before it's removed
167     RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
168
169     ems = self.cfg.GetUseExternalMipScript()
170     result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
171                                                      master_params, ems)
172     result.Warn("Error disabling the master IP address", self.LogWarning)
173     return master_params.uuid
174
175
176 class LUClusterPostInit(LogicalUnit):
177   """Logical unit for running hooks after cluster initialization.
178
179   """
180   HPATH = "cluster-init"
181   HTYPE = constants.HTYPE_CLUSTER
182
183   def BuildHooksEnv(self):
184     """Build hooks env.
185
186     """
187     return {
188       "OP_TARGET": self.cfg.GetClusterName(),
189       }
190
191   def BuildHooksNodes(self):
192     """Build hooks nodes.
193
194     """
195     return ([], [self.cfg.GetMasterNode()])
196
197   def Exec(self, feedback_fn):
198     """Nothing to do.
199
200     """
201     return True
202
203
204 class ClusterQuery(QueryBase):
205   FIELDS = query.CLUSTER_FIELDS
206
207   #: Do not sort (there is only one item)
208   SORT_FIELD = None
209
210   def ExpandNames(self, lu):
211     lu.needed_locks = {}
212
213     # The following variables interact with _QueryBase._GetNames
214     self.wanted = locking.ALL_SET
215     self.do_locking = self.use_locking
216
217     if self.do_locking:
218       raise errors.OpPrereqError("Can not use locking for cluster queries",
219                                  errors.ECODE_INVAL)
220
221   def DeclareLocks(self, lu, level):
222     pass
223
224   def _GetQueryData(self, lu):
225     """Computes the list of nodes and their attributes.
226
227     """
228     # Locking is not used
229     assert not (compat.any(lu.glm.is_owned(level)
230                            for level in locking.LEVELS
231                            if level != locking.LEVEL_CLUSTER) or
232                 self.do_locking or self.use_locking)
233
234     if query.CQ_CONFIG in self.requested_data:
235       cluster = lu.cfg.GetClusterInfo()
236       nodes = lu.cfg.GetAllNodesInfo()
237     else:
238       cluster = NotImplemented
239       nodes = NotImplemented
240
241     if query.CQ_QUEUE_DRAINED in self.requested_data:
242       drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
243     else:
244       drain_flag = NotImplemented
245
246     if query.CQ_WATCHER_PAUSE in self.requested_data:
247       master_node_uuid = lu.cfg.GetMasterNode()
248
249       result = lu.rpc.call_get_watcher_pause(master_node_uuid)
250       result.Raise("Can't retrieve watcher pause from master node '%s'" %
251                    lu.cfg.GetMasterNodeName())
252
253       watcher_pause = result.payload
254     else:
255       watcher_pause = NotImplemented
256
257     return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
258
259
260 class LUClusterQuery(NoHooksLU):
261   """Query cluster configuration.
262
263   """
264   REQ_BGL = False
265
266   def ExpandNames(self):
267     self.needed_locks = {}
268
269   def Exec(self, feedback_fn):
270     """Return cluster config.
271
272     """
273     cluster = self.cfg.GetClusterInfo()
274     os_hvp = {}
275
276     # Filter just for enabled hypervisors
277     for os_name, hv_dict in cluster.os_hvp.items():
278       os_hvp[os_name] = {}
279       for hv_name, hv_params in hv_dict.items():
280         if hv_name in cluster.enabled_hypervisors:
281           os_hvp[os_name][hv_name] = hv_params
282
283     # Convert ip_family to ip_version
284     primary_ip_version = constants.IP4_VERSION
285     if cluster.primary_ip_family == netutils.IP6Address.family:
286       primary_ip_version = constants.IP6_VERSION
287
288     result = {
289       "software_version": constants.RELEASE_VERSION,
290       "protocol_version": constants.PROTOCOL_VERSION,
291       "config_version": constants.CONFIG_VERSION,
292       "os_api_version": max(constants.OS_API_VERSIONS),
293       "export_version": constants.EXPORT_VERSION,
294       "vcs_version": constants.VCS_VERSION,
295       "architecture": runtime.GetArchInfo(),
296       "name": cluster.cluster_name,
297       "master": self.cfg.GetMasterNodeName(),
298       "default_hypervisor": cluster.primary_hypervisor,
299       "enabled_hypervisors": cluster.enabled_hypervisors,
300       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
301                         for hypervisor_name in cluster.enabled_hypervisors]),
302       "os_hvp": os_hvp,
303       "beparams": cluster.beparams,
304       "osparams": cluster.osparams,
305       "ipolicy": cluster.ipolicy,
306       "nicparams": cluster.nicparams,
307       "ndparams": cluster.ndparams,
308       "diskparams": cluster.diskparams,
309       "candidate_pool_size": cluster.candidate_pool_size,
310       "master_netdev": cluster.master_netdev,
311       "master_netmask": cluster.master_netmask,
312       "use_external_mip_script": cluster.use_external_mip_script,
313       "volume_group_name": cluster.volume_group_name,
314       "drbd_usermode_helper": cluster.drbd_usermode_helper,
315       "file_storage_dir": cluster.file_storage_dir,
316       "shared_file_storage_dir": cluster.shared_file_storage_dir,
317       "maintain_node_health": cluster.maintain_node_health,
318       "ctime": cluster.ctime,
319       "mtime": cluster.mtime,
320       "uuid": cluster.uuid,
321       "tags": list(cluster.GetTags()),
322       "uid_pool": cluster.uid_pool,
323       "default_iallocator": cluster.default_iallocator,
324       "reserved_lvs": cluster.reserved_lvs,
325       "primary_ip_version": primary_ip_version,
326       "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
327       "hidden_os": cluster.hidden_os,
328       "blacklisted_os": cluster.blacklisted_os,
329       "enabled_disk_templates": cluster.enabled_disk_templates,
330       }
331
332     return result
333
334
335 class LUClusterRedistConf(NoHooksLU):
336   """Force the redistribution of cluster configuration.
337
338   This is a very simple LU.
339
340   """
341   REQ_BGL = False
342
343   def ExpandNames(self):
344     self.needed_locks = {
345       locking.LEVEL_NODE: locking.ALL_SET,
346       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
347     }
348     self.share_locks = ShareAll()
349
350   def Exec(self, feedback_fn):
351     """Redistribute the configuration.
352
353     """
354     self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
355     RedistributeAncillaryFiles(self)
356
357
358 class LUClusterRename(LogicalUnit):
359   """Rename the cluster.
360
361   """
362   HPATH = "cluster-rename"
363   HTYPE = constants.HTYPE_CLUSTER
364
365   def BuildHooksEnv(self):
366     """Build hooks env.
367
368     """
369     return {
370       "OP_TARGET": self.cfg.GetClusterName(),
371       "NEW_NAME": self.op.name,
372       }
373
374   def BuildHooksNodes(self):
375     """Build hooks nodes.
376
377     """
378     return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
379
380   def CheckPrereq(self):
381     """Verify that the passed name is a valid one.
382
383     """
384     hostname = netutils.GetHostname(name=self.op.name,
385                                     family=self.cfg.GetPrimaryIPFamily())
386
387     new_name = hostname.name
388     self.ip = new_ip = hostname.ip
389     old_name = self.cfg.GetClusterName()
390     old_ip = self.cfg.GetMasterIP()
391     if new_name == old_name and new_ip == old_ip:
392       raise errors.OpPrereqError("Neither the name nor the IP address of the"
393                                  " cluster has changed",
394                                  errors.ECODE_INVAL)
395     if new_ip != old_ip:
396       if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
397         raise errors.OpPrereqError("The given cluster IP address (%s) is"
398                                    " reachable on the network" %
399                                    new_ip, errors.ECODE_NOTUNIQUE)
400
401     self.op.name = new_name
402
403   def Exec(self, feedback_fn):
404     """Rename the cluster.
405
406     """
407     clustername = self.op.name
408     new_ip = self.ip
409
410     # shutdown the master IP
411     master_params = self.cfg.GetMasterNetworkParameters()
412     ems = self.cfg.GetUseExternalMipScript()
413     result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
414                                                      master_params, ems)
415     result.Raise("Could not disable the master role")
416
417     try:
418       cluster = self.cfg.GetClusterInfo()
419       cluster.cluster_name = clustername
420       cluster.master_ip = new_ip
421       self.cfg.Update(cluster, feedback_fn)
422
423       # update the known hosts file
424       ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
425       node_list = self.cfg.GetOnlineNodeList()
426       try:
427         node_list.remove(master_params.uuid)
428       except ValueError:
429         pass
430       UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
431     finally:
432       master_params.ip = new_ip
433       result = self.rpc.call_node_activate_master_ip(master_params.uuid,
434                                                      master_params, ems)
435       result.Warn("Could not re-enable the master role on the master,"
436                   " please restart manually", self.LogWarning)
437
438     return clustername
439
440
441 class LUClusterRepairDiskSizes(NoHooksLU):
442   """Verifies the cluster disks sizes.
443
444   """
445   REQ_BGL = False
446
447   def ExpandNames(self):
448     if self.op.instances:
449       (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
450       # Not getting the node allocation lock as only a specific set of
451       # instances (and their nodes) is going to be acquired
452       self.needed_locks = {
453         locking.LEVEL_NODE_RES: [],
454         locking.LEVEL_INSTANCE: self.wanted_names,
455         }
456       self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
457     else:
458       self.wanted_names = None
459       self.needed_locks = {
460         locking.LEVEL_NODE_RES: locking.ALL_SET,
461         locking.LEVEL_INSTANCE: locking.ALL_SET,
462
463         # This opcode is acquires the node locks for all instances
464         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
465         }
466
467     self.share_locks = {
468       locking.LEVEL_NODE_RES: 1,
469       locking.LEVEL_INSTANCE: 0,
470       locking.LEVEL_NODE_ALLOC: 1,
471       }
472
473   def DeclareLocks(self, level):
474     if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
475       self._LockInstancesNodes(primary_only=True, level=level)
476
477   def CheckPrereq(self):
478     """Check prerequisites.
479
480     This only checks the optional instance list against the existing names.
481
482     """
483     if self.wanted_names is None:
484       self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
485
486     self.wanted_instances = \
487         map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
488
489   def _EnsureChildSizes(self, disk):
490     """Ensure children of the disk have the needed disk size.
491
492     This is valid mainly for DRBD8 and fixes an issue where the
493     children have smaller disk size.
494
495     @param disk: an L{ganeti.objects.Disk} object
496
497     """
498     if disk.dev_type == constants.LD_DRBD8:
499       assert disk.children, "Empty children for DRBD8?"
500       fchild = disk.children[0]
501       mismatch = fchild.size < disk.size
502       if mismatch:
503         self.LogInfo("Child disk has size %d, parent %d, fixing",
504                      fchild.size, disk.size)
505         fchild.size = disk.size
506
507       # and we recurse on this child only, not on the metadev
508       return self._EnsureChildSizes(fchild) or mismatch
509     else:
510       return False
511
512   def Exec(self, feedback_fn):
513     """Verify the size of cluster disks.
514
515     """
516     # TODO: check child disks too
517     # TODO: check differences in size between primary/secondary nodes
518     per_node_disks = {}
519     for instance in self.wanted_instances:
520       pnode = instance.primary_node
521       if pnode not in per_node_disks:
522         per_node_disks[pnode] = []
523       for idx, disk in enumerate(instance.disks):
524         per_node_disks[pnode].append((instance, idx, disk))
525
526     assert not (frozenset(per_node_disks.keys()) -
527                 self.owned_locks(locking.LEVEL_NODE_RES)), \
528       "Not owning correct locks"
529     assert not self.owned_locks(locking.LEVEL_NODE)
530
531     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
532                                                per_node_disks.keys())
533
534     changed = []
535     for node_uuid, dskl in per_node_disks.items():
536       newl = [v[2].Copy() for v in dskl]
537       for dsk in newl:
538         self.cfg.SetDiskID(dsk, node_uuid)
539       node_name = self.cfg.GetNodeName(node_uuid)
540       result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
541       if result.fail_msg:
542         self.LogWarning("Failure in blockdev_getdimensions call to node"
543                         " %s, ignoring", node_name)
544         continue
545       if len(result.payload) != len(dskl):
546         logging.warning("Invalid result from node %s: len(dksl)=%d,"
547                         " result.payload=%s", node_name, len(dskl),
548                         result.payload)
549         self.LogWarning("Invalid result from node %s, ignoring node results",
550                         node_name)
551         continue
552       for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
553         if dimensions is None:
554           self.LogWarning("Disk %d of instance %s did not return size"
555                           " information, ignoring", idx, instance.name)
556           continue
557         if not isinstance(dimensions, (tuple, list)):
558           self.LogWarning("Disk %d of instance %s did not return valid"
559                           " dimension information, ignoring", idx,
560                           instance.name)
561           continue
562         (size, spindles) = dimensions
563         if not isinstance(size, (int, long)):
564           self.LogWarning("Disk %d of instance %s did not return valid"
565                           " size information, ignoring", idx, instance.name)
566           continue
567         size = size >> 20
568         if size != disk.size:
569           self.LogInfo("Disk %d of instance %s has mismatched size,"
570                        " correcting: recorded %d, actual %d", idx,
571                        instance.name, disk.size, size)
572           disk.size = size
573           self.cfg.Update(instance, feedback_fn)
574           changed.append((instance.name, idx, "size", size))
575         if es_flags[node_uuid]:
576           if spindles is None:
577             self.LogWarning("Disk %d of instance %s did not return valid"
578                             " spindles information, ignoring", idx,
579                             instance.name)
580           elif disk.spindles is None or disk.spindles != spindles:
581             self.LogInfo("Disk %d of instance %s has mismatched spindles,"
582                          " correcting: recorded %s, actual %s",
583                          idx, instance.name, disk.spindles, spindles)
584             disk.spindles = spindles
585             self.cfg.Update(instance, feedback_fn)
586             changed.append((instance.name, idx, "spindles", disk.spindles))
587         if self._EnsureChildSizes(disk):
588           self.cfg.Update(instance, feedback_fn)
589           changed.append((instance.name, idx, "size", disk.size))
590     return changed
591
592
593 def _ValidateNetmask(cfg, netmask):
594   """Checks if a netmask is valid.
595
596   @type cfg: L{config.ConfigWriter}
597   @param cfg: The cluster configuration
598   @type netmask: int
599   @param netmask: the netmask to be verified
600   @raise errors.OpPrereqError: if the validation fails
601
602   """
603   ip_family = cfg.GetPrimaryIPFamily()
604   try:
605     ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
606   except errors.ProgrammerError:
607     raise errors.OpPrereqError("Invalid primary ip family: %s." %
608                                ip_family, errors.ECODE_INVAL)
609   if not ipcls.ValidateNetmask(netmask):
610     raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
611                                (netmask), errors.ECODE_INVAL)
612
613
614 def CheckFileBasedStoragePathVsEnabledDiskTemplates(
615     logging_warn_fn, file_storage_dir, enabled_disk_templates,
616     file_disk_template):
617   """Checks whether the given file-based storage directory is acceptable.
618
619   Note: This function is public, because it is also used in bootstrap.py.
620
621   @type logging_warn_fn: function
622   @param logging_warn_fn: function which accepts a string and logs it
623   @type file_storage_dir: string
624   @param file_storage_dir: the directory to be used for file-based instances
625   @type enabled_disk_templates: list of string
626   @param enabled_disk_templates: the list of enabled disk templates
627   @type file_disk_template: string
628   @param file_disk_template: the file-based disk template for which the
629       path should be checked
630
631   """
632   assert (file_disk_template in
633           utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
634   file_storage_enabled = file_disk_template in enabled_disk_templates
635   if file_storage_dir is not None:
636     if file_storage_dir == "":
637       if file_storage_enabled:
638         raise errors.OpPrereqError(
639             "Unsetting the '%s' storage directory while having '%s' storage"
640             " enabled is not permitted." %
641             (file_disk_template, file_disk_template))
642     else:
643       if not file_storage_enabled:
644         logging_warn_fn(
645             "Specified a %s storage directory, although %s storage is not"
646             " enabled." % (file_disk_template, file_disk_template))
647   else:
648     raise errors.ProgrammerError("Received %s storage dir with value"
649                                  " 'None'." % file_disk_template)
650
651
652 def CheckFileStoragePathVsEnabledDiskTemplates(
653     logging_warn_fn, file_storage_dir, enabled_disk_templates):
654   """Checks whether the given file storage directory is acceptable.
655
656   @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
657
658   """
659   CheckFileBasedStoragePathVsEnabledDiskTemplates(
660       logging_warn_fn, file_storage_dir, enabled_disk_templates,
661       constants.DT_FILE)
662
663
664 def CheckSharedFileStoragePathVsEnabledDiskTemplates(
665     logging_warn_fn, file_storage_dir, enabled_disk_templates):
666   """Checks whether the given shared file storage directory is acceptable.
667
668   @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
669
670   """
671   CheckFileBasedStoragePathVsEnabledDiskTemplates(
672       logging_warn_fn, file_storage_dir, enabled_disk_templates,
673       constants.DT_SHARED_FILE)
674
675
676 class LUClusterSetParams(LogicalUnit):
677   """Change the parameters of the cluster.
678
679   """
680   HPATH = "cluster-modify"
681   HTYPE = constants.HTYPE_CLUSTER
682   REQ_BGL = False
683
684   def CheckArguments(self):
685     """Check parameters
686
687     """
688     if self.op.uid_pool:
689       uidpool.CheckUidPool(self.op.uid_pool)
690
691     if self.op.add_uids:
692       uidpool.CheckUidPool(self.op.add_uids)
693
694     if self.op.remove_uids:
695       uidpool.CheckUidPool(self.op.remove_uids)
696
697     if self.op.master_netmask is not None:
698       _ValidateNetmask(self.cfg, self.op.master_netmask)
699
700     if self.op.diskparams:
701       for dt_params in self.op.diskparams.values():
702         utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
703       try:
704         utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
705       except errors.OpPrereqError, err:
706         raise errors.OpPrereqError("While verify diskparams options: %s" % err,
707                                    errors.ECODE_INVAL)
708
709   def ExpandNames(self):
710     # FIXME: in the future maybe other cluster params won't require checking on
711     # all nodes to be modified.
712     # FIXME: This opcode changes cluster-wide settings. Is acquiring all
713     # resource locks the right thing, shouldn't it be the BGL instead?
714     self.needed_locks = {
715       locking.LEVEL_NODE: locking.ALL_SET,
716       locking.LEVEL_INSTANCE: locking.ALL_SET,
717       locking.LEVEL_NODEGROUP: locking.ALL_SET,
718       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
719     }
720     self.share_locks = ShareAll()
721
722   def BuildHooksEnv(self):
723     """Build hooks env.
724
725     """
726     return {
727       "OP_TARGET": self.cfg.GetClusterName(),
728       "NEW_VG_NAME": self.op.vg_name,
729       }
730
731   def BuildHooksNodes(self):
732     """Build hooks nodes.
733
734     """
735     mn = self.cfg.GetMasterNode()
736     return ([mn], [mn])
737
738   def _CheckVgName(self, node_uuids, enabled_disk_templates,
739                    new_enabled_disk_templates):
740     """Check the consistency of the vg name on all nodes and in case it gets
741        unset whether there are instances still using it.
742
743     """
744     lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
745     lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
746                                             new_enabled_disk_templates)
747     current_vg_name = self.cfg.GetVGName()
748
749     if self.op.vg_name == '':
750       if lvm_is_enabled:
751         raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
752                                    " disk templates are or get enabled.")
753
754     if self.op.vg_name is None:
755       if current_vg_name is None and lvm_is_enabled:
756         raise errors.OpPrereqError("Please specify a volume group when"
757                                    " enabling lvm-based disk-templates.")
758
759     if self.op.vg_name is not None and not self.op.vg_name:
760       if self.cfg.HasAnyDiskOfType(constants.LD_LV):
761         raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
762                                    " instances exist", errors.ECODE_INVAL)
763
764     if (self.op.vg_name is not None and lvm_is_enabled) or \
765         (self.cfg.GetVGName() is not None and lvm_gets_enabled):
766       self._CheckVgNameOnNodes(node_uuids)
767
768   def _CheckVgNameOnNodes(self, node_uuids):
769     """Check the status of the volume group on each node.
770
771     """
772     vglist = self.rpc.call_vg_list(node_uuids)
773     for node_uuid in node_uuids:
774       msg = vglist[node_uuid].fail_msg
775       if msg:
776         # ignoring down node
777         self.LogWarning("Error while gathering data on node %s"
778                         " (ignoring node): %s",
779                         self.cfg.GetNodeName(node_uuid), msg)
780         continue
781       vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
782                                             self.op.vg_name,
783                                             constants.MIN_VG_SIZE)
784       if vgstatus:
785         raise errors.OpPrereqError("Error on node '%s': %s" %
786                                    (self.cfg.GetNodeName(node_uuid), vgstatus),
787                                    errors.ECODE_ENVIRON)
788
789   @staticmethod
790   def _GetEnabledDiskTemplatesInner(op_enabled_disk_templates,
791                                     old_enabled_disk_templates):
792     """Determines the enabled disk templates and the subset of disk templates
793        that are newly enabled by this operation.
794
795     """
796     enabled_disk_templates = None
797     new_enabled_disk_templates = []
798     if op_enabled_disk_templates:
799       enabled_disk_templates = op_enabled_disk_templates
800       new_enabled_disk_templates = \
801         list(set(enabled_disk_templates)
802              - set(old_enabled_disk_templates))
803     else:
804       enabled_disk_templates = old_enabled_disk_templates
805     return (enabled_disk_templates, new_enabled_disk_templates)
806
807   def _GetEnabledDiskTemplates(self, cluster):
808     """Determines the enabled disk templates and the subset of disk templates
809        that are newly enabled by this operation.
810
811     """
812     return self._GetEnabledDiskTemplatesInner(self.op.enabled_disk_templates,
813                                               cluster.enabled_disk_templates)
814
815   def _CheckIpolicy(self, cluster, enabled_disk_templates):
816     """Checks the ipolicy.
817
818     @type cluster: C{objects.Cluster}
819     @param cluster: the cluster's configuration
820     @type enabled_disk_templates: list of string
821     @param enabled_disk_templates: list of (possibly newly) enabled disk
822       templates
823
824     """
825     # FIXME: write unit tests for this
826     if self.op.ipolicy:
827       self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
828                                            group_policy=False)
829
830       CheckIpolicyVsDiskTemplates(self.new_ipolicy,
831                                   enabled_disk_templates)
832
833       all_instances = self.cfg.GetAllInstancesInfo().values()
834       violations = set()
835       for group in self.cfg.GetAllNodeGroupsInfo().values():
836         instances = frozenset([inst for inst in all_instances
837                                if compat.any(nuuid in group.members
838                                              for nuuid in inst.all_nodes)])
839         new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
840         ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
841         new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
842                                            self.cfg)
843         if new:
844           violations.update(new)
845
846       if violations:
847         self.LogWarning("After the ipolicy change the following instances"
848                         " violate them: %s",
849                         utils.CommaJoin(utils.NiceSort(violations)))
850     else:
851       CheckIpolicyVsDiskTemplates(cluster.ipolicy,
852                                   enabled_disk_templates)
853
854   def CheckPrereq(self):
855     """Check prerequisites.
856
857     This checks whether the given params don't conflict and
858     if the given volume group is valid.
859
860     """
861     if self.op.drbd_helper is not None and not self.op.drbd_helper:
862       if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
863         raise errors.OpPrereqError("Cannot disable drbd helper while"
864                                    " drbd-based instances exist",
865                                    errors.ECODE_INVAL)
866
867     node_uuids = self.owned_locks(locking.LEVEL_NODE)
868     self.cluster = cluster = self.cfg.GetClusterInfo()
869
870     vm_capable_node_uuids = [node.uuid
871                              for node in self.cfg.GetAllNodesInfo().values()
872                              if node.uuid in node_uuids and node.vm_capable]
873
874     (enabled_disk_templates, new_enabled_disk_templates) = \
875       self._GetEnabledDiskTemplates(cluster)
876
877     self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
878                       new_enabled_disk_templates)
879
880     if self.op.file_storage_dir is not None:
881       CheckFileStoragePathVsEnabledDiskTemplates(
882           self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
883
884     if self.op.shared_file_storage_dir is not None:
885       CheckSharedFileStoragePathVsEnabledDiskTemplates(
886           self.LogWarning, self.op.shared_file_storage_dir,
887           enabled_disk_templates)
888
889     if self.op.drbd_helper:
890       # checks given drbd helper on all nodes
891       helpers = self.rpc.call_drbd_helper(node_uuids)
892       for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
893         if ninfo.offline:
894           self.LogInfo("Not checking drbd helper on offline node %s",
895                        ninfo.name)
896           continue
897         msg = helpers[ninfo.uuid].fail_msg
898         if msg:
899           raise errors.OpPrereqError("Error checking drbd helper on node"
900                                      " '%s': %s" % (ninfo.name, msg),
901                                      errors.ECODE_ENVIRON)
902         node_helper = helpers[ninfo.uuid].payload
903         if node_helper != self.op.drbd_helper:
904           raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
905                                      (ninfo.name, node_helper),
906                                      errors.ECODE_ENVIRON)
907
908     # validate params changes
909     if self.op.beparams:
910       objects.UpgradeBeParams(self.op.beparams)
911       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
912       self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
913
914     if self.op.ndparams:
915       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
916       self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
917
918       # TODO: we need a more general way to handle resetting
919       # cluster-level parameters to default values
920       if self.new_ndparams["oob_program"] == "":
921         self.new_ndparams["oob_program"] = \
922             constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
923
924     if self.op.hv_state:
925       new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
926                                            self.cluster.hv_state_static)
927       self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
928                                for hv, values in new_hv_state.items())
929
930     if self.op.disk_state:
931       new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
932                                                self.cluster.disk_state_static)
933       self.new_disk_state = \
934         dict((storage, dict((name, cluster.SimpleFillDiskState(values))
935                             for name, values in svalues.items()))
936              for storage, svalues in new_disk_state.items())
937
938     self._CheckIpolicy(cluster, enabled_disk_templates)
939
940     if self.op.nicparams:
941       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
942       self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
943       objects.NIC.CheckParameterSyntax(self.new_nicparams)
944       nic_errors = []
945
946       # check all instances for consistency
947       for instance in self.cfg.GetAllInstancesInfo().values():
948         for nic_idx, nic in enumerate(instance.nics):
949           params_copy = copy.deepcopy(nic.nicparams)
950           params_filled = objects.FillDict(self.new_nicparams, params_copy)
951
952           # check parameter syntax
953           try:
954             objects.NIC.CheckParameterSyntax(params_filled)
955           except errors.ConfigurationError, err:
956             nic_errors.append("Instance %s, nic/%d: %s" %
957                               (instance.name, nic_idx, err))
958
959           # if we're moving instances to routed, check that they have an ip
960           target_mode = params_filled[constants.NIC_MODE]
961           if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
962             nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
963                               " address" % (instance.name, nic_idx))
964       if nic_errors:
965         raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
966                                    "\n".join(nic_errors), errors.ECODE_INVAL)
967
968     # hypervisor list/parameters
969     self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
970     if self.op.hvparams:
971       for hv_name, hv_dict in self.op.hvparams.items():
972         if hv_name not in self.new_hvparams:
973           self.new_hvparams[hv_name] = hv_dict
974         else:
975           self.new_hvparams[hv_name].update(hv_dict)
976
977     # disk template parameters
978     self.new_diskparams = objects.FillDict(cluster.diskparams, {})
979     if self.op.diskparams:
980       for dt_name, dt_params in self.op.diskparams.items():
981         if dt_name not in self.new_diskparams:
982           self.new_diskparams[dt_name] = dt_params
983         else:
984           self.new_diskparams[dt_name].update(dt_params)
985
986     # os hypervisor parameters
987     self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
988     if self.op.os_hvp:
989       for os_name, hvs in self.op.os_hvp.items():
990         if os_name not in self.new_os_hvp:
991           self.new_os_hvp[os_name] = hvs
992         else:
993           for hv_name, hv_dict in hvs.items():
994             if hv_dict is None:
995               # Delete if it exists
996               self.new_os_hvp[os_name].pop(hv_name, None)
997             elif hv_name not in self.new_os_hvp[os_name]:
998               self.new_os_hvp[os_name][hv_name] = hv_dict
999             else:
1000               self.new_os_hvp[os_name][hv_name].update(hv_dict)
1001
1002     # os parameters
1003     self.new_osp = objects.FillDict(cluster.osparams, {})
1004     if self.op.osparams:
1005       for os_name, osp in self.op.osparams.items():
1006         if os_name not in self.new_osp:
1007           self.new_osp[os_name] = {}
1008
1009         self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
1010                                                  use_none=True)
1011
1012         if not self.new_osp[os_name]:
1013           # we removed all parameters
1014           del self.new_osp[os_name]
1015         else:
1016           # check the parameter validity (remote check)
1017           CheckOSParams(self, False, [self.cfg.GetMasterNode()],
1018                         os_name, self.new_osp[os_name])
1019
1020     # changes to the hypervisor list
1021     if self.op.enabled_hypervisors is not None:
1022       self.hv_list = self.op.enabled_hypervisors
1023       for hv in self.hv_list:
1024         # if the hypervisor doesn't already exist in the cluster
1025         # hvparams, we initialize it to empty, and then (in both
1026         # cases) we make sure to fill the defaults, as we might not
1027         # have a complete defaults list if the hypervisor wasn't
1028         # enabled before
1029         if hv not in new_hvp:
1030           new_hvp[hv] = {}
1031         new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1032         utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1033     else:
1034       self.hv_list = cluster.enabled_hypervisors
1035
1036     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1037       # either the enabled list has changed, or the parameters have, validate
1038       for hv_name, hv_params in self.new_hvparams.items():
1039         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1040             (self.op.enabled_hypervisors and
1041              hv_name in self.op.enabled_hypervisors)):
1042           # either this is a new hypervisor, or its parameters have changed
1043           hv_class = hypervisor.GetHypervisorClass(hv_name)
1044           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1045           hv_class.CheckParameterSyntax(hv_params)
1046           CheckHVParams(self, node_uuids, hv_name, hv_params)
1047
1048     self._CheckDiskTemplateConsistency()
1049
1050     if self.op.os_hvp:
1051       # no need to check any newly-enabled hypervisors, since the
1052       # defaults have already been checked in the above code-block
1053       for os_name, os_hvp in self.new_os_hvp.items():
1054         for hv_name, hv_params in os_hvp.items():
1055           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1056           # we need to fill in the new os_hvp on top of the actual hv_p
1057           cluster_defaults = self.new_hvparams.get(hv_name, {})
1058           new_osp = objects.FillDict(cluster_defaults, hv_params)
1059           hv_class = hypervisor.GetHypervisorClass(hv_name)
1060           hv_class.CheckParameterSyntax(new_osp)
1061           CheckHVParams(self, node_uuids, hv_name, new_osp)
1062
1063     if self.op.default_iallocator:
1064       alloc_script = utils.FindFile(self.op.default_iallocator,
1065                                     constants.IALLOCATOR_SEARCH_PATH,
1066                                     os.path.isfile)
1067       if alloc_script is None:
1068         raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1069                                    " specified" % self.op.default_iallocator,
1070                                    errors.ECODE_INVAL)
1071
1072   def _CheckDiskTemplateConsistency(self):
1073     """Check whether the disk templates that are going to be disabled
1074        are still in use by some instances.
1075
1076     """
1077     if self.op.enabled_disk_templates:
1078       cluster = self.cfg.GetClusterInfo()
1079       instances = self.cfg.GetAllInstancesInfo()
1080
1081       disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1082         - set(self.op.enabled_disk_templates)
1083       for instance in instances.itervalues():
1084         if instance.disk_template in disk_templates_to_remove:
1085           raise errors.OpPrereqError("Cannot disable disk template '%s',"
1086                                      " because instance '%s' is using it." %
1087                                      (instance.disk_template, instance.name))
1088
1089   def _SetVgName(self, feedback_fn):
1090     """Determines and sets the new volume group name.
1091
1092     """
1093     if self.op.vg_name is not None:
1094       new_volume = self.op.vg_name
1095       if not new_volume:
1096         new_volume = None
1097       if new_volume != self.cfg.GetVGName():
1098         self.cfg.SetVGName(new_volume)
1099       else:
1100         feedback_fn("Cluster LVM configuration already in desired"
1101                     " state, not changing")
1102
1103   def _SetFileStorageDir(self, feedback_fn):
1104     """Set the file storage directory.
1105
1106     """
1107     if self.op.file_storage_dir is not None:
1108       if self.cluster.file_storage_dir == self.op.file_storage_dir:
1109         feedback_fn("Global file storage dir already set to value '%s'"
1110                     % self.cluster.file_storage_dir)
1111       else:
1112         self.cluster.file_storage_dir = self.op.file_storage_dir
1113
1114   def Exec(self, feedback_fn):
1115     """Change the parameters of the cluster.
1116
1117     """
1118     if self.op.enabled_disk_templates:
1119       self.cluster.enabled_disk_templates = \
1120         list(set(self.op.enabled_disk_templates))
1121
1122     self._SetVgName(feedback_fn)
1123     self._SetFileStorageDir(feedback_fn)
1124
1125     if self.op.drbd_helper is not None:
1126       if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1127         feedback_fn("Note that you specified a drbd user helper, but did"
1128                     " enabled the drbd disk template.")
1129       new_helper = self.op.drbd_helper
1130       if not new_helper:
1131         new_helper = None
1132       if new_helper != self.cfg.GetDRBDHelper():
1133         self.cfg.SetDRBDHelper(new_helper)
1134       else:
1135         feedback_fn("Cluster DRBD helper already in desired state,"
1136                     " not changing")
1137     if self.op.hvparams:
1138       self.cluster.hvparams = self.new_hvparams
1139     if self.op.os_hvp:
1140       self.cluster.os_hvp = self.new_os_hvp
1141     if self.op.enabled_hypervisors is not None:
1142       self.cluster.hvparams = self.new_hvparams
1143       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1144     if self.op.beparams:
1145       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1146     if self.op.nicparams:
1147       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1148     if self.op.ipolicy:
1149       self.cluster.ipolicy = self.new_ipolicy
1150     if self.op.osparams:
1151       self.cluster.osparams = self.new_osp
1152     if self.op.ndparams:
1153       self.cluster.ndparams = self.new_ndparams
1154     if self.op.diskparams:
1155       self.cluster.diskparams = self.new_diskparams
1156     if self.op.hv_state:
1157       self.cluster.hv_state_static = self.new_hv_state
1158     if self.op.disk_state:
1159       self.cluster.disk_state_static = self.new_disk_state
1160
1161     if self.op.candidate_pool_size is not None:
1162       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1163       # we need to update the pool size here, otherwise the save will fail
1164       AdjustCandidatePool(self, [])
1165
1166     if self.op.maintain_node_health is not None:
1167       if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1168         feedback_fn("Note: CONFD was disabled at build time, node health"
1169                     " maintenance is not useful (still enabling it)")
1170       self.cluster.maintain_node_health = self.op.maintain_node_health
1171
1172     if self.op.modify_etc_hosts is not None:
1173       self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1174
1175     if self.op.prealloc_wipe_disks is not None:
1176       self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1177
1178     if self.op.add_uids is not None:
1179       uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1180
1181     if self.op.remove_uids is not None:
1182       uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1183
1184     if self.op.uid_pool is not None:
1185       self.cluster.uid_pool = self.op.uid_pool
1186
1187     if self.op.default_iallocator is not None:
1188       self.cluster.default_iallocator = self.op.default_iallocator
1189
1190     if self.op.reserved_lvs is not None:
1191       self.cluster.reserved_lvs = self.op.reserved_lvs
1192
1193     if self.op.use_external_mip_script is not None:
1194       self.cluster.use_external_mip_script = self.op.use_external_mip_script
1195
1196     def helper_os(aname, mods, desc):
1197       desc += " OS list"
1198       lst = getattr(self.cluster, aname)
1199       for key, val in mods:
1200         if key == constants.DDM_ADD:
1201           if val in lst:
1202             feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1203           else:
1204             lst.append(val)
1205         elif key == constants.DDM_REMOVE:
1206           if val in lst:
1207             lst.remove(val)
1208           else:
1209             feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1210         else:
1211           raise errors.ProgrammerError("Invalid modification '%s'" % key)
1212
1213     if self.op.hidden_os:
1214       helper_os("hidden_os", self.op.hidden_os, "hidden")
1215
1216     if self.op.blacklisted_os:
1217       helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1218
1219     if self.op.master_netdev:
1220       master_params = self.cfg.GetMasterNetworkParameters()
1221       ems = self.cfg.GetUseExternalMipScript()
1222       feedback_fn("Shutting down master ip on the current netdev (%s)" %
1223                   self.cluster.master_netdev)
1224       result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1225                                                        master_params, ems)
1226       if not self.op.force:
1227         result.Raise("Could not disable the master ip")
1228       else:
1229         if result.fail_msg:
1230           msg = ("Could not disable the master ip (continuing anyway): %s" %
1231                  result.fail_msg)
1232           feedback_fn(msg)
1233       feedback_fn("Changing master_netdev from %s to %s" %
1234                   (master_params.netdev, self.op.master_netdev))
1235       self.cluster.master_netdev = self.op.master_netdev
1236
1237     if self.op.master_netmask:
1238       master_params = self.cfg.GetMasterNetworkParameters()
1239       feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1240       result = self.rpc.call_node_change_master_netmask(
1241                  master_params.uuid, master_params.netmask,
1242                  self.op.master_netmask, master_params.ip,
1243                  master_params.netdev)
1244       result.Warn("Could not change the master IP netmask", feedback_fn)
1245       self.cluster.master_netmask = self.op.master_netmask
1246
1247     self.cfg.Update(self.cluster, feedback_fn)
1248
1249     if self.op.master_netdev:
1250       master_params = self.cfg.GetMasterNetworkParameters()
1251       feedback_fn("Starting the master ip on the new master netdev (%s)" %
1252                   self.op.master_netdev)
1253       ems = self.cfg.GetUseExternalMipScript()
1254       result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1255                                                      master_params, ems)
1256       result.Warn("Could not re-enable the master ip on the master,"
1257                   " please restart manually", self.LogWarning)
1258
1259
1260 class LUClusterVerify(NoHooksLU):
1261   """Submits all jobs necessary to verify the cluster.
1262
1263   """
1264   REQ_BGL = False
1265
1266   def ExpandNames(self):
1267     self.needed_locks = {}
1268
1269   def Exec(self, feedback_fn):
1270     jobs = []
1271
1272     if self.op.group_name:
1273       groups = [self.op.group_name]
1274       depends_fn = lambda: None
1275     else:
1276       groups = self.cfg.GetNodeGroupList()
1277
1278       # Verify global configuration
1279       jobs.append([
1280         opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1281         ])
1282
1283       # Always depend on global verification
1284       depends_fn = lambda: [(-len(jobs), [])]
1285
1286     jobs.extend(
1287       [opcodes.OpClusterVerifyGroup(group_name=group,
1288                                     ignore_errors=self.op.ignore_errors,
1289                                     depends=depends_fn())]
1290       for group in groups)
1291
1292     # Fix up all parameters
1293     for op in itertools.chain(*jobs): # pylint: disable=W0142
1294       op.debug_simulate_errors = self.op.debug_simulate_errors
1295       op.verbose = self.op.verbose
1296       op.error_codes = self.op.error_codes
1297       try:
1298         op.skip_checks = self.op.skip_checks
1299       except AttributeError:
1300         assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1301
1302     return ResultWithJobs(jobs)
1303
1304
1305 class _VerifyErrors(object):
1306   """Mix-in for cluster/group verify LUs.
1307
1308   It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1309   self.op and self._feedback_fn to be available.)
1310
1311   """
1312
1313   ETYPE_FIELD = "code"
1314   ETYPE_ERROR = "ERROR"
1315   ETYPE_WARNING = "WARNING"
1316
1317   def _Error(self, ecode, item, msg, *args, **kwargs):
1318     """Format an error message.
1319
1320     Based on the opcode's error_codes parameter, either format a
1321     parseable error code, or a simpler error string.
1322
1323     This must be called only from Exec and functions called from Exec.
1324
1325     """
1326     ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1327     itype, etxt, _ = ecode
1328     # If the error code is in the list of ignored errors, demote the error to a
1329     # warning
1330     if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1331       ltype = self.ETYPE_WARNING
1332     # first complete the msg
1333     if args:
1334       msg = msg % args
1335     # then format the whole message
1336     if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1337       msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1338     else:
1339       if item:
1340         item = " " + item
1341       else:
1342         item = ""
1343       msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1344     # and finally report it via the feedback_fn
1345     self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1346     # do not mark the operation as failed for WARN cases only
1347     if ltype == self.ETYPE_ERROR:
1348       self.bad = True
1349
1350   def _ErrorIf(self, cond, *args, **kwargs):
1351     """Log an error message if the passed condition is True.
1352
1353     """
1354     if (bool(cond)
1355         or self.op.debug_simulate_errors): # pylint: disable=E1101
1356       self._Error(*args, **kwargs)
1357
1358
1359 def _VerifyCertificate(filename):
1360   """Verifies a certificate for L{LUClusterVerifyConfig}.
1361
1362   @type filename: string
1363   @param filename: Path to PEM file
1364
1365   """
1366   try:
1367     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1368                                            utils.ReadFile(filename))
1369   except Exception, err: # pylint: disable=W0703
1370     return (LUClusterVerifyConfig.ETYPE_ERROR,
1371             "Failed to load X509 certificate %s: %s" % (filename, err))
1372
1373   (errcode, msg) = \
1374     utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1375                                 constants.SSL_CERT_EXPIRATION_ERROR)
1376
1377   if msg:
1378     fnamemsg = "While verifying %s: %s" % (filename, msg)
1379   else:
1380     fnamemsg = None
1381
1382   if errcode is None:
1383     return (None, fnamemsg)
1384   elif errcode == utils.CERT_WARNING:
1385     return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1386   elif errcode == utils.CERT_ERROR:
1387     return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1388
1389   raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1390
1391
1392 def _GetAllHypervisorParameters(cluster, instances):
1393   """Compute the set of all hypervisor parameters.
1394
1395   @type cluster: L{objects.Cluster}
1396   @param cluster: the cluster object
1397   @param instances: list of L{objects.Instance}
1398   @param instances: additional instances from which to obtain parameters
1399   @rtype: list of (origin, hypervisor, parameters)
1400   @return: a list with all parameters found, indicating the hypervisor they
1401        apply to, and the origin (can be "cluster", "os X", or "instance Y")
1402
1403   """
1404   hvp_data = []
1405
1406   for hv_name in cluster.enabled_hypervisors:
1407     hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1408
1409   for os_name, os_hvp in cluster.os_hvp.items():
1410     for hv_name, hv_params in os_hvp.items():
1411       if hv_params:
1412         full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1413         hvp_data.append(("os %s" % os_name, hv_name, full_params))
1414
1415   # TODO: collapse identical parameter values in a single one
1416   for instance in instances:
1417     if instance.hvparams:
1418       hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1419                        cluster.FillHV(instance)))
1420
1421   return hvp_data
1422
1423
1424 class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1425   """Verifies the cluster config.
1426
1427   """
1428   REQ_BGL = False
1429
1430   def _VerifyHVP(self, hvp_data):
1431     """Verifies locally the syntax of the hypervisor parameters.
1432
1433     """
1434     for item, hv_name, hv_params in hvp_data:
1435       msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1436              (item, hv_name))
1437       try:
1438         hv_class = hypervisor.GetHypervisorClass(hv_name)
1439         utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1440         hv_class.CheckParameterSyntax(hv_params)
1441       except errors.GenericError, err:
1442         self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1443
1444   def ExpandNames(self):
1445     self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1446     self.share_locks = ShareAll()
1447
1448   def CheckPrereq(self):
1449     """Check prerequisites.
1450
1451     """
1452     # Retrieve all information
1453     self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1454     self.all_node_info = self.cfg.GetAllNodesInfo()
1455     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1456
1457   def Exec(self, feedback_fn):
1458     """Verify integrity of cluster, performing various test on nodes.
1459
1460     """
1461     self.bad = False
1462     self._feedback_fn = feedback_fn
1463
1464     feedback_fn("* Verifying cluster config")
1465
1466     for msg in self.cfg.VerifyConfig():
1467       self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1468
1469     feedback_fn("* Verifying cluster certificate files")
1470
1471     for cert_filename in pathutils.ALL_CERT_FILES:
1472       (errcode, msg) = _VerifyCertificate(cert_filename)
1473       self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1474
1475     self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
1476                                     pathutils.NODED_CERT_FILE),
1477                   constants.CV_ECLUSTERCERT,
1478                   None,
1479                   pathutils.NODED_CERT_FILE + " must be accessible by the " +
1480                     constants.LUXID_USER + " user")
1481
1482     feedback_fn("* Verifying hypervisor parameters")
1483
1484     self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1485                                                 self.all_inst_info.values()))
1486
1487     feedback_fn("* Verifying all nodes belong to an existing group")
1488
1489     # We do this verification here because, should this bogus circumstance
1490     # occur, it would never be caught by VerifyGroup, which only acts on
1491     # nodes/instances reachable from existing node groups.
1492
1493     dangling_nodes = set(node for node in self.all_node_info.values()
1494                          if node.group not in self.all_group_info)
1495
1496     dangling_instances = {}
1497     no_node_instances = []
1498
1499     for inst in self.all_inst_info.values():
1500       if inst.primary_node in [node.uuid for node in dangling_nodes]:
1501         dangling_instances.setdefault(inst.primary_node, []).append(inst)
1502       elif inst.primary_node not in self.all_node_info:
1503         no_node_instances.append(inst)
1504
1505     pretty_dangling = [
1506         "%s (%s)" %
1507         (node.name,
1508          utils.CommaJoin(inst.name for
1509                          inst in dangling_instances.get(node.uuid, [])))
1510         for node in dangling_nodes]
1511
1512     self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1513                   None,
1514                   "the following nodes (and their instances) belong to a non"
1515                   " existing group: %s", utils.CommaJoin(pretty_dangling))
1516
1517     self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1518                   None,
1519                   "the following instances have a non-existing primary-node:"
1520                   " %s", utils.CommaJoin(inst.name for
1521                                          inst in no_node_instances))
1522
1523     return not self.bad
1524
1525
1526 class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1527   """Verifies the status of a node group.
1528
1529   """
1530   HPATH = "cluster-verify"
1531   HTYPE = constants.HTYPE_CLUSTER
1532   REQ_BGL = False
1533
1534   _HOOKS_INDENT_RE = re.compile("^", re.M)
1535
1536   class NodeImage(object):
1537     """A class representing the logical and physical status of a node.
1538
1539     @type uuid: string
1540     @ivar uuid: the node UUID to which this object refers
1541     @ivar volumes: a structure as returned from
1542         L{ganeti.backend.GetVolumeList} (runtime)
1543     @ivar instances: a list of running instances (runtime)
1544     @ivar pinst: list of configured primary instances (config)
1545     @ivar sinst: list of configured secondary instances (config)
1546     @ivar sbp: dictionary of {primary-node: list of instances} for all
1547         instances for which this node is secondary (config)
1548     @ivar mfree: free memory, as reported by hypervisor (runtime)
1549     @ivar dfree: free disk, as reported by the node (runtime)
1550     @ivar offline: the offline status (config)
1551     @type rpc_fail: boolean
1552     @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1553         not whether the individual keys were correct) (runtime)
1554     @type lvm_fail: boolean
1555     @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1556     @type hyp_fail: boolean
1557     @ivar hyp_fail: whether the RPC call didn't return the instance list
1558     @type ghost: boolean
1559     @ivar ghost: whether this is a known node or not (config)
1560     @type os_fail: boolean
1561     @ivar os_fail: whether the RPC call didn't return valid OS data
1562     @type oslist: list
1563     @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1564     @type vm_capable: boolean
1565     @ivar vm_capable: whether the node can host instances
1566     @type pv_min: float
1567     @ivar pv_min: size in MiB of the smallest PVs
1568     @type pv_max: float
1569     @ivar pv_max: size in MiB of the biggest PVs
1570
1571     """
1572     def __init__(self, offline=False, uuid=None, vm_capable=True):
1573       self.uuid = uuid
1574       self.volumes = {}
1575       self.instances = []
1576       self.pinst = []
1577       self.sinst = []
1578       self.sbp = {}
1579       self.mfree = 0
1580       self.dfree = 0
1581       self.offline = offline
1582       self.vm_capable = vm_capable
1583       self.rpc_fail = False
1584       self.lvm_fail = False
1585       self.hyp_fail = False
1586       self.ghost = False
1587       self.os_fail = False
1588       self.oslist = {}
1589       self.pv_min = None
1590       self.pv_max = None
1591
1592   def ExpandNames(self):
1593     # This raises errors.OpPrereqError on its own:
1594     self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1595
1596     # Get instances in node group; this is unsafe and needs verification later
1597     inst_uuids = \
1598       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1599
1600     self.needed_locks = {
1601       locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1602       locking.LEVEL_NODEGROUP: [self.group_uuid],
1603       locking.LEVEL_NODE: [],
1604
1605       # This opcode is run by watcher every five minutes and acquires all nodes
1606       # for a group. It doesn't run for a long time, so it's better to acquire
1607       # the node allocation lock as well.
1608       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1609       }
1610
1611     self.share_locks = ShareAll()
1612
1613   def DeclareLocks(self, level):
1614     if level == locking.LEVEL_NODE:
1615       # Get members of node group; this is unsafe and needs verification later
1616       nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1617
1618       # In Exec(), we warn about mirrored instances that have primary and
1619       # secondary living in separate node groups. To fully verify that
1620       # volumes for these instances are healthy, we will need to do an
1621       # extra call to their secondaries. We ensure here those nodes will
1622       # be locked.
1623       for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1624         # Important: access only the instances whose lock is owned
1625         instance = self.cfg.GetInstanceInfoByName(inst_name)
1626         if instance.disk_template in constants.DTS_INT_MIRROR:
1627           nodes.update(instance.secondary_nodes)
1628
1629       self.needed_locks[locking.LEVEL_NODE] = nodes
1630
1631   def CheckPrereq(self):
1632     assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1633     self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1634
1635     group_node_uuids = set(self.group_info.members)
1636     group_inst_uuids = \
1637       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1638
1639     unlocked_node_uuids = \
1640         group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1641
1642     unlocked_inst_uuids = \
1643         group_inst_uuids.difference(
1644           [self.cfg.GetInstanceInfoByName(name).uuid
1645            for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1646
1647     if unlocked_node_uuids:
1648       raise errors.OpPrereqError(
1649         "Missing lock for nodes: %s" %
1650         utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1651         errors.ECODE_STATE)
1652
1653     if unlocked_inst_uuids:
1654       raise errors.OpPrereqError(
1655         "Missing lock for instances: %s" %
1656         utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1657         errors.ECODE_STATE)
1658
1659     self.all_node_info = self.cfg.GetAllNodesInfo()
1660     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1661
1662     self.my_node_uuids = group_node_uuids
1663     self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1664                              for node_uuid in group_node_uuids)
1665
1666     self.my_inst_uuids = group_inst_uuids
1667     self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1668                              for inst_uuid in group_inst_uuids)
1669
1670     # We detect here the nodes that will need the extra RPC calls for verifying
1671     # split LV volumes; they should be locked.
1672     extra_lv_nodes = set()
1673
1674     for inst in self.my_inst_info.values():
1675       if inst.disk_template in constants.DTS_INT_MIRROR:
1676         for nuuid in inst.all_nodes:
1677           if self.all_node_info[nuuid].group != self.group_uuid:
1678             extra_lv_nodes.add(nuuid)
1679
1680     unlocked_lv_nodes = \
1681         extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1682
1683     if unlocked_lv_nodes:
1684       raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1685                                  utils.CommaJoin(unlocked_lv_nodes),
1686                                  errors.ECODE_STATE)
1687     self.extra_lv_nodes = list(extra_lv_nodes)
1688
1689   def _VerifyNode(self, ninfo, nresult):
1690     """Perform some basic validation on data returned from a node.
1691
1692       - check the result data structure is well formed and has all the
1693         mandatory fields
1694       - check ganeti version
1695
1696     @type ninfo: L{objects.Node}
1697     @param ninfo: the node to check
1698     @param nresult: the results from the node
1699     @rtype: boolean
1700     @return: whether overall this call was successful (and we can expect
1701          reasonable values in the respose)
1702
1703     """
1704     # main result, nresult should be a non-empty dict
1705     test = not nresult or not isinstance(nresult, dict)
1706     self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1707                   "unable to verify node: no data returned")
1708     if test:
1709       return False
1710
1711     # compares ganeti version
1712     local_version = constants.PROTOCOL_VERSION
1713     remote_version = nresult.get("version", None)
1714     test = not (remote_version and
1715                 isinstance(remote_version, (list, tuple)) and
1716                 len(remote_version) == 2)
1717     self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1718                   "connection to node returned invalid data")
1719     if test:
1720       return False
1721
1722     test = local_version != remote_version[0]
1723     self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1724                   "incompatible protocol versions: master %s,"
1725                   " node %s", local_version, remote_version[0])
1726     if test:
1727       return False
1728
1729     # node seems compatible, we can actually try to look into its results
1730
1731     # full package version
1732     self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1733                   constants.CV_ENODEVERSION, ninfo.name,
1734                   "software version mismatch: master %s, node %s",
1735                   constants.RELEASE_VERSION, remote_version[1],
1736                   code=self.ETYPE_WARNING)
1737
1738     hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1739     if ninfo.vm_capable and isinstance(hyp_result, dict):
1740       for hv_name, hv_result in hyp_result.iteritems():
1741         test = hv_result is not None
1742         self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1743                       "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1744
1745     hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1746     if ninfo.vm_capable and isinstance(hvp_result, list):
1747       for item, hv_name, hv_result in hvp_result:
1748         self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1749                       "hypervisor %s parameter verify failure (source %s): %s",
1750                       hv_name, item, hv_result)
1751
1752     test = nresult.get(constants.NV_NODESETUP,
1753                        ["Missing NODESETUP results"])
1754     self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1755                   "node setup error: %s", "; ".join(test))
1756
1757     return True
1758
1759   def _VerifyNodeTime(self, ninfo, nresult,
1760                       nvinfo_starttime, nvinfo_endtime):
1761     """Check the node time.
1762
1763     @type ninfo: L{objects.Node}
1764     @param ninfo: the node to check
1765     @param nresult: the remote results for the node
1766     @param nvinfo_starttime: the start time of the RPC call
1767     @param nvinfo_endtime: the end time of the RPC call
1768
1769     """
1770     ntime = nresult.get(constants.NV_TIME, None)
1771     try:
1772       ntime_merged = utils.MergeTime(ntime)
1773     except (ValueError, TypeError):
1774       self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1775                     "Node returned invalid time")
1776       return
1777
1778     if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1779       ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1780     elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1781       ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1782     else:
1783       ntime_diff = None
1784
1785     self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1786                   "Node time diverges by at least %s from master node time",
1787                   ntime_diff)
1788
1789   def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1790     """Check the node LVM results and update info for cross-node checks.
1791
1792     @type ninfo: L{objects.Node}
1793     @param ninfo: the node to check
1794     @param nresult: the remote results for the node
1795     @param vg_name: the configured VG name
1796     @type nimg: L{NodeImage}
1797     @param nimg: node image
1798
1799     """
1800     if vg_name is None:
1801       return
1802
1803     # checks vg existence and size > 20G
1804     vglist = nresult.get(constants.NV_VGLIST, None)
1805     test = not vglist
1806     self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1807                   "unable to check volume groups")
1808     if not test:
1809       vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1810                                             constants.MIN_VG_SIZE)
1811       self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1812
1813     # Check PVs
1814     (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1815     for em in errmsgs:
1816       self._Error(constants.CV_ENODELVM, ninfo.name, em)
1817     if pvminmax is not None:
1818       (nimg.pv_min, nimg.pv_max) = pvminmax
1819
1820   def _VerifyGroupDRBDVersion(self, node_verify_infos):
1821     """Check cross-node DRBD version consistency.
1822
1823     @type node_verify_infos: dict
1824     @param node_verify_infos: infos about nodes as returned from the
1825       node_verify call.
1826
1827     """
1828     node_versions = {}
1829     for node_uuid, ndata in node_verify_infos.items():
1830       nresult = ndata.payload
1831       version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1832       node_versions[node_uuid] = version
1833
1834     if len(set(node_versions.values())) > 1:
1835       for node_uuid, version in sorted(node_versions.items()):
1836         msg = "DRBD version mismatch: %s" % version
1837         self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1838                     code=self.ETYPE_WARNING)
1839
1840   def _VerifyGroupLVM(self, node_image, vg_name):
1841     """Check cross-node consistency in LVM.
1842
1843     @type node_image: dict
1844     @param node_image: info about nodes, mapping from node to names to
1845       L{NodeImage} objects
1846     @param vg_name: the configured VG name
1847
1848     """
1849     if vg_name is None:
1850       return
1851
1852     # Only exclusive storage needs this kind of checks
1853     if not self._exclusive_storage:
1854       return
1855
1856     # exclusive_storage wants all PVs to have the same size (approximately),
1857     # if the smallest and the biggest ones are okay, everything is fine.
1858     # pv_min is None iff pv_max is None
1859     vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1860     if not vals:
1861       return
1862     (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1863     (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1864     bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1865     self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1866                   "PV sizes differ too much in the group; smallest (%s MB) is"
1867                   " on %s, biggest (%s MB) is on %s",
1868                   pvmin, self.cfg.GetNodeName(minnode_uuid),
1869                   pvmax, self.cfg.GetNodeName(maxnode_uuid))
1870
1871   def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1872     """Check the node bridges.
1873
1874     @type ninfo: L{objects.Node}
1875     @param ninfo: the node to check
1876     @param nresult: the remote results for the node
1877     @param bridges: the expected list of bridges
1878
1879     """
1880     if not bridges:
1881       return
1882
1883     missing = nresult.get(constants.NV_BRIDGES, None)
1884     test = not isinstance(missing, list)
1885     self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1886                   "did not return valid bridge information")
1887     if not test:
1888       self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1889                     "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1890
1891   def _VerifyNodeUserScripts(self, ninfo, nresult):
1892     """Check the results of user scripts presence and executability on the node
1893
1894     @type ninfo: L{objects.Node}
1895     @param ninfo: the node to check
1896     @param nresult: the remote results for the node
1897
1898     """
1899     test = not constants.NV_USERSCRIPTS in nresult
1900     self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1901                   "did not return user scripts information")
1902
1903     broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1904     if not test:
1905       self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1906                     "user scripts not present or not executable: %s" %
1907                     utils.CommaJoin(sorted(broken_scripts)))
1908
1909   def _VerifyNodeNetwork(self, ninfo, nresult):
1910     """Check the node network connectivity results.
1911
1912     @type ninfo: L{objects.Node}
1913     @param ninfo: the node to check
1914     @param nresult: the remote results for the node
1915
1916     """
1917     test = constants.NV_NODELIST not in nresult
1918     self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
1919                   "node hasn't returned node ssh connectivity data")
1920     if not test:
1921       if nresult[constants.NV_NODELIST]:
1922         for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1923           self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
1924                         "ssh communication with node '%s': %s", a_node, a_msg)
1925
1926     test = constants.NV_NODENETTEST not in nresult
1927     self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1928                   "node hasn't returned node tcp connectivity data")
1929     if not test:
1930       if nresult[constants.NV_NODENETTEST]:
1931         nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1932         for anode in nlist:
1933           self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
1934                         "tcp communication with node '%s': %s",
1935                         anode, nresult[constants.NV_NODENETTEST][anode])
1936
1937     test = constants.NV_MASTERIP not in nresult
1938     self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1939                   "node hasn't returned node master IP reachability data")
1940     if not test:
1941       if not nresult[constants.NV_MASTERIP]:
1942         if ninfo.uuid == self.master_node:
1943           msg = "the master node cannot reach the master IP (not configured?)"
1944         else:
1945           msg = "cannot reach the master IP"
1946         self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
1947
1948   def _VerifyInstance(self, instance, node_image, diskstatus):
1949     """Verify an instance.
1950
1951     This function checks to see if the required block devices are
1952     available on the instance's node, and that the nodes are in the correct
1953     state.
1954
1955     """
1956     pnode_uuid = instance.primary_node
1957     pnode_img = node_image[pnode_uuid]
1958     groupinfo = self.cfg.GetAllNodeGroupsInfo()
1959
1960     node_vol_should = {}
1961     instance.MapLVsByNode(node_vol_should)
1962
1963     cluster = self.cfg.GetClusterInfo()
1964     ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1965                                                             self.group_info)
1966     err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
1967     self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
1968                   utils.CommaJoin(err), code=self.ETYPE_WARNING)
1969
1970     for node_uuid in node_vol_should:
1971       n_img = node_image[node_uuid]
1972       if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1973         # ignore missing volumes on offline or broken nodes
1974         continue
1975       for volume in node_vol_should[node_uuid]:
1976         test = volume not in n_img.volumes
1977         self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
1978                       "volume %s missing on node %s", volume,
1979                       self.cfg.GetNodeName(node_uuid))
1980
1981     if instance.admin_state == constants.ADMINST_UP:
1982       test = instance.uuid not in pnode_img.instances and not pnode_img.offline
1983       self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
1984                     "instance not running on its primary node %s",
1985                      self.cfg.GetNodeName(pnode_uuid))
1986       self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
1987                     instance.name, "instance is marked as running and lives on"
1988                     " offline node %s", self.cfg.GetNodeName(pnode_uuid))
1989
1990     diskdata = [(nname, success, status, idx)
1991                 for (nname, disks) in diskstatus.items()
1992                 for idx, (success, status) in enumerate(disks)]
1993
1994     for nname, success, bdev_status, idx in diskdata:
1995       # the 'ghost node' construction in Exec() ensures that we have a
1996       # node here
1997       snode = node_image[nname]
1998       bad_snode = snode.ghost or snode.offline
1999       self._ErrorIf(instance.disks_active and
2000                     not success and not bad_snode,
2001                     constants.CV_EINSTANCEFAULTYDISK, instance.name,
2002                     "couldn't retrieve status for disk/%s on %s: %s",
2003                     idx, self.cfg.GetNodeName(nname), bdev_status)
2004
2005       if instance.disks_active and success and \
2006          (bdev_status.is_degraded or
2007           bdev_status.ldisk_status != constants.LDS_OKAY):
2008         msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2009         if bdev_status.is_degraded:
2010           msg += " is degraded"
2011         if bdev_status.ldisk_status != constants.LDS_OKAY:
2012           msg += "; state is '%s'" % \
2013                  constants.LDS_NAMES[bdev_status.ldisk_status]
2014
2015         self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2016
2017     self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2018                   constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2019                   "instance %s, connection to primary node failed",
2020                   instance.name)
2021
2022     self._ErrorIf(len(instance.secondary_nodes) > 1,
2023                   constants.CV_EINSTANCELAYOUT, instance.name,
2024                   "instance has multiple secondary nodes: %s",
2025                   utils.CommaJoin(instance.secondary_nodes),
2026                   code=self.ETYPE_WARNING)
2027
2028     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2029     if any(es_flags.values()):
2030       if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2031         # Disk template not compatible with exclusive_storage: no instance
2032         # node should have the flag set
2033         es_nodes = [n
2034                     for (n, es) in es_flags.items()
2035                     if es]
2036         self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2037                     "instance has template %s, which is not supported on nodes"
2038                     " that have exclusive storage set: %s",
2039                     instance.disk_template,
2040                     utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2041       for (idx, disk) in enumerate(instance.disks):
2042         self._ErrorIf(disk.spindles is None,
2043                       constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2044                       "number of spindles not configured for disk %s while"
2045                       " exclusive storage is enabled, try running"
2046                       " gnt-cluster repair-disk-sizes", idx)
2047
2048     if instance.disk_template in constants.DTS_INT_MIRROR:
2049       instance_nodes = utils.NiceSort(instance.all_nodes)
2050       instance_groups = {}
2051
2052       for node_uuid in instance_nodes:
2053         instance_groups.setdefault(self.all_node_info[node_uuid].group,
2054                                    []).append(node_uuid)
2055
2056       pretty_list = [
2057         "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2058                            groupinfo[group].name)
2059         # Sort so that we always list the primary node first.
2060         for group, nodes in sorted(instance_groups.items(),
2061                                    key=lambda (_, nodes): pnode_uuid in nodes,
2062                                    reverse=True)]
2063
2064       self._ErrorIf(len(instance_groups) > 1,
2065                     constants.CV_EINSTANCESPLITGROUPS,
2066                     instance.name, "instance has primary and secondary nodes in"
2067                     " different groups: %s", utils.CommaJoin(pretty_list),
2068                     code=self.ETYPE_WARNING)
2069
2070     inst_nodes_offline = []
2071     for snode in instance.secondary_nodes:
2072       s_img = node_image[snode]
2073       self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2074                     self.cfg.GetNodeName(snode),
2075                     "instance %s, connection to secondary node failed",
2076                     instance.name)
2077
2078       if s_img.offline:
2079         inst_nodes_offline.append(snode)
2080
2081     # warn that the instance lives on offline nodes
2082     self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2083                   instance.name, "instance has offline secondary node(s) %s",
2084                   utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2085     # ... or ghost/non-vm_capable nodes
2086     for node_uuid in instance.all_nodes:
2087       self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2088                     instance.name, "instance lives on ghost node %s",
2089                     self.cfg.GetNodeName(node_uuid))
2090       self._ErrorIf(not node_image[node_uuid].vm_capable,
2091                     constants.CV_EINSTANCEBADNODE, instance.name,
2092                     "instance lives on non-vm_capable node %s",
2093                     self.cfg.GetNodeName(node_uuid))
2094
2095   def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2096     """Verify if there are any unknown volumes in the cluster.
2097
2098     The .os, .swap and backup volumes are ignored. All other volumes are
2099     reported as unknown.
2100
2101     @type reserved: L{ganeti.utils.FieldSet}
2102     @param reserved: a FieldSet of reserved volume names
2103
2104     """
2105     for node_uuid, n_img in node_image.items():
2106       if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2107           self.all_node_info[node_uuid].group != self.group_uuid):
2108         # skip non-healthy nodes
2109         continue
2110       for volume in n_img.volumes:
2111         test = ((node_uuid not in node_vol_should or
2112                 volume not in node_vol_should[node_uuid]) and
2113                 not reserved.Matches(volume))
2114         self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2115                       self.cfg.GetNodeName(node_uuid),
2116                       "volume %s is unknown", volume)
2117
2118   def _VerifyNPlusOneMemory(self, node_image, all_insts):
2119     """Verify N+1 Memory Resilience.
2120
2121     Check that if one single node dies we can still start all the
2122     instances it was primary for.
2123
2124     """
2125     cluster_info = self.cfg.GetClusterInfo()
2126     for node_uuid, n_img in node_image.items():
2127       # This code checks that every node which is now listed as
2128       # secondary has enough memory to host all instances it is
2129       # supposed to should a single other node in the cluster fail.
2130       # FIXME: not ready for failover to an arbitrary node
2131       # FIXME: does not support file-backed instances
2132       # WARNING: we currently take into account down instances as well
2133       # as up ones, considering that even if they're down someone
2134       # might want to start them even in the event of a node failure.
2135       if n_img.offline or \
2136          self.all_node_info[node_uuid].group != self.group_uuid:
2137         # we're skipping nodes marked offline and nodes in other groups from
2138         # the N+1 warning, since most likely we don't have good memory
2139         # infromation from them; we already list instances living on such
2140         # nodes, and that's enough warning
2141         continue
2142       #TODO(dynmem): also consider ballooning out other instances
2143       for prinode, inst_uuids in n_img.sbp.items():
2144         needed_mem = 0
2145         for inst_uuid in inst_uuids:
2146           bep = cluster_info.FillBE(all_insts[inst_uuid])
2147           if bep[constants.BE_AUTO_BALANCE]:
2148             needed_mem += bep[constants.BE_MINMEM]
2149         test = n_img.mfree < needed_mem
2150         self._ErrorIf(test, constants.CV_ENODEN1,
2151                       self.cfg.GetNodeName(node_uuid),
2152                       "not enough memory to accomodate instance failovers"
2153                       " should node %s fail (%dMiB needed, %dMiB available)",
2154                       self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2155
2156   def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2157                    (files_all, files_opt, files_mc, files_vm)):
2158     """Verifies file checksums collected from all nodes.
2159
2160     @param nodes: List of L{objects.Node} objects
2161     @param master_node_uuid: UUID of master node
2162     @param all_nvinfo: RPC results
2163
2164     """
2165     # Define functions determining which nodes to consider for a file
2166     files2nodefn = [
2167       (files_all, None),
2168       (files_mc, lambda node: (node.master_candidate or
2169                                node.uuid == master_node_uuid)),
2170       (files_vm, lambda node: node.vm_capable),
2171       ]
2172
2173     # Build mapping from filename to list of nodes which should have the file
2174     nodefiles = {}
2175     for (files, fn) in files2nodefn:
2176       if fn is None:
2177         filenodes = nodes
2178       else:
2179         filenodes = filter(fn, nodes)
2180       nodefiles.update((filename,
2181                         frozenset(map(operator.attrgetter("uuid"), filenodes)))
2182                        for filename in files)
2183
2184     assert set(nodefiles) == (files_all | files_mc | files_vm)
2185
2186     fileinfo = dict((filename, {}) for filename in nodefiles)
2187     ignore_nodes = set()
2188
2189     for node in nodes:
2190       if node.offline:
2191         ignore_nodes.add(node.uuid)
2192         continue
2193
2194       nresult = all_nvinfo[node.uuid]
2195
2196       if nresult.fail_msg or not nresult.payload:
2197         node_files = None
2198       else:
2199         fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2200         node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2201                           for (key, value) in fingerprints.items())
2202         del fingerprints
2203
2204       test = not (node_files and isinstance(node_files, dict))
2205       self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2206                     "Node did not return file checksum data")
2207       if test:
2208         ignore_nodes.add(node.uuid)
2209         continue
2210
2211       # Build per-checksum mapping from filename to nodes having it
2212       for (filename, checksum) in node_files.items():
2213         assert filename in nodefiles
2214         fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2215
2216     for (filename, checksums) in fileinfo.items():
2217       assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2218
2219       # Nodes having the file
2220       with_file = frozenset(node_uuid
2221                             for node_uuids in fileinfo[filename].values()
2222                             for node_uuid in node_uuids) - ignore_nodes
2223
2224       expected_nodes = nodefiles[filename] - ignore_nodes
2225
2226       # Nodes missing file
2227       missing_file = expected_nodes - with_file
2228
2229       if filename in files_opt:
2230         # All or no nodes
2231         self._ErrorIf(missing_file and missing_file != expected_nodes,
2232                       constants.CV_ECLUSTERFILECHECK, None,
2233                       "File %s is optional, but it must exist on all or no"
2234                       " nodes (not found on %s)",
2235                       filename,
2236                       utils.CommaJoin(
2237                         utils.NiceSort(
2238                           map(self.cfg.GetNodeName, missing_file))))
2239       else:
2240         self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2241                       "File %s is missing from node(s) %s", filename,
2242                       utils.CommaJoin(
2243                         utils.NiceSort(
2244                           map(self.cfg.GetNodeName, missing_file))))
2245
2246         # Warn if a node has a file it shouldn't
2247         unexpected = with_file - expected_nodes
2248         self._ErrorIf(unexpected,
2249                       constants.CV_ECLUSTERFILECHECK, None,
2250                       "File %s should not exist on node(s) %s",
2251                       filename, utils.CommaJoin(
2252                         utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2253
2254       # See if there are multiple versions of the file
2255       test = len(checksums) > 1
2256       if test:
2257         variants = ["variant %s on %s" %
2258                     (idx + 1,
2259                      utils.CommaJoin(utils.NiceSort(
2260                        map(self.cfg.GetNodeName, node_uuids))))
2261                     for (idx, (checksum, node_uuids)) in
2262                       enumerate(sorted(checksums.items()))]
2263       else:
2264         variants = []
2265
2266       self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2267                     "File %s found with %s different checksums (%s)",
2268                     filename, len(checksums), "; ".join(variants))
2269
2270   def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2271                       drbd_map):
2272     """Verifies and the node DRBD status.
2273
2274     @type ninfo: L{objects.Node}
2275     @param ninfo: the node to check
2276     @param nresult: the remote results for the node
2277     @param instanceinfo: the dict of instances
2278     @param drbd_helper: the configured DRBD usermode helper
2279     @param drbd_map: the DRBD map as returned by
2280         L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2281
2282     """
2283     if drbd_helper:
2284       helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2285       test = (helper_result is None)
2286       self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2287                     "no drbd usermode helper returned")
2288       if helper_result:
2289         status, payload = helper_result
2290         test = not status
2291         self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2292                       "drbd usermode helper check unsuccessful: %s", payload)
2293         test = status and (payload != drbd_helper)
2294         self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2295                       "wrong drbd usermode helper: %s", payload)
2296
2297     # compute the DRBD minors
2298     node_drbd = {}
2299     for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2300       test = inst_uuid not in instanceinfo
2301       self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2302                     "ghost instance '%s' in temporary DRBD map", inst_uuid)
2303         # ghost instance should not be running, but otherwise we
2304         # don't give double warnings (both ghost instance and
2305         # unallocated minor in use)
2306       if test:
2307         node_drbd[minor] = (inst_uuid, False)
2308       else:
2309         instance = instanceinfo[inst_uuid]
2310         node_drbd[minor] = (inst_uuid, instance.disks_active)
2311
2312     # and now check them
2313     used_minors = nresult.get(constants.NV_DRBDLIST, [])
2314     test = not isinstance(used_minors, (tuple, list))
2315     self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2316                   "cannot parse drbd status file: %s", str(used_minors))
2317     if test:
2318       # we cannot check drbd status
2319       return
2320
2321     for minor, (inst_uuid, must_exist) in node_drbd.items():
2322       test = minor not in used_minors and must_exist
2323       self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2324                     "drbd minor %d of instance %s is not active", minor,
2325                     self.cfg.GetInstanceName(inst_uuid))
2326     for minor in used_minors:
2327       test = minor not in node_drbd
2328       self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2329                     "unallocated drbd minor %d is in use", minor)
2330
2331   def _UpdateNodeOS(self, ninfo, nresult, nimg):
2332     """Builds the node OS structures.
2333
2334     @type ninfo: L{objects.Node}
2335     @param ninfo: the node to check
2336     @param nresult: the remote results for the node
2337     @param nimg: the node image object
2338
2339     """
2340     remote_os = nresult.get(constants.NV_OSLIST, None)
2341     test = (not isinstance(remote_os, list) or
2342             not compat.all(isinstance(v, list) and len(v) == 7
2343                            for v in remote_os))
2344
2345     self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2346                   "node hasn't returned valid OS data")
2347
2348     nimg.os_fail = test
2349
2350     if test:
2351       return
2352
2353     os_dict = {}
2354
2355     for (name, os_path, status, diagnose,
2356          variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2357
2358       if name not in os_dict:
2359         os_dict[name] = []
2360
2361       # parameters is a list of lists instead of list of tuples due to
2362       # JSON lacking a real tuple type, fix it:
2363       parameters = [tuple(v) for v in parameters]
2364       os_dict[name].append((os_path, status, diagnose,
2365                             set(variants), set(parameters), set(api_ver)))
2366
2367     nimg.oslist = os_dict
2368
2369   def _VerifyNodeOS(self, ninfo, nimg, base):
2370     """Verifies the node OS list.
2371
2372     @type ninfo: L{objects.Node}
2373     @param ninfo: the node to check
2374     @param nimg: the node image object
2375     @param base: the 'template' node we match against (e.g. from the master)
2376
2377     """
2378     assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2379
2380     beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2381     for os_name, os_data in nimg.oslist.items():
2382       assert os_data, "Empty OS status for OS %s?!" % os_name
2383       f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2384       self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2385                     "Invalid OS %s (located at %s): %s",
2386                     os_name, f_path, f_diag)
2387       self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2388                     "OS '%s' has multiple entries"
2389                     " (first one shadows the rest): %s",
2390                     os_name, utils.CommaJoin([v[0] for v in os_data]))
2391       # comparisons with the 'base' image
2392       test = os_name not in base.oslist
2393       self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2394                     "Extra OS %s not present on reference node (%s)",
2395                     os_name, self.cfg.GetNodeName(base.uuid))
2396       if test:
2397         continue
2398       assert base.oslist[os_name], "Base node has empty OS status?"
2399       _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2400       if not b_status:
2401         # base OS is invalid, skipping
2402         continue
2403       for kind, a, b in [("API version", f_api, b_api),
2404                          ("variants list", f_var, b_var),
2405                          ("parameters", beautify_params(f_param),
2406                           beautify_params(b_param))]:
2407         self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2408                       "OS %s for %s differs from reference node %s:"
2409                       " [%s] vs. [%s]", kind, os_name,
2410                       self.cfg.GetNodeName(base.uuid),
2411                       utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2412
2413     # check any missing OSes
2414     missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2415     self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2416                   "OSes present on reference node %s"
2417                   " but missing on this node: %s",
2418                   self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2419
2420   def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2421     """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2422
2423     @type ninfo: L{objects.Node}
2424     @param ninfo: the node to check
2425     @param nresult: the remote results for the node
2426     @type is_master: bool
2427     @param is_master: Whether node is the master node
2428
2429     """
2430     cluster = self.cfg.GetClusterInfo()
2431     if (is_master and
2432         (cluster.IsFileStorageEnabled() or
2433          cluster.IsSharedFileStorageEnabled())):
2434       try:
2435         fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2436       except KeyError:
2437         # This should never happen
2438         self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2439                       "Node did not return forbidden file storage paths")
2440       else:
2441         self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2442                       "Found forbidden file storage paths: %s",
2443                       utils.CommaJoin(fspaths))
2444     else:
2445       self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2446                     constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2447                     "Node should not have returned forbidden file storage"
2448                     " paths")
2449
2450   def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2451                           verify_key, error_key):
2452     """Verifies (file) storage paths.
2453
2454     @type ninfo: L{objects.Node}
2455     @param ninfo: the node to check
2456     @param nresult: the remote results for the node
2457     @type file_disk_template: string
2458     @param file_disk_template: file-based disk template, whose directory
2459         is supposed to be verified
2460     @type verify_key: string
2461     @param verify_key: key for the verification map of this file
2462         verification step
2463     @param error_key: error key to be added to the verification results
2464         in case something goes wrong in this verification step
2465
2466     """
2467     assert (file_disk_template in
2468             utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2469     cluster = self.cfg.GetClusterInfo()
2470     if cluster.IsDiskTemplateEnabled(file_disk_template):
2471       self._ErrorIf(
2472           verify_key in nresult,
2473           error_key, ninfo.name,
2474           "The configured %s storage path is unusable: %s" %
2475           (file_disk_template, nresult.get(verify_key)))
2476
2477   def _VerifyFileStoragePaths(self, ninfo, nresult):
2478     """Verifies (file) storage paths.
2479
2480     @see: C{_VerifyStoragePaths}
2481
2482     """
2483     self._VerifyStoragePaths(
2484         ninfo, nresult, constants.DT_FILE,
2485         constants.NV_FILE_STORAGE_PATH,
2486         constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2487
2488   def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2489     """Verifies (file) storage paths.
2490
2491     @see: C{_VerifyStoragePaths}
2492
2493     """
2494     self._VerifyStoragePaths(
2495         ninfo, nresult, constants.DT_SHARED_FILE,
2496         constants.NV_SHARED_FILE_STORAGE_PATH,
2497         constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2498
2499   def _VerifyOob(self, ninfo, nresult):
2500     """Verifies out of band functionality of a node.
2501
2502     @type ninfo: L{objects.Node}
2503     @param ninfo: the node to check
2504     @param nresult: the remote results for the node
2505
2506     """
2507     # We just have to verify the paths on master and/or master candidates
2508     # as the oob helper is invoked on the master
2509     if ((ninfo.master_candidate or ninfo.master_capable) and
2510         constants.NV_OOB_PATHS in nresult):
2511       for path_result in nresult[constants.NV_OOB_PATHS]:
2512         self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2513                       ninfo.name, path_result)
2514
2515   def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2516     """Verifies and updates the node volume data.
2517
2518     This function will update a L{NodeImage}'s internal structures
2519     with data from the remote call.
2520
2521     @type ninfo: L{objects.Node}
2522     @param ninfo: the node to check
2523     @param nresult: the remote results for the node
2524     @param nimg: the node image object
2525     @param vg_name: the configured VG name
2526
2527     """
2528     nimg.lvm_fail = True
2529     lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2530     if vg_name is None:
2531       pass
2532     elif isinstance(lvdata, basestring):
2533       self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2534                     "LVM problem on node: %s", utils.SafeEncode(lvdata))
2535     elif not isinstance(lvdata, dict):
2536       self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2537                     "rpc call to node failed (lvlist)")
2538     else:
2539       nimg.volumes = lvdata
2540       nimg.lvm_fail = False
2541
2542   def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2543     """Verifies and updates the node instance list.
2544
2545     If the listing was successful, then updates this node's instance
2546     list. Otherwise, it marks the RPC call as failed for the instance
2547     list key.
2548
2549     @type ninfo: L{objects.Node}
2550     @param ninfo: the node to check
2551     @param nresult: the remote results for the node
2552     @param nimg: the node image object
2553
2554     """
2555     idata = nresult.get(constants.NV_INSTANCELIST, None)
2556     test = not isinstance(idata, list)
2557     self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2558                   "rpc call to node failed (instancelist): %s",
2559                   utils.SafeEncode(str(idata)))
2560     if test:
2561       nimg.hyp_fail = True
2562     else:
2563       nimg.instances = [inst.uuid for (_, inst) in
2564                         self.cfg.GetMultiInstanceInfoByName(idata)]
2565
2566   def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2567     """Verifies and computes a node information map
2568
2569     @type ninfo: L{objects.Node}
2570     @param ninfo: the node to check
2571     @param nresult: the remote results for the node
2572     @param nimg: the node image object
2573     @param vg_name: the configured VG name
2574
2575     """
2576     # try to read free memory (from the hypervisor)
2577     hv_info = nresult.get(constants.NV_HVINFO, None)
2578     test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2579     self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2580                   "rpc call to node failed (hvinfo)")
2581     if not test:
2582       try:
2583         nimg.mfree = int(hv_info["memory_free"])
2584       except (ValueError, TypeError):
2585         self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2586                       "node returned invalid nodeinfo, check hypervisor")
2587
2588     # FIXME: devise a free space model for file based instances as well
2589     if vg_name is not None:
2590       test = (constants.NV_VGLIST not in nresult or
2591               vg_name not in nresult[constants.NV_VGLIST])
2592       self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2593                     "node didn't return data for the volume group '%s'"
2594                     " - it is either missing or broken", vg_name)
2595       if not test:
2596         try:
2597           nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2598         except (ValueError, TypeError):
2599           self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2600                         "node returned invalid LVM info, check LVM status")
2601
2602   def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2603     """Gets per-disk status information for all instances.
2604
2605     @type node_uuids: list of strings
2606     @param node_uuids: Node UUIDs
2607     @type node_image: dict of (UUID, L{objects.Node})
2608     @param node_image: Node objects
2609     @type instanceinfo: dict of (UUID, L{objects.Instance})
2610     @param instanceinfo: Instance objects
2611     @rtype: {instance: {node: [(succes, payload)]}}
2612     @return: a dictionary of per-instance dictionaries with nodes as
2613         keys and disk information as values; the disk information is a
2614         list of tuples (success, payload)
2615
2616     """
2617     node_disks = {}
2618     node_disks_devonly = {}
2619     diskless_instances = set()
2620     diskless = constants.DT_DISKLESS
2621
2622     for nuuid in node_uuids:
2623       node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
2624                                              node_image[nuuid].sinst))
2625       diskless_instances.update(uuid for uuid in node_inst_uuids
2626                                 if instanceinfo[uuid].disk_template == diskless)
2627       disks = [(inst_uuid, disk)
2628                for inst_uuid in node_inst_uuids
2629                for disk in instanceinfo[inst_uuid].disks]
2630
2631       if not disks:
2632         # No need to collect data
2633         continue
2634
2635       node_disks[nuuid] = disks
2636
2637       # _AnnotateDiskParams makes already copies of the disks
2638       devonly = []
2639       for (inst_uuid, dev) in disks:
2640         (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
2641                                           self.cfg)
2642         self.cfg.SetDiskID(anno_disk, nuuid)
2643         devonly.append(anno_disk)
2644
2645       node_disks_devonly[nuuid] = devonly
2646
2647     assert len(node_disks) == len(node_disks_devonly)
2648
2649     # Collect data from all nodes with disks
2650     result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2651                                                           node_disks_devonly)
2652
2653     assert len(result) == len(node_disks)
2654
2655     instdisk = {}
2656
2657     for (nuuid, nres) in result.items():
2658       node = self.cfg.GetNodeInfo(nuuid)
2659       disks = node_disks[node.uuid]
2660
2661       if nres.offline:
2662         # No data from this node
2663         data = len(disks) * [(False, "node offline")]
2664       else:
2665         msg = nres.fail_msg
2666         self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
2667                       "while getting disk information: %s", msg)
2668         if msg:
2669           # No data from this node
2670           data = len(disks) * [(False, msg)]
2671         else:
2672           data = []
2673           for idx, i in enumerate(nres.payload):
2674             if isinstance(i, (tuple, list)) and len(i) == 2:
2675               data.append(i)
2676             else:
2677               logging.warning("Invalid result from node %s, entry %d: %s",
2678                               node.name, idx, i)
2679               data.append((False, "Invalid result from the remote node"))
2680
2681       for ((inst_uuid, _), status) in zip(disks, data):
2682         instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
2683           .append(status)
2684
2685     # Add empty entries for diskless instances.
2686     for inst_uuid in diskless_instances:
2687       assert inst_uuid not in instdisk
2688       instdisk[inst_uuid] = {}
2689
2690     assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2691                       len(nuuids) <= len(instanceinfo[inst].all_nodes) and
2692                       compat.all(isinstance(s, (tuple, list)) and
2693                                  len(s) == 2 for s in statuses)
2694                       for inst, nuuids in instdisk.items()
2695                       for nuuid, statuses in nuuids.items())
2696     if __debug__:
2697       instdisk_keys = set(instdisk)
2698       instanceinfo_keys = set(instanceinfo)
2699       assert instdisk_keys == instanceinfo_keys, \
2700         ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2701          (instdisk_keys, instanceinfo_keys))
2702
2703     return instdisk
2704
2705   @staticmethod
2706   def _SshNodeSelector(group_uuid, all_nodes):
2707     """Create endless iterators for all potential SSH check hosts.
2708
2709     """
2710     nodes = [node for node in all_nodes
2711              if (node.group != group_uuid and
2712                  not node.offline)]
2713     keyfunc = operator.attrgetter("group")
2714
2715     return map(itertools.cycle,
2716                [sorted(map(operator.attrgetter("name"), names))
2717                 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2718                                                   keyfunc)])
2719
2720   @classmethod
2721   def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2722     """Choose which nodes should talk to which other nodes.
2723
2724     We will make nodes contact all nodes in their group, and one node from
2725     every other group.
2726
2727     @warning: This algorithm has a known issue if one node group is much
2728       smaller than others (e.g. just one node). In such a case all other
2729       nodes will talk to the single node.
2730
2731     """
2732     online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2733     sel = cls._SshNodeSelector(group_uuid, all_nodes)
2734
2735     return (online_nodes,
2736             dict((name, sorted([i.next() for i in sel]))
2737                  for name in online_nodes))
2738
2739   def BuildHooksEnv(self):
2740     """Build hooks env.
2741
2742     Cluster-Verify hooks just ran in the post phase and their failure makes
2743     the output be logged in the verify output and the verification to fail.
2744
2745     """
2746     env = {
2747       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2748       }
2749
2750     env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2751                for node in self.my_node_info.values())
2752
2753     return env
2754
2755   def BuildHooksNodes(self):
2756     """Build hooks nodes.
2757
2758     """
2759     return ([], list(self.my_node_info.keys()))
2760
2761   def Exec(self, feedback_fn):
2762     """Verify integrity of the node group, performing various test on nodes.
2763
2764     """
2765     # This method has too many local variables. pylint: disable=R0914
2766     feedback_fn("* Verifying group '%s'" % self.group_info.name)
2767
2768     if not self.my_node_uuids:
2769       # empty node group
2770       feedback_fn("* Empty node group, skipping verification")
2771       return True
2772
2773     self.bad = False
2774     verbose = self.op.verbose
2775     self._feedback_fn = feedback_fn
2776
2777     vg_name = self.cfg.GetVGName()
2778     drbd_helper = self.cfg.GetDRBDHelper()
2779     cluster = self.cfg.GetClusterInfo()
2780     hypervisors = cluster.enabled_hypervisors
2781     node_data_list = self.my_node_info.values()
2782
2783     i_non_redundant = [] # Non redundant instances
2784     i_non_a_balanced = [] # Non auto-balanced instances
2785     i_offline = 0 # Count of offline instances
2786     n_offline = 0 # Count of offline nodes
2787     n_drained = 0 # Count of nodes being drained
2788     node_vol_should = {}
2789
2790     # FIXME: verify OS list
2791
2792     # File verification
2793     filemap = ComputeAncillaryFiles(cluster, False)
2794
2795     # do local checksums
2796     master_node_uuid = self.master_node = self.cfg.GetMasterNode()
2797     master_ip = self.cfg.GetMasterIP()
2798
2799     feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))
2800
2801     user_scripts = []
2802     if self.cfg.GetUseExternalMipScript():
2803       user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2804
2805     node_verify_param = {
2806       constants.NV_FILELIST:
2807         map(vcluster.MakeVirtualPath,
2808             utils.UniqueSequence(filename
2809                                  for files in filemap
2810                                  for filename in files)),
2811       constants.NV_NODELIST:
2812         self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2813                                   self.all_node_info.values()),
2814       constants.NV_HYPERVISOR: hypervisors,
2815       constants.NV_HVPARAMS:
2816         _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2817       constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2818                                  for node in node_data_list
2819                                  if not node.offline],
2820       constants.NV_INSTANCELIST: hypervisors,
2821       constants.NV_VERSION: None,
2822       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2823       constants.NV_NODESETUP: None,
2824       constants.NV_TIME: None,
2825       constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
2826       constants.NV_OSLIST: None,
2827       constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2828       constants.NV_USERSCRIPTS: user_scripts,
2829       }
2830
2831     if vg_name is not None:
2832       node_verify_param[constants.NV_VGLIST] = None
2833       node_verify_param[constants.NV_LVLIST] = vg_name
2834       node_verify_param[constants.NV_PVLIST] = [vg_name]
2835
2836     if drbd_helper:
2837       node_verify_param[constants.NV_DRBDVERSION] = None
2838       node_verify_param[constants.NV_DRBDLIST] = None
2839       node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2840
2841     if cluster.IsFileStorageEnabled() or \
2842         cluster.IsSharedFileStorageEnabled():
2843       # Load file storage paths only from master node
2844       node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
2845         self.cfg.GetMasterNodeName()
2846       if cluster.IsFileStorageEnabled():
2847         node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
2848           cluster.file_storage_dir
2849
2850     # bridge checks
2851     # FIXME: this needs to be changed per node-group, not cluster-wide
2852     bridges = set()
2853     default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2854     if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2855       bridges.add(default_nicpp[constants.NIC_LINK])
2856     for inst_uuid in self.my_inst_info.values():
2857       for nic in inst_uuid.nics:
2858         full_nic = cluster.SimpleFillNIC(nic.nicparams)
2859         if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2860           bridges.add(full_nic[constants.NIC_LINK])
2861
2862     if bridges:
2863       node_verify_param[constants.NV_BRIDGES] = list(bridges)
2864
2865     # Build our expected cluster state
2866     node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
2867                                                  uuid=node.uuid,
2868                                                  vm_capable=node.vm_capable))
2869                       for node in node_data_list)
2870
2871     # Gather OOB paths
2872     oob_paths = []
2873     for node in self.all_node_info.values():
2874       path = SupportsOob(self.cfg, node)
2875       if path and path not in oob_paths:
2876         oob_paths.append(path)
2877
2878     if oob_paths:
2879       node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2880
2881     for inst_uuid in self.my_inst_uuids:
2882       instance = self.my_inst_info[inst_uuid]
2883       if instance.admin_state == constants.ADMINST_OFFLINE:
2884         i_offline += 1
2885
2886       for nuuid in instance.all_nodes:
2887         if nuuid not in node_image:
2888           gnode = self.NodeImage(uuid=nuuid)
2889           gnode.ghost = (nuuid not in self.all_node_info)
2890           node_image[nuuid] = gnode
2891
2892       instance.MapLVsByNode(node_vol_should)
2893
2894       pnode = instance.primary_node
2895       node_image[pnode].pinst.append(instance.uuid)
2896
2897       for snode in instance.secondary_nodes:
2898         nimg = node_image[snode]
2899         nimg.sinst.append(instance.uuid)
2900         if pnode not in nimg.sbp:
2901           nimg.sbp[pnode] = []
2902         nimg.sbp[pnode].append(instance.uuid)
2903
2904     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
2905                                                self.my_node_info.keys())
2906     # The value of exclusive_storage should be the same across the group, so if
2907     # it's True for at least a node, we act as if it were set for all the nodes
2908     self._exclusive_storage = compat.any(es_flags.values())
2909     if self._exclusive_storage:
2910       node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2911
2912     # At this point, we have the in-memory data structures complete,
2913     # except for the runtime information, which we'll gather next
2914
2915     # Due to the way our RPC system works, exact response times cannot be
2916     # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2917     # time before and after executing the request, we can at least have a time
2918     # window.
2919     nvinfo_starttime = time.time()
2920     all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
2921                                            node_verify_param,
2922                                            self.cfg.GetClusterName(),
2923                                            self.cfg.GetClusterInfo().hvparams)
2924     nvinfo_endtime = time.time()
2925
2926     if self.extra_lv_nodes and vg_name is not None:
2927       extra_lv_nvinfo = \
2928           self.rpc.call_node_verify(self.extra_lv_nodes,
2929                                     {constants.NV_LVLIST: vg_name},
2930                                     self.cfg.GetClusterName(),
2931                                     self.cfg.GetClusterInfo().hvparams)
2932     else:
2933       extra_lv_nvinfo = {}
2934
2935     all_drbd_map = self.cfg.ComputeDRBDMap()
2936
2937     feedback_fn("* Gathering disk information (%s nodes)" %
2938                 len(self.my_node_uuids))
2939     instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
2940                                      self.my_inst_info)
2941
2942     feedback_fn("* Verifying configuration file consistency")
2943
2944     # If not all nodes are being checked, we need to make sure the master node
2945     # and a non-checked vm_capable node are in the list.
2946     absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
2947     if absent_node_uuids:
2948       vf_nvinfo = all_nvinfo.copy()
2949       vf_node_info = list(self.my_node_info.values())
2950       additional_node_uuids = []
2951       if master_node_uuid not in self.my_node_info:
2952         additional_node_uuids.append(master_node_uuid)
2953         vf_node_info.append(self.all_node_info[master_node_uuid])
2954       # Add the first vm_capable node we find which is not included,
2955       # excluding the master node (which we already have)
2956       for node_uuid in absent_node_uuids:
2957         nodeinfo = self.all_node_info[node_uuid]
2958         if (nodeinfo.vm_capable and not nodeinfo.offline and
2959             node_uuid != master_node_uuid):
2960           additional_node_uuids.append(node_uuid)
2961           vf_node_info.append(self.all_node_info[node_uuid])
2962           break
2963       key = constants.NV_FILELIST
2964       vf_nvinfo.update(self.rpc.call_node_verify(
2965          additional_node_uuids, {key: node_verify_param[key]},
2966          self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
2967     else:
2968       vf_nvinfo = all_nvinfo
2969       vf_node_info = self.my_node_info.values()
2970
2971     self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)
2972
2973     feedback_fn("* Verifying node status")
2974
2975     refos_img = None
2976
2977     for node_i in node_data_list:
2978       nimg = node_image[node_i.uuid]
2979
2980       if node_i.offline:
2981         if verbose:
2982           feedback_fn("* Skipping offline node %s" % (node_i.name,))
2983         n_offline += 1
2984         continue
2985
2986       if node_i.uuid == master_node_uuid:
2987         ntype = "master"
2988       elif node_i.master_candidate:
2989         ntype = "master candidate"
2990       elif node_i.drained:
2991         ntype = "drained"
2992         n_drained += 1
2993       else:
2994         ntype = "regular"
2995       if verbose:
2996         feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))
2997
2998       msg = all_nvinfo[node_i.uuid].fail_msg
2999       self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
3000                     "while contacting node: %s", msg)
3001       if msg:
3002         nimg.rpc_fail = True
3003         continue
3004
3005       nresult = all_nvinfo[node_i.uuid].payload
3006
3007       nimg.call_ok = self._VerifyNode(node_i, nresult)
3008       self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
3009       self._VerifyNodeNetwork(node_i, nresult)
3010       self._VerifyNodeUserScripts(node_i, nresult)
3011       self._VerifyOob(node_i, nresult)
3012       self._VerifyAcceptedFileStoragePaths(node_i, nresult,
3013                                            node_i.uuid == master_node_uuid)
3014       self._VerifyFileStoragePaths(node_i, nresult)
3015       self._VerifySharedFileStoragePaths(node_i, nresult)
3016
3017       if nimg.vm_capable:
3018         self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
3019         self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
3020                              all_drbd_map)
3021
3022         self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
3023         self._UpdateNodeInstances(node_i, nresult, nimg)
3024         self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
3025         self._UpdateNodeOS(node_i, nresult, nimg)
3026
3027         if not nimg.os_fail:
3028           if refos_img is None:
3029             refos_img = nimg
3030           self._VerifyNodeOS(node_i, nimg, refos_img)
3031         self._VerifyNodeBridges(node_i, nresult, bridges)
3032
3033         # Check whether all running instances are primary for the node. (This
3034         # can no longer be done from _VerifyInstance below, since some of the
3035         # wrong instances could be from other node groups.)
3036         non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)
3037
3038         for inst_uuid in non_primary_inst_uuids:
3039           test = inst_uuid in self.all_inst_info
3040           self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
3041                         self.cfg.GetInstanceName(inst_uuid),
3042                         "instance should not run on node %s", node_i.name)
3043           self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
3044                         "node is running unknown instance %s", inst_uuid)
3045
3046     self._VerifyGroupDRBDVersion(all_nvinfo)
3047     self._VerifyGroupLVM(node_image, vg_name)
3048
3049     for node_uuid, result in extra_lv_nvinfo.items():
3050       self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
3051                               node_image[node_uuid], vg_name)
3052
3053     feedback_fn("* Verifying instance status")
3054     for inst_uuid in self.my_inst_uuids:
3055       instance = self.my_inst_info[inst_uuid]
3056       if verbose:
3057         feedback_fn("* Verifying instance %s" % instance.name)
3058       self._VerifyInstance(instance, node_image, instdisk[inst_uuid])
3059
3060       # If the instance is non-redundant we cannot survive losing its primary
3061       # node, so we are not N+1 compliant.
3062       if instance.disk_template not in constants.DTS_MIRRORED:
3063         i_non_redundant.append(instance)
3064
3065       if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
3066         i_non_a_balanced.append(instance)
3067
3068     feedback_fn("* Verifying orphan volumes")
3069     reserved = utils.FieldSet(*cluster.reserved_lvs)
3070
3071     # We will get spurious "unknown volume" warnings if any node of this group
3072     # is secondary for an instance whose primary is in another group. To avoid
3073     # them, we find these instances and add their volumes to node_vol_should.
3074     for instance in self.all_inst_info.values():
3075       for secondary in instance.secondary_nodes:
3076         if (secondary in self.my_node_info
3077             and instance.name not in self.my_inst_info):
3078           instance.MapLVsByNode(node_vol_should)
3079           break
3080
3081     self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
3082
3083     if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
3084       feedback_fn("* Verifying N+1 Memory redundancy")
3085       self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
3086
3087     feedback_fn("* Other Notes")
3088     if i_non_redundant:
3089       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
3090                   % len(i_non_redundant))
3091
3092     if i_non_a_balanced:
3093       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
3094                   % len(i_non_a_balanced))
3095
3096     if i_offline:
3097       feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
3098
3099     if n_offline:
3100       feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
3101
3102     if n_drained:
3103       feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
3104
3105     return not self.bad
3106
3107   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
3108     """Analyze the post-hooks' result
3109
3110     This method analyses the hook result, handles it, and sends some
3111     nicely-formatted feedback back to the user.
3112
3113     @param phase: one of L{constants.HOOKS_PHASE_POST} or
3114         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
3115     @param hooks_results: the results of the multi-node hooks rpc call
3116     @param feedback_fn: function used send feedback back to the caller
3117     @param lu_result: previous Exec result
3118     @return: the new Exec result, based on the previous result
3119         and hook results
3120
3121     """
3122     # We only really run POST phase hooks, only for non-empty groups,
3123     # and are only interested in their results
3124     if not self.my_node_uuids:
3125       # empty node group
3126       pass
3127     elif phase == constants.HOOKS_PHASE_POST:
3128       # Used to change hooks' output to proper indentation
3129       feedback_fn("* Hooks Results")
3130       assert hooks_results, "invalid result from hooks"
3131
3132       for node_name in hooks_results:
3133         res = hooks_results[node_name]
3134         msg = res.fail_msg
3135         test = msg and not res.offline
3136         self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3137                       "Communication failure in hooks execution: %s", msg)
3138         if res.offline or msg:
3139           # No need to investigate payload if node is offline or gave
3140           # an error.
3141           continue
3142         for script, hkr, output in res.payload:
3143           test = hkr == constants.HKR_FAIL
3144           self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3145                         "Script %s failed, output:", script)
3146           if test:
3147             output = self._HOOKS_INDENT_RE.sub("      ", output)
3148             feedback_fn("%s" % output)
3149             lu_result = False
3150
3151     return lu_result
3152
3153
3154 class LUClusterVerifyDisks(NoHooksLU):
3155   """Verifies the cluster disks status.
3156
3157   """
3158   REQ_BGL = False
3159
3160   def ExpandNames(self):
3161     self.share_locks = ShareAll()
3162     self.needed_locks = {
3163       locking.LEVEL_NODEGROUP: locking.ALL_SET,
3164       }
3165
3166   def Exec(self, feedback_fn):
3167     group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
3168
3169     # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
3170     return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
3171                            for group in group_names])