Check validity of the access protocol parameter
[ganeti-local] / lib / cmdlib / cluster.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with the cluster."""
23
24 import OpenSSL
25
26 import copy
27 import itertools
28 import logging
29 import operator
30 import os
31 import re
32 import time
33
34 from ganeti import compat
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import masterd
40 from ganeti import netutils
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 from ganeti import query
45 from ganeti import rpc
46 from ganeti import runtime
47 from ganeti import ssh
48 from ganeti import uidpool
49 from ganeti import utils
50 from ganeti import vcluster
51
52 from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53   ResultWithJobs
54 from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55   ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56   GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57   GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58   CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59   ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
60   CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
61   CheckDiskAccessModeConsistency
62
63 import ganeti.masterd.instance
64
65
66 class LUClusterActivateMasterIp(NoHooksLU):
67   """Activate the master IP on the master node.
68
69   """
70   def Exec(self, feedback_fn):
71     """Activate the master IP.
72
73     """
74     master_params = self.cfg.GetMasterNetworkParameters()
75     ems = self.cfg.GetUseExternalMipScript()
76     result = self.rpc.call_node_activate_master_ip(master_params.uuid,
77                                                    master_params, ems)
78     result.Raise("Could not activate the master IP")
79
80
81 class LUClusterDeactivateMasterIp(NoHooksLU):
82   """Deactivate the master IP on the master node.
83
84   """
85   def Exec(self, feedback_fn):
86     """Deactivate the master IP.
87
88     """
89     master_params = self.cfg.GetMasterNetworkParameters()
90     ems = self.cfg.GetUseExternalMipScript()
91     result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
92                                                      master_params, ems)
93     result.Raise("Could not deactivate the master IP")
94
95
96 class LUClusterConfigQuery(NoHooksLU):
97   """Return configuration values.
98
99   """
100   REQ_BGL = False
101
102   def CheckArguments(self):
103     self.cq = ClusterQuery(None, self.op.output_fields, False)
104
105   def ExpandNames(self):
106     self.cq.ExpandNames(self)
107
108   def DeclareLocks(self, level):
109     self.cq.DeclareLocks(self, level)
110
111   def Exec(self, feedback_fn):
112     result = self.cq.OldStyleQuery(self)
113
114     assert len(result) == 1
115
116     return result[0]
117
118
119 class LUClusterDestroy(LogicalUnit):
120   """Logical unit for destroying the cluster.
121
122   """
123   HPATH = "cluster-destroy"
124   HTYPE = constants.HTYPE_CLUSTER
125
126   def BuildHooksEnv(self):
127     """Build hooks env.
128
129     """
130     return {
131       "OP_TARGET": self.cfg.GetClusterName(),
132       }
133
134   def BuildHooksNodes(self):
135     """Build hooks nodes.
136
137     """
138     return ([], [])
139
140   def CheckPrereq(self):
141     """Check prerequisites.
142
143     This checks whether the cluster is empty.
144
145     Any errors are signaled by raising errors.OpPrereqError.
146
147     """
148     master = self.cfg.GetMasterNode()
149
150     nodelist = self.cfg.GetNodeList()
151     if len(nodelist) != 1 or nodelist[0] != master:
152       raise errors.OpPrereqError("There are still %d node(s) in"
153                                  " this cluster." % (len(nodelist) - 1),
154                                  errors.ECODE_INVAL)
155     instancelist = self.cfg.GetInstanceList()
156     if instancelist:
157       raise errors.OpPrereqError("There are still %d instance(s) in"
158                                  " this cluster." % len(instancelist),
159                                  errors.ECODE_INVAL)
160
161   def Exec(self, feedback_fn):
162     """Destroys the cluster.
163
164     """
165     master_params = self.cfg.GetMasterNetworkParameters()
166
167     # Run post hooks on master node before it's removed
168     RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
169
170     ems = self.cfg.GetUseExternalMipScript()
171     result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
172                                                      master_params, ems)
173     result.Warn("Error disabling the master IP address", self.LogWarning)
174     return master_params.uuid
175
176
177 class LUClusterPostInit(LogicalUnit):
178   """Logical unit for running hooks after cluster initialization.
179
180   """
181   HPATH = "cluster-init"
182   HTYPE = constants.HTYPE_CLUSTER
183
184   def BuildHooksEnv(self):
185     """Build hooks env.
186
187     """
188     return {
189       "OP_TARGET": self.cfg.GetClusterName(),
190       }
191
192   def BuildHooksNodes(self):
193     """Build hooks nodes.
194
195     """
196     return ([], [self.cfg.GetMasterNode()])
197
198   def Exec(self, feedback_fn):
199     """Nothing to do.
200
201     """
202     return True
203
204
205 class ClusterQuery(QueryBase):
206   FIELDS = query.CLUSTER_FIELDS
207
208   #: Do not sort (there is only one item)
209   SORT_FIELD = None
210
211   def ExpandNames(self, lu):
212     lu.needed_locks = {}
213
214     # The following variables interact with _QueryBase._GetNames
215     self.wanted = locking.ALL_SET
216     self.do_locking = self.use_locking
217
218     if self.do_locking:
219       raise errors.OpPrereqError("Can not use locking for cluster queries",
220                                  errors.ECODE_INVAL)
221
222   def DeclareLocks(self, lu, level):
223     pass
224
225   def _GetQueryData(self, lu):
226     """Computes the list of nodes and their attributes.
227
228     """
229     # Locking is not used
230     assert not (compat.any(lu.glm.is_owned(level)
231                            for level in locking.LEVELS
232                            if level != locking.LEVEL_CLUSTER) or
233                 self.do_locking or self.use_locking)
234
235     if query.CQ_CONFIG in self.requested_data:
236       cluster = lu.cfg.GetClusterInfo()
237       nodes = lu.cfg.GetAllNodesInfo()
238     else:
239       cluster = NotImplemented
240       nodes = NotImplemented
241
242     if query.CQ_QUEUE_DRAINED in self.requested_data:
243       drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
244     else:
245       drain_flag = NotImplemented
246
247     if query.CQ_WATCHER_PAUSE in self.requested_data:
248       master_node_uuid = lu.cfg.GetMasterNode()
249
250       result = lu.rpc.call_get_watcher_pause(master_node_uuid)
251       result.Raise("Can't retrieve watcher pause from master node '%s'" %
252                    lu.cfg.GetMasterNodeName())
253
254       watcher_pause = result.payload
255     else:
256       watcher_pause = NotImplemented
257
258     return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
259
260
261 class LUClusterQuery(NoHooksLU):
262   """Query cluster configuration.
263
264   """
265   REQ_BGL = False
266
267   def ExpandNames(self):
268     self.needed_locks = {}
269
270   def Exec(self, feedback_fn):
271     """Return cluster config.
272
273     """
274     cluster = self.cfg.GetClusterInfo()
275     os_hvp = {}
276
277     # Filter just for enabled hypervisors
278     for os_name, hv_dict in cluster.os_hvp.items():
279       os_hvp[os_name] = {}
280       for hv_name, hv_params in hv_dict.items():
281         if hv_name in cluster.enabled_hypervisors:
282           os_hvp[os_name][hv_name] = hv_params
283
284     # Convert ip_family to ip_version
285     primary_ip_version = constants.IP4_VERSION
286     if cluster.primary_ip_family == netutils.IP6Address.family:
287       primary_ip_version = constants.IP6_VERSION
288
289     result = {
290       "software_version": constants.RELEASE_VERSION,
291       "protocol_version": constants.PROTOCOL_VERSION,
292       "config_version": constants.CONFIG_VERSION,
293       "os_api_version": max(constants.OS_API_VERSIONS),
294       "export_version": constants.EXPORT_VERSION,
295       "vcs_version": constants.VCS_VERSION,
296       "architecture": runtime.GetArchInfo(),
297       "name": cluster.cluster_name,
298       "master": self.cfg.GetMasterNodeName(),
299       "default_hypervisor": cluster.primary_hypervisor,
300       "enabled_hypervisors": cluster.enabled_hypervisors,
301       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
302                         for hypervisor_name in cluster.enabled_hypervisors]),
303       "os_hvp": os_hvp,
304       "beparams": cluster.beparams,
305       "osparams": cluster.osparams,
306       "ipolicy": cluster.ipolicy,
307       "nicparams": cluster.nicparams,
308       "ndparams": cluster.ndparams,
309       "diskparams": cluster.diskparams,
310       "candidate_pool_size": cluster.candidate_pool_size,
311       "master_netdev": cluster.master_netdev,
312       "master_netmask": cluster.master_netmask,
313       "use_external_mip_script": cluster.use_external_mip_script,
314       "volume_group_name": cluster.volume_group_name,
315       "drbd_usermode_helper": cluster.drbd_usermode_helper,
316       "file_storage_dir": cluster.file_storage_dir,
317       "shared_file_storage_dir": cluster.shared_file_storage_dir,
318       "maintain_node_health": cluster.maintain_node_health,
319       "ctime": cluster.ctime,
320       "mtime": cluster.mtime,
321       "uuid": cluster.uuid,
322       "tags": list(cluster.GetTags()),
323       "uid_pool": cluster.uid_pool,
324       "default_iallocator": cluster.default_iallocator,
325       "reserved_lvs": cluster.reserved_lvs,
326       "primary_ip_version": primary_ip_version,
327       "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
328       "hidden_os": cluster.hidden_os,
329       "blacklisted_os": cluster.blacklisted_os,
330       "enabled_disk_templates": cluster.enabled_disk_templates,
331       }
332
333     return result
334
335
336 class LUClusterRedistConf(NoHooksLU):
337   """Force the redistribution of cluster configuration.
338
339   This is a very simple LU.
340
341   """
342   REQ_BGL = False
343
344   def ExpandNames(self):
345     self.needed_locks = {
346       locking.LEVEL_NODE: locking.ALL_SET,
347       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
348     }
349     self.share_locks = ShareAll()
350
351   def Exec(self, feedback_fn):
352     """Redistribute the configuration.
353
354     """
355     self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
356     RedistributeAncillaryFiles(self)
357
358
359 class LUClusterRename(LogicalUnit):
360   """Rename the cluster.
361
362   """
363   HPATH = "cluster-rename"
364   HTYPE = constants.HTYPE_CLUSTER
365
366   def BuildHooksEnv(self):
367     """Build hooks env.
368
369     """
370     return {
371       "OP_TARGET": self.cfg.GetClusterName(),
372       "NEW_NAME": self.op.name,
373       }
374
375   def BuildHooksNodes(self):
376     """Build hooks nodes.
377
378     """
379     return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
380
381   def CheckPrereq(self):
382     """Verify that the passed name is a valid one.
383
384     """
385     hostname = netutils.GetHostname(name=self.op.name,
386                                     family=self.cfg.GetPrimaryIPFamily())
387
388     new_name = hostname.name
389     self.ip = new_ip = hostname.ip
390     old_name = self.cfg.GetClusterName()
391     old_ip = self.cfg.GetMasterIP()
392     if new_name == old_name and new_ip == old_ip:
393       raise errors.OpPrereqError("Neither the name nor the IP address of the"
394                                  " cluster has changed",
395                                  errors.ECODE_INVAL)
396     if new_ip != old_ip:
397       if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
398         raise errors.OpPrereqError("The given cluster IP address (%s) is"
399                                    " reachable on the network" %
400                                    new_ip, errors.ECODE_NOTUNIQUE)
401
402     self.op.name = new_name
403
404   def Exec(self, feedback_fn):
405     """Rename the cluster.
406
407     """
408     clustername = self.op.name
409     new_ip = self.ip
410
411     # shutdown the master IP
412     master_params = self.cfg.GetMasterNetworkParameters()
413     ems = self.cfg.GetUseExternalMipScript()
414     result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
415                                                      master_params, ems)
416     result.Raise("Could not disable the master role")
417
418     try:
419       cluster = self.cfg.GetClusterInfo()
420       cluster.cluster_name = clustername
421       cluster.master_ip = new_ip
422       self.cfg.Update(cluster, feedback_fn)
423
424       # update the known hosts file
425       ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
426       node_list = self.cfg.GetOnlineNodeList()
427       try:
428         node_list.remove(master_params.uuid)
429       except ValueError:
430         pass
431       UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
432     finally:
433       master_params.ip = new_ip
434       result = self.rpc.call_node_activate_master_ip(master_params.uuid,
435                                                      master_params, ems)
436       result.Warn("Could not re-enable the master role on the master,"
437                   " please restart manually", self.LogWarning)
438
439     return clustername
440
441
442 class LUClusterRepairDiskSizes(NoHooksLU):
443   """Verifies the cluster disks sizes.
444
445   """
446   REQ_BGL = False
447
448   def ExpandNames(self):
449     if self.op.instances:
450       (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
451       # Not getting the node allocation lock as only a specific set of
452       # instances (and their nodes) is going to be acquired
453       self.needed_locks = {
454         locking.LEVEL_NODE_RES: [],
455         locking.LEVEL_INSTANCE: self.wanted_names,
456         }
457       self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
458     else:
459       self.wanted_names = None
460       self.needed_locks = {
461         locking.LEVEL_NODE_RES: locking.ALL_SET,
462         locking.LEVEL_INSTANCE: locking.ALL_SET,
463
464         # This opcode is acquires the node locks for all instances
465         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
466         }
467
468     self.share_locks = {
469       locking.LEVEL_NODE_RES: 1,
470       locking.LEVEL_INSTANCE: 0,
471       locking.LEVEL_NODE_ALLOC: 1,
472       }
473
474   def DeclareLocks(self, level):
475     if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
476       self._LockInstancesNodes(primary_only=True, level=level)
477
478   def CheckPrereq(self):
479     """Check prerequisites.
480
481     This only checks the optional instance list against the existing names.
482
483     """
484     if self.wanted_names is None:
485       self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
486
487     self.wanted_instances = \
488         map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
489
490   def _EnsureChildSizes(self, disk):
491     """Ensure children of the disk have the needed disk size.
492
493     This is valid mainly for DRBD8 and fixes an issue where the
494     children have smaller disk size.
495
496     @param disk: an L{ganeti.objects.Disk} object
497
498     """
499     if disk.dev_type == constants.DT_DRBD8:
500       assert disk.children, "Empty children for DRBD8?"
501       fchild = disk.children[0]
502       mismatch = fchild.size < disk.size
503       if mismatch:
504         self.LogInfo("Child disk has size %d, parent %d, fixing",
505                      fchild.size, disk.size)
506         fchild.size = disk.size
507
508       # and we recurse on this child only, not on the metadev
509       return self._EnsureChildSizes(fchild) or mismatch
510     else:
511       return False
512
513   def Exec(self, feedback_fn):
514     """Verify the size of cluster disks.
515
516     """
517     # TODO: check child disks too
518     # TODO: check differences in size between primary/secondary nodes
519     per_node_disks = {}
520     for instance in self.wanted_instances:
521       pnode = instance.primary_node
522       if pnode not in per_node_disks:
523         per_node_disks[pnode] = []
524       for idx, disk in enumerate(instance.disks):
525         per_node_disks[pnode].append((instance, idx, disk))
526
527     assert not (frozenset(per_node_disks.keys()) -
528                 self.owned_locks(locking.LEVEL_NODE_RES)), \
529       "Not owning correct locks"
530     assert not self.owned_locks(locking.LEVEL_NODE)
531
532     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
533                                                per_node_disks.keys())
534
535     changed = []
536     for node_uuid, dskl in per_node_disks.items():
537       if not dskl:
538         # no disks on the node
539         continue
540
541       newl = [([v[2].Copy()], v[0]) for v in dskl]
542       node_name = self.cfg.GetNodeName(node_uuid)
543       result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
544       if result.fail_msg:
545         self.LogWarning("Failure in blockdev_getdimensions call to node"
546                         " %s, ignoring", node_name)
547         continue
548       if len(result.payload) != len(dskl):
549         logging.warning("Invalid result from node %s: len(dksl)=%d,"
550                         " result.payload=%s", node_name, len(dskl),
551                         result.payload)
552         self.LogWarning("Invalid result from node %s, ignoring node results",
553                         node_name)
554         continue
555       for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
556         if dimensions is None:
557           self.LogWarning("Disk %d of instance %s did not return size"
558                           " information, ignoring", idx, instance.name)
559           continue
560         if not isinstance(dimensions, (tuple, list)):
561           self.LogWarning("Disk %d of instance %s did not return valid"
562                           " dimension information, ignoring", idx,
563                           instance.name)
564           continue
565         (size, spindles) = dimensions
566         if not isinstance(size, (int, long)):
567           self.LogWarning("Disk %d of instance %s did not return valid"
568                           " size information, ignoring", idx, instance.name)
569           continue
570         size = size >> 20
571         if size != disk.size:
572           self.LogInfo("Disk %d of instance %s has mismatched size,"
573                        " correcting: recorded %d, actual %d", idx,
574                        instance.name, disk.size, size)
575           disk.size = size
576           self.cfg.Update(instance, feedback_fn)
577           changed.append((instance.name, idx, "size", size))
578         if es_flags[node_uuid]:
579           if spindles is None:
580             self.LogWarning("Disk %d of instance %s did not return valid"
581                             " spindles information, ignoring", idx,
582                             instance.name)
583           elif disk.spindles is None or disk.spindles != spindles:
584             self.LogInfo("Disk %d of instance %s has mismatched spindles,"
585                          " correcting: recorded %s, actual %s",
586                          idx, instance.name, disk.spindles, spindles)
587             disk.spindles = spindles
588             self.cfg.Update(instance, feedback_fn)
589             changed.append((instance.name, idx, "spindles", disk.spindles))
590         if self._EnsureChildSizes(disk):
591           self.cfg.Update(instance, feedback_fn)
592           changed.append((instance.name, idx, "size", disk.size))
593     return changed
594
595
596 def _ValidateNetmask(cfg, netmask):
597   """Checks if a netmask is valid.
598
599   @type cfg: L{config.ConfigWriter}
600   @param cfg: The cluster configuration
601   @type netmask: int
602   @param netmask: the netmask to be verified
603   @raise errors.OpPrereqError: if the validation fails
604
605   """
606   ip_family = cfg.GetPrimaryIPFamily()
607   try:
608     ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
609   except errors.ProgrammerError:
610     raise errors.OpPrereqError("Invalid primary ip family: %s." %
611                                ip_family, errors.ECODE_INVAL)
612   if not ipcls.ValidateNetmask(netmask):
613     raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
614                                (netmask), errors.ECODE_INVAL)
615
616
617 def CheckFileBasedStoragePathVsEnabledDiskTemplates(
618     logging_warn_fn, file_storage_dir, enabled_disk_templates,
619     file_disk_template):
620   """Checks whether the given file-based storage directory is acceptable.
621
622   Note: This function is public, because it is also used in bootstrap.py.
623
624   @type logging_warn_fn: function
625   @param logging_warn_fn: function which accepts a string and logs it
626   @type file_storage_dir: string
627   @param file_storage_dir: the directory to be used for file-based instances
628   @type enabled_disk_templates: list of string
629   @param enabled_disk_templates: the list of enabled disk templates
630   @type file_disk_template: string
631   @param file_disk_template: the file-based disk template for which the
632       path should be checked
633
634   """
635   assert (file_disk_template in
636           utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
637   file_storage_enabled = file_disk_template in enabled_disk_templates
638   if file_storage_dir is not None:
639     if file_storage_dir == "":
640       if file_storage_enabled:
641         raise errors.OpPrereqError(
642             "Unsetting the '%s' storage directory while having '%s' storage"
643             " enabled is not permitted." %
644             (file_disk_template, file_disk_template))
645     else:
646       if not file_storage_enabled:
647         logging_warn_fn(
648             "Specified a %s storage directory, although %s storage is not"
649             " enabled." % (file_disk_template, file_disk_template))
650   else:
651     raise errors.ProgrammerError("Received %s storage dir with value"
652                                  " 'None'." % file_disk_template)
653
654
655 def CheckFileStoragePathVsEnabledDiskTemplates(
656     logging_warn_fn, file_storage_dir, enabled_disk_templates):
657   """Checks whether the given file storage directory is acceptable.
658
659   @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
660
661   """
662   CheckFileBasedStoragePathVsEnabledDiskTemplates(
663       logging_warn_fn, file_storage_dir, enabled_disk_templates,
664       constants.DT_FILE)
665
666
667 def CheckSharedFileStoragePathVsEnabledDiskTemplates(
668     logging_warn_fn, file_storage_dir, enabled_disk_templates):
669   """Checks whether the given shared file storage directory is acceptable.
670
671   @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
672
673   """
674   CheckFileBasedStoragePathVsEnabledDiskTemplates(
675       logging_warn_fn, file_storage_dir, enabled_disk_templates,
676       constants.DT_SHARED_FILE)
677
678
679 class LUClusterSetParams(LogicalUnit):
680   """Change the parameters of the cluster.
681
682   """
683   HPATH = "cluster-modify"
684   HTYPE = constants.HTYPE_CLUSTER
685   REQ_BGL = False
686
687   def CheckArguments(self):
688     """Check parameters
689
690     """
691     if self.op.uid_pool:
692       uidpool.CheckUidPool(self.op.uid_pool)
693
694     if self.op.add_uids:
695       uidpool.CheckUidPool(self.op.add_uids)
696
697     if self.op.remove_uids:
698       uidpool.CheckUidPool(self.op.remove_uids)
699
700     if self.op.master_netmask is not None:
701       _ValidateNetmask(self.cfg, self.op.master_netmask)
702
703     if self.op.diskparams:
704       for dt_params in self.op.diskparams.values():
705         utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
706       try:
707         utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
708         CheckDiskAccessModeValidity(self.op.diskparams)
709       except errors.OpPrereqError, err:
710         raise errors.OpPrereqError("While verify diskparams options: %s" % err,
711                                    errors.ECODE_INVAL)
712
713   def ExpandNames(self):
714     # FIXME: in the future maybe other cluster params won't require checking on
715     # all nodes to be modified.
716     # FIXME: This opcode changes cluster-wide settings. Is acquiring all
717     # resource locks the right thing, shouldn't it be the BGL instead?
718     self.needed_locks = {
719       locking.LEVEL_NODE: locking.ALL_SET,
720       locking.LEVEL_INSTANCE: locking.ALL_SET,
721       locking.LEVEL_NODEGROUP: locking.ALL_SET,
722       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
723     }
724     self.share_locks = ShareAll()
725
726   def BuildHooksEnv(self):
727     """Build hooks env.
728
729     """
730     return {
731       "OP_TARGET": self.cfg.GetClusterName(),
732       "NEW_VG_NAME": self.op.vg_name,
733       }
734
735   def BuildHooksNodes(self):
736     """Build hooks nodes.
737
738     """
739     mn = self.cfg.GetMasterNode()
740     return ([mn], [mn])
741
742   def _CheckVgName(self, node_uuids, enabled_disk_templates,
743                    new_enabled_disk_templates):
744     """Check the consistency of the vg name on all nodes and in case it gets
745        unset whether there are instances still using it.
746
747     """
748     lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
749     lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
750                                             new_enabled_disk_templates)
751     current_vg_name = self.cfg.GetVGName()
752
753     if self.op.vg_name == '':
754       if lvm_is_enabled:
755         raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
756                                    " disk templates are or get enabled.")
757
758     if self.op.vg_name is None:
759       if current_vg_name is None and lvm_is_enabled:
760         raise errors.OpPrereqError("Please specify a volume group when"
761                                    " enabling lvm-based disk-templates.")
762
763     if self.op.vg_name is not None and not self.op.vg_name:
764       if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
765         raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
766                                    " instances exist", errors.ECODE_INVAL)
767
768     if (self.op.vg_name is not None and lvm_is_enabled) or \
769         (self.cfg.GetVGName() is not None and lvm_gets_enabled):
770       self._CheckVgNameOnNodes(node_uuids)
771
772   def _CheckVgNameOnNodes(self, node_uuids):
773     """Check the status of the volume group on each node.
774
775     """
776     vglist = self.rpc.call_vg_list(node_uuids)
777     for node_uuid in node_uuids:
778       msg = vglist[node_uuid].fail_msg
779       if msg:
780         # ignoring down node
781         self.LogWarning("Error while gathering data on node %s"
782                         " (ignoring node): %s",
783                         self.cfg.GetNodeName(node_uuid), msg)
784         continue
785       vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
786                                             self.op.vg_name,
787                                             constants.MIN_VG_SIZE)
788       if vgstatus:
789         raise errors.OpPrereqError("Error on node '%s': %s" %
790                                    (self.cfg.GetNodeName(node_uuid), vgstatus),
791                                    errors.ECODE_ENVIRON)
792
793   @staticmethod
794   def _GetEnabledDiskTemplatesInner(op_enabled_disk_templates,
795                                     old_enabled_disk_templates):
796     """Determines the enabled disk templates and the subset of disk templates
797        that are newly enabled by this operation.
798
799     """
800     enabled_disk_templates = None
801     new_enabled_disk_templates = []
802     if op_enabled_disk_templates:
803       enabled_disk_templates = op_enabled_disk_templates
804       new_enabled_disk_templates = \
805         list(set(enabled_disk_templates)
806              - set(old_enabled_disk_templates))
807     else:
808       enabled_disk_templates = old_enabled_disk_templates
809     return (enabled_disk_templates, new_enabled_disk_templates)
810
811   def _GetEnabledDiskTemplates(self, cluster):
812     """Determines the enabled disk templates and the subset of disk templates
813        that are newly enabled by this operation.
814
815     """
816     return self._GetEnabledDiskTemplatesInner(self.op.enabled_disk_templates,
817                                               cluster.enabled_disk_templates)
818
819   def _CheckIpolicy(self, cluster, enabled_disk_templates):
820     """Checks the ipolicy.
821
822     @type cluster: C{objects.Cluster}
823     @param cluster: the cluster's configuration
824     @type enabled_disk_templates: list of string
825     @param enabled_disk_templates: list of (possibly newly) enabled disk
826       templates
827
828     """
829     # FIXME: write unit tests for this
830     if self.op.ipolicy:
831       self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
832                                            group_policy=False)
833
834       CheckIpolicyVsDiskTemplates(self.new_ipolicy,
835                                   enabled_disk_templates)
836
837       all_instances = self.cfg.GetAllInstancesInfo().values()
838       violations = set()
839       for group in self.cfg.GetAllNodeGroupsInfo().values():
840         instances = frozenset([inst for inst in all_instances
841                                if compat.any(nuuid in group.members
842                                              for nuuid in inst.all_nodes)])
843         new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
844         ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
845         new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
846                                            self.cfg)
847         if new:
848           violations.update(new)
849
850       if violations:
851         self.LogWarning("After the ipolicy change the following instances"
852                         " violate them: %s",
853                         utils.CommaJoin(utils.NiceSort(violations)))
854     else:
855       CheckIpolicyVsDiskTemplates(cluster.ipolicy,
856                                   enabled_disk_templates)
857
858   def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
859     """Checks whether the set DRBD helper actually exists on the nodes.
860
861     @type drbd_helper: string
862     @param drbd_helper: path of the drbd usermode helper binary
863     @type node_uuids: list of strings
864     @param node_uuids: list of node UUIDs to check for the helper
865
866     """
867     # checks given drbd helper on all nodes
868     helpers = self.rpc.call_drbd_helper(node_uuids)
869     for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
870       if ninfo.offline:
871         self.LogInfo("Not checking drbd helper on offline node %s",
872                      ninfo.name)
873         continue
874       msg = helpers[ninfo.uuid].fail_msg
875       if msg:
876         raise errors.OpPrereqError("Error checking drbd helper on node"
877                                    " '%s': %s" % (ninfo.name, msg),
878                                    errors.ECODE_ENVIRON)
879       node_helper = helpers[ninfo.uuid].payload
880       if node_helper != drbd_helper:
881         raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
882                                    (ninfo.name, node_helper),
883                                    errors.ECODE_ENVIRON)
884
885   def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
886     """Check the DRBD usermode helper.
887
888     @type node_uuids: list of strings
889     @param node_uuids: a list of nodes' UUIDs
890     @type drbd_enabled: boolean
891     @param drbd_enabled: whether DRBD will be enabled after this operation
892       (no matter if it was disabled before or not)
893     @type drbd_gets_enabled: boolen
894     @param drbd_gets_enabled: true if DRBD was disabled before this
895       operation, but will be enabled afterwards
896
897     """
898     if self.op.drbd_helper == '':
899       if drbd_enabled:
900         raise errors.OpPrereqError("Cannot disable drbd helper while"
901                                    " DRBD is enabled.")
902       if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
903         raise errors.OpPrereqError("Cannot disable drbd helper while"
904                                    " drbd-based instances exist",
905                                    errors.ECODE_INVAL)
906
907     else:
908       if self.op.drbd_helper is not None and drbd_enabled:
909         self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
910       else:
911         if drbd_gets_enabled:
912           current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
913           if current_drbd_helper is not None:
914             self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
915           else:
916             raise errors.OpPrereqError("Cannot enable DRBD without a"
917                                        " DRBD usermode helper set.")
918
919   def CheckPrereq(self):
920     """Check prerequisites.
921
922     This checks whether the given params don't conflict and
923     if the given volume group is valid.
924
925     """
926     node_uuids = self.owned_locks(locking.LEVEL_NODE)
927     self.cluster = cluster = self.cfg.GetClusterInfo()
928
929     vm_capable_node_uuids = [node.uuid
930                              for node in self.cfg.GetAllNodesInfo().values()
931                              if node.uuid in node_uuids and node.vm_capable]
932
933     (enabled_disk_templates, new_enabled_disk_templates) = \
934       self._GetEnabledDiskTemplates(cluster)
935
936     self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
937                       new_enabled_disk_templates)
938
939     if self.op.file_storage_dir is not None:
940       CheckFileStoragePathVsEnabledDiskTemplates(
941           self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
942
943     if self.op.shared_file_storage_dir is not None:
944       CheckSharedFileStoragePathVsEnabledDiskTemplates(
945           self.LogWarning, self.op.shared_file_storage_dir,
946           enabled_disk_templates)
947
948     drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
949     drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
950     self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)
951
952     # validate params changes
953     if self.op.beparams:
954       objects.UpgradeBeParams(self.op.beparams)
955       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
956       self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
957
958     if self.op.ndparams:
959       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
960       self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
961
962       # TODO: we need a more general way to handle resetting
963       # cluster-level parameters to default values
964       if self.new_ndparams["oob_program"] == "":
965         self.new_ndparams["oob_program"] = \
966             constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
967
968     if self.op.hv_state:
969       new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
970                                            self.cluster.hv_state_static)
971       self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
972                                for hv, values in new_hv_state.items())
973
974     if self.op.disk_state:
975       new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
976                                                self.cluster.disk_state_static)
977       self.new_disk_state = \
978         dict((storage, dict((name, cluster.SimpleFillDiskState(values))
979                             for name, values in svalues.items()))
980              for storage, svalues in new_disk_state.items())
981
982     self._CheckIpolicy(cluster, enabled_disk_templates)
983
984     if self.op.nicparams:
985       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
986       self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
987       objects.NIC.CheckParameterSyntax(self.new_nicparams)
988       nic_errors = []
989
990       # check all instances for consistency
991       for instance in self.cfg.GetAllInstancesInfo().values():
992         for nic_idx, nic in enumerate(instance.nics):
993           params_copy = copy.deepcopy(nic.nicparams)
994           params_filled = objects.FillDict(self.new_nicparams, params_copy)
995
996           # check parameter syntax
997           try:
998             objects.NIC.CheckParameterSyntax(params_filled)
999           except errors.ConfigurationError, err:
1000             nic_errors.append("Instance %s, nic/%d: %s" %
1001                               (instance.name, nic_idx, err))
1002
1003           # if we're moving instances to routed, check that they have an ip
1004           target_mode = params_filled[constants.NIC_MODE]
1005           if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
1006             nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
1007                               " address" % (instance.name, nic_idx))
1008       if nic_errors:
1009         raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
1010                                    "\n".join(nic_errors), errors.ECODE_INVAL)
1011
1012     # hypervisor list/parameters
1013     self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
1014     if self.op.hvparams:
1015       for hv_name, hv_dict in self.op.hvparams.items():
1016         if hv_name not in self.new_hvparams:
1017           self.new_hvparams[hv_name] = hv_dict
1018         else:
1019           self.new_hvparams[hv_name].update(hv_dict)
1020
1021     # disk template parameters
1022     self.new_diskparams = objects.FillDict(cluster.diskparams, {})
1023     if self.op.diskparams:
1024       for dt_name, dt_params in self.op.diskparams.items():
1025         if dt_name not in self.new_diskparams:
1026           self.new_diskparams[dt_name] = dt_params
1027         else:
1028           self.new_diskparams[dt_name].update(dt_params)
1029       CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)
1030
1031     # os hypervisor parameters
1032     self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
1033     if self.op.os_hvp:
1034       for os_name, hvs in self.op.os_hvp.items():
1035         if os_name not in self.new_os_hvp:
1036           self.new_os_hvp[os_name] = hvs
1037         else:
1038           for hv_name, hv_dict in hvs.items():
1039             if hv_dict is None:
1040               # Delete if it exists
1041               self.new_os_hvp[os_name].pop(hv_name, None)
1042             elif hv_name not in self.new_os_hvp[os_name]:
1043               self.new_os_hvp[os_name][hv_name] = hv_dict
1044             else:
1045               self.new_os_hvp[os_name][hv_name].update(hv_dict)
1046
1047     # os parameters
1048     self.new_osp = objects.FillDict(cluster.osparams, {})
1049     if self.op.osparams:
1050       for os_name, osp in self.op.osparams.items():
1051         if os_name not in self.new_osp:
1052           self.new_osp[os_name] = {}
1053
1054         self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
1055                                                  use_none=True)
1056
1057         if not self.new_osp[os_name]:
1058           # we removed all parameters
1059           del self.new_osp[os_name]
1060         else:
1061           # check the parameter validity (remote check)
1062           CheckOSParams(self, False, [self.cfg.GetMasterNode()],
1063                         os_name, self.new_osp[os_name])
1064
1065     # changes to the hypervisor list
1066     if self.op.enabled_hypervisors is not None:
1067       self.hv_list = self.op.enabled_hypervisors
1068       for hv in self.hv_list:
1069         # if the hypervisor doesn't already exist in the cluster
1070         # hvparams, we initialize it to empty, and then (in both
1071         # cases) we make sure to fill the defaults, as we might not
1072         # have a complete defaults list if the hypervisor wasn't
1073         # enabled before
1074         if hv not in new_hvp:
1075           new_hvp[hv] = {}
1076         new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1077         utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1078     else:
1079       self.hv_list = cluster.enabled_hypervisors
1080
1081     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1082       # either the enabled list has changed, or the parameters have, validate
1083       for hv_name, hv_params in self.new_hvparams.items():
1084         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1085             (self.op.enabled_hypervisors and
1086              hv_name in self.op.enabled_hypervisors)):
1087           # either this is a new hypervisor, or its parameters have changed
1088           hv_class = hypervisor.GetHypervisorClass(hv_name)
1089           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1090           hv_class.CheckParameterSyntax(hv_params)
1091           CheckHVParams(self, node_uuids, hv_name, hv_params)
1092
1093     self._CheckDiskTemplateConsistency()
1094
1095     if self.op.os_hvp:
1096       # no need to check any newly-enabled hypervisors, since the
1097       # defaults have already been checked in the above code-block
1098       for os_name, os_hvp in self.new_os_hvp.items():
1099         for hv_name, hv_params in os_hvp.items():
1100           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1101           # we need to fill in the new os_hvp on top of the actual hv_p
1102           cluster_defaults = self.new_hvparams.get(hv_name, {})
1103           new_osp = objects.FillDict(cluster_defaults, hv_params)
1104           hv_class = hypervisor.GetHypervisorClass(hv_name)
1105           hv_class.CheckParameterSyntax(new_osp)
1106           CheckHVParams(self, node_uuids, hv_name, new_osp)
1107
1108     if self.op.default_iallocator:
1109       alloc_script = utils.FindFile(self.op.default_iallocator,
1110                                     constants.IALLOCATOR_SEARCH_PATH,
1111                                     os.path.isfile)
1112       if alloc_script is None:
1113         raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1114                                    " specified" % self.op.default_iallocator,
1115                                    errors.ECODE_INVAL)
1116
1117   def _CheckDiskTemplateConsistency(self):
1118     """Check whether the disk templates that are going to be disabled
1119        are still in use by some instances.
1120
1121     """
1122     if self.op.enabled_disk_templates:
1123       cluster = self.cfg.GetClusterInfo()
1124       instances = self.cfg.GetAllInstancesInfo()
1125
1126       disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1127         - set(self.op.enabled_disk_templates)
1128       for instance in instances.itervalues():
1129         if instance.disk_template in disk_templates_to_remove:
1130           raise errors.OpPrereqError("Cannot disable disk template '%s',"
1131                                      " because instance '%s' is using it." %
1132                                      (instance.disk_template, instance.name))
1133
1134   def _SetVgName(self, feedback_fn):
1135     """Determines and sets the new volume group name.
1136
1137     """
1138     if self.op.vg_name is not None:
1139       new_volume = self.op.vg_name
1140       if not new_volume:
1141         new_volume = None
1142       if new_volume != self.cfg.GetVGName():
1143         self.cfg.SetVGName(new_volume)
1144       else:
1145         feedback_fn("Cluster LVM configuration already in desired"
1146                     " state, not changing")
1147
1148   def _SetFileStorageDir(self, feedback_fn):
1149     """Set the file storage directory.
1150
1151     """
1152     if self.op.file_storage_dir is not None:
1153       if self.cluster.file_storage_dir == self.op.file_storage_dir:
1154         feedback_fn("Global file storage dir already set to value '%s'"
1155                     % self.cluster.file_storage_dir)
1156       else:
1157         self.cluster.file_storage_dir = self.op.file_storage_dir
1158
1159   def _SetDrbdHelper(self, feedback_fn):
1160     """Set the DRBD usermode helper.
1161
1162     """
1163     if self.op.drbd_helper is not None:
1164       if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1165         feedback_fn("Note that you specified a drbd user helper, but did not"
1166                     " enable the drbd disk template.")
1167       new_helper = self.op.drbd_helper
1168       if not new_helper:
1169         new_helper = None
1170       if new_helper != self.cfg.GetDRBDHelper():
1171         self.cfg.SetDRBDHelper(new_helper)
1172       else:
1173         feedback_fn("Cluster DRBD helper already in desired state,"
1174                     " not changing")
1175
1176   def Exec(self, feedback_fn):
1177     """Change the parameters of the cluster.
1178
1179     """
1180     if self.op.enabled_disk_templates:
1181       self.cluster.enabled_disk_templates = \
1182         list(set(self.op.enabled_disk_templates))
1183
1184     self._SetVgName(feedback_fn)
1185     self._SetFileStorageDir(feedback_fn)
1186     self._SetDrbdHelper(feedback_fn)
1187
1188     if self.op.hvparams:
1189       self.cluster.hvparams = self.new_hvparams
1190     if self.op.os_hvp:
1191       self.cluster.os_hvp = self.new_os_hvp
1192     if self.op.enabled_hypervisors is not None:
1193       self.cluster.hvparams = self.new_hvparams
1194       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1195     if self.op.beparams:
1196       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1197     if self.op.nicparams:
1198       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1199     if self.op.ipolicy:
1200       self.cluster.ipolicy = self.new_ipolicy
1201     if self.op.osparams:
1202       self.cluster.osparams = self.new_osp
1203     if self.op.ndparams:
1204       self.cluster.ndparams = self.new_ndparams
1205     if self.op.diskparams:
1206       self.cluster.diskparams = self.new_diskparams
1207     if self.op.hv_state:
1208       self.cluster.hv_state_static = self.new_hv_state
1209     if self.op.disk_state:
1210       self.cluster.disk_state_static = self.new_disk_state
1211
1212     if self.op.candidate_pool_size is not None:
1213       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1214       # we need to update the pool size here, otherwise the save will fail
1215       AdjustCandidatePool(self, [])
1216
1217     if self.op.maintain_node_health is not None:
1218       if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1219         feedback_fn("Note: CONFD was disabled at build time, node health"
1220                     " maintenance is not useful (still enabling it)")
1221       self.cluster.maintain_node_health = self.op.maintain_node_health
1222
1223     if self.op.modify_etc_hosts is not None:
1224       self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1225
1226     if self.op.prealloc_wipe_disks is not None:
1227       self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1228
1229     if self.op.add_uids is not None:
1230       uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1231
1232     if self.op.remove_uids is not None:
1233       uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1234
1235     if self.op.uid_pool is not None:
1236       self.cluster.uid_pool = self.op.uid_pool
1237
1238     if self.op.default_iallocator is not None:
1239       self.cluster.default_iallocator = self.op.default_iallocator
1240
1241     if self.op.reserved_lvs is not None:
1242       self.cluster.reserved_lvs = self.op.reserved_lvs
1243
1244     if self.op.use_external_mip_script is not None:
1245       self.cluster.use_external_mip_script = self.op.use_external_mip_script
1246
1247     def helper_os(aname, mods, desc):
1248       desc += " OS list"
1249       lst = getattr(self.cluster, aname)
1250       for key, val in mods:
1251         if key == constants.DDM_ADD:
1252           if val in lst:
1253             feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1254           else:
1255             lst.append(val)
1256         elif key == constants.DDM_REMOVE:
1257           if val in lst:
1258             lst.remove(val)
1259           else:
1260             feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1261         else:
1262           raise errors.ProgrammerError("Invalid modification '%s'" % key)
1263
1264     if self.op.hidden_os:
1265       helper_os("hidden_os", self.op.hidden_os, "hidden")
1266
1267     if self.op.blacklisted_os:
1268       helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1269
1270     if self.op.master_netdev:
1271       master_params = self.cfg.GetMasterNetworkParameters()
1272       ems = self.cfg.GetUseExternalMipScript()
1273       feedback_fn("Shutting down master ip on the current netdev (%s)" %
1274                   self.cluster.master_netdev)
1275       result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1276                                                        master_params, ems)
1277       if not self.op.force:
1278         result.Raise("Could not disable the master ip")
1279       else:
1280         if result.fail_msg:
1281           msg = ("Could not disable the master ip (continuing anyway): %s" %
1282                  result.fail_msg)
1283           feedback_fn(msg)
1284       feedback_fn("Changing master_netdev from %s to %s" %
1285                   (master_params.netdev, self.op.master_netdev))
1286       self.cluster.master_netdev = self.op.master_netdev
1287
1288     if self.op.master_netmask:
1289       master_params = self.cfg.GetMasterNetworkParameters()
1290       feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1291       result = self.rpc.call_node_change_master_netmask(
1292                  master_params.uuid, master_params.netmask,
1293                  self.op.master_netmask, master_params.ip,
1294                  master_params.netdev)
1295       result.Warn("Could not change the master IP netmask", feedback_fn)
1296       self.cluster.master_netmask = self.op.master_netmask
1297
1298     self.cfg.Update(self.cluster, feedback_fn)
1299
1300     if self.op.master_netdev:
1301       master_params = self.cfg.GetMasterNetworkParameters()
1302       feedback_fn("Starting the master ip on the new master netdev (%s)" %
1303                   self.op.master_netdev)
1304       ems = self.cfg.GetUseExternalMipScript()
1305       result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1306                                                      master_params, ems)
1307       result.Warn("Could not re-enable the master ip on the master,"
1308                   " please restart manually", self.LogWarning)
1309
1310
1311 class LUClusterVerify(NoHooksLU):
1312   """Submits all jobs necessary to verify the cluster.
1313
1314   """
1315   REQ_BGL = False
1316
1317   def ExpandNames(self):
1318     self.needed_locks = {}
1319
1320   def Exec(self, feedback_fn):
1321     jobs = []
1322
1323     if self.op.group_name:
1324       groups = [self.op.group_name]
1325       depends_fn = lambda: None
1326     else:
1327       groups = self.cfg.GetNodeGroupList()
1328
1329       # Verify global configuration
1330       jobs.append([
1331         opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1332         ])
1333
1334       # Always depend on global verification
1335       depends_fn = lambda: [(-len(jobs), [])]
1336
1337     jobs.extend(
1338       [opcodes.OpClusterVerifyGroup(group_name=group,
1339                                     ignore_errors=self.op.ignore_errors,
1340                                     depends=depends_fn())]
1341       for group in groups)
1342
1343     # Fix up all parameters
1344     for op in itertools.chain(*jobs): # pylint: disable=W0142
1345       op.debug_simulate_errors = self.op.debug_simulate_errors
1346       op.verbose = self.op.verbose
1347       op.error_codes = self.op.error_codes
1348       try:
1349         op.skip_checks = self.op.skip_checks
1350       except AttributeError:
1351         assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1352
1353     return ResultWithJobs(jobs)
1354
1355
1356 class _VerifyErrors(object):
1357   """Mix-in for cluster/group verify LUs.
1358
1359   It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1360   self.op and self._feedback_fn to be available.)
1361
1362   """
1363
1364   ETYPE_FIELD = "code"
1365   ETYPE_ERROR = "ERROR"
1366   ETYPE_WARNING = "WARNING"
1367
1368   def _Error(self, ecode, item, msg, *args, **kwargs):
1369     """Format an error message.
1370
1371     Based on the opcode's error_codes parameter, either format a
1372     parseable error code, or a simpler error string.
1373
1374     This must be called only from Exec and functions called from Exec.
1375
1376     """
1377     ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1378     itype, etxt, _ = ecode
1379     # If the error code is in the list of ignored errors, demote the error to a
1380     # warning
1381     if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1382       ltype = self.ETYPE_WARNING
1383     # first complete the msg
1384     if args:
1385       msg = msg % args
1386     # then format the whole message
1387     if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1388       msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1389     else:
1390       if item:
1391         item = " " + item
1392       else:
1393         item = ""
1394       msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1395     # and finally report it via the feedback_fn
1396     self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1397     # do not mark the operation as failed for WARN cases only
1398     if ltype == self.ETYPE_ERROR:
1399       self.bad = True
1400
1401   def _ErrorIf(self, cond, *args, **kwargs):
1402     """Log an error message if the passed condition is True.
1403
1404     """
1405     if (bool(cond)
1406         or self.op.debug_simulate_errors): # pylint: disable=E1101
1407       self._Error(*args, **kwargs)
1408
1409
1410 def _VerifyCertificate(filename):
1411   """Verifies a certificate for L{LUClusterVerifyConfig}.
1412
1413   @type filename: string
1414   @param filename: Path to PEM file
1415
1416   """
1417   try:
1418     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1419                                            utils.ReadFile(filename))
1420   except Exception, err: # pylint: disable=W0703
1421     return (LUClusterVerifyConfig.ETYPE_ERROR,
1422             "Failed to load X509 certificate %s: %s" % (filename, err))
1423
1424   (errcode, msg) = \
1425     utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1426                                 constants.SSL_CERT_EXPIRATION_ERROR)
1427
1428   if msg:
1429     fnamemsg = "While verifying %s: %s" % (filename, msg)
1430   else:
1431     fnamemsg = None
1432
1433   if errcode is None:
1434     return (None, fnamemsg)
1435   elif errcode == utils.CERT_WARNING:
1436     return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1437   elif errcode == utils.CERT_ERROR:
1438     return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1439
1440   raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1441
1442
1443 def _GetAllHypervisorParameters(cluster, instances):
1444   """Compute the set of all hypervisor parameters.
1445
1446   @type cluster: L{objects.Cluster}
1447   @param cluster: the cluster object
1448   @param instances: list of L{objects.Instance}
1449   @param instances: additional instances from which to obtain parameters
1450   @rtype: list of (origin, hypervisor, parameters)
1451   @return: a list with all parameters found, indicating the hypervisor they
1452        apply to, and the origin (can be "cluster", "os X", or "instance Y")
1453
1454   """
1455   hvp_data = []
1456
1457   for hv_name in cluster.enabled_hypervisors:
1458     hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1459
1460   for os_name, os_hvp in cluster.os_hvp.items():
1461     for hv_name, hv_params in os_hvp.items():
1462       if hv_params:
1463         full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1464         hvp_data.append(("os %s" % os_name, hv_name, full_params))
1465
1466   # TODO: collapse identical parameter values in a single one
1467   for instance in instances:
1468     if instance.hvparams:
1469       hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1470                        cluster.FillHV(instance)))
1471
1472   return hvp_data
1473
1474
1475 class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1476   """Verifies the cluster config.
1477
1478   """
1479   REQ_BGL = False
1480
1481   def _VerifyHVP(self, hvp_data):
1482     """Verifies locally the syntax of the hypervisor parameters.
1483
1484     """
1485     for item, hv_name, hv_params in hvp_data:
1486       msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1487              (item, hv_name))
1488       try:
1489         hv_class = hypervisor.GetHypervisorClass(hv_name)
1490         utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1491         hv_class.CheckParameterSyntax(hv_params)
1492       except errors.GenericError, err:
1493         self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1494
1495   def ExpandNames(self):
1496     self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1497     self.share_locks = ShareAll()
1498
1499   def CheckPrereq(self):
1500     """Check prerequisites.
1501
1502     """
1503     # Retrieve all information
1504     self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1505     self.all_node_info = self.cfg.GetAllNodesInfo()
1506     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1507
1508   def Exec(self, feedback_fn):
1509     """Verify integrity of cluster, performing various test on nodes.
1510
1511     """
1512     self.bad = False
1513     self._feedback_fn = feedback_fn
1514
1515     feedback_fn("* Verifying cluster config")
1516
1517     for msg in self.cfg.VerifyConfig():
1518       self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1519
1520     feedback_fn("* Verifying cluster certificate files")
1521
1522     for cert_filename in pathutils.ALL_CERT_FILES:
1523       (errcode, msg) = _VerifyCertificate(cert_filename)
1524       self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1525
1526     self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
1527                                     pathutils.NODED_CERT_FILE),
1528                   constants.CV_ECLUSTERCERT,
1529                   None,
1530                   pathutils.NODED_CERT_FILE + " must be accessible by the " +
1531                     constants.LUXID_USER + " user")
1532
1533     feedback_fn("* Verifying hypervisor parameters")
1534
1535     self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1536                                                 self.all_inst_info.values()))
1537
1538     feedback_fn("* Verifying all nodes belong to an existing group")
1539
1540     # We do this verification here because, should this bogus circumstance
1541     # occur, it would never be caught by VerifyGroup, which only acts on
1542     # nodes/instances reachable from existing node groups.
1543
1544     dangling_nodes = set(node for node in self.all_node_info.values()
1545                          if node.group not in self.all_group_info)
1546
1547     dangling_instances = {}
1548     no_node_instances = []
1549
1550     for inst in self.all_inst_info.values():
1551       if inst.primary_node in [node.uuid for node in dangling_nodes]:
1552         dangling_instances.setdefault(inst.primary_node, []).append(inst)
1553       elif inst.primary_node not in self.all_node_info:
1554         no_node_instances.append(inst)
1555
1556     pretty_dangling = [
1557         "%s (%s)" %
1558         (node.name,
1559          utils.CommaJoin(inst.name for
1560                          inst in dangling_instances.get(node.uuid, [])))
1561         for node in dangling_nodes]
1562
1563     self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1564                   None,
1565                   "the following nodes (and their instances) belong to a non"
1566                   " existing group: %s", utils.CommaJoin(pretty_dangling))
1567
1568     self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1569                   None,
1570                   "the following instances have a non-existing primary-node:"
1571                   " %s", utils.CommaJoin(inst.name for
1572                                          inst in no_node_instances))
1573
1574     return not self.bad
1575
1576
1577 class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1578   """Verifies the status of a node group.
1579
1580   """
1581   HPATH = "cluster-verify"
1582   HTYPE = constants.HTYPE_CLUSTER
1583   REQ_BGL = False
1584
1585   _HOOKS_INDENT_RE = re.compile("^", re.M)
1586
1587   class NodeImage(object):
1588     """A class representing the logical and physical status of a node.
1589
1590     @type uuid: string
1591     @ivar uuid: the node UUID to which this object refers
1592     @ivar volumes: a structure as returned from
1593         L{ganeti.backend.GetVolumeList} (runtime)
1594     @ivar instances: a list of running instances (runtime)
1595     @ivar pinst: list of configured primary instances (config)
1596     @ivar sinst: list of configured secondary instances (config)
1597     @ivar sbp: dictionary of {primary-node: list of instances} for all
1598         instances for which this node is secondary (config)
1599     @ivar mfree: free memory, as reported by hypervisor (runtime)
1600     @ivar dfree: free disk, as reported by the node (runtime)
1601     @ivar offline: the offline status (config)
1602     @type rpc_fail: boolean
1603     @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1604         not whether the individual keys were correct) (runtime)
1605     @type lvm_fail: boolean
1606     @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1607     @type hyp_fail: boolean
1608     @ivar hyp_fail: whether the RPC call didn't return the instance list
1609     @type ghost: boolean
1610     @ivar ghost: whether this is a known node or not (config)
1611     @type os_fail: boolean
1612     @ivar os_fail: whether the RPC call didn't return valid OS data
1613     @type oslist: list
1614     @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1615     @type vm_capable: boolean
1616     @ivar vm_capable: whether the node can host instances
1617     @type pv_min: float
1618     @ivar pv_min: size in MiB of the smallest PVs
1619     @type pv_max: float
1620     @ivar pv_max: size in MiB of the biggest PVs
1621
1622     """
1623     def __init__(self, offline=False, uuid=None, vm_capable=True):
1624       self.uuid = uuid
1625       self.volumes = {}
1626       self.instances = []
1627       self.pinst = []
1628       self.sinst = []
1629       self.sbp = {}
1630       self.mfree = 0
1631       self.dfree = 0
1632       self.offline = offline
1633       self.vm_capable = vm_capable
1634       self.rpc_fail = False
1635       self.lvm_fail = False
1636       self.hyp_fail = False
1637       self.ghost = False
1638       self.os_fail = False
1639       self.oslist = {}
1640       self.pv_min = None
1641       self.pv_max = None
1642
1643   def ExpandNames(self):
1644     # This raises errors.OpPrereqError on its own:
1645     self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1646
1647     # Get instances in node group; this is unsafe and needs verification later
1648     inst_uuids = \
1649       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1650
1651     self.needed_locks = {
1652       locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1653       locking.LEVEL_NODEGROUP: [self.group_uuid],
1654       locking.LEVEL_NODE: [],
1655
1656       # This opcode is run by watcher every five minutes and acquires all nodes
1657       # for a group. It doesn't run for a long time, so it's better to acquire
1658       # the node allocation lock as well.
1659       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1660       }
1661
1662     self.share_locks = ShareAll()
1663
1664   def DeclareLocks(self, level):
1665     if level == locking.LEVEL_NODE:
1666       # Get members of node group; this is unsafe and needs verification later
1667       nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1668
1669       # In Exec(), we warn about mirrored instances that have primary and
1670       # secondary living in separate node groups. To fully verify that
1671       # volumes for these instances are healthy, we will need to do an
1672       # extra call to their secondaries. We ensure here those nodes will
1673       # be locked.
1674       for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1675         # Important: access only the instances whose lock is owned
1676         instance = self.cfg.GetInstanceInfoByName(inst_name)
1677         if instance.disk_template in constants.DTS_INT_MIRROR:
1678           nodes.update(instance.secondary_nodes)
1679
1680       self.needed_locks[locking.LEVEL_NODE] = nodes
1681
1682   def CheckPrereq(self):
1683     assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1684     self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1685
1686     group_node_uuids = set(self.group_info.members)
1687     group_inst_uuids = \
1688       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1689
1690     unlocked_node_uuids = \
1691         group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1692
1693     unlocked_inst_uuids = \
1694         group_inst_uuids.difference(
1695           [self.cfg.GetInstanceInfoByName(name).uuid
1696            for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1697
1698     if unlocked_node_uuids:
1699       raise errors.OpPrereqError(
1700         "Missing lock for nodes: %s" %
1701         utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1702         errors.ECODE_STATE)
1703
1704     if unlocked_inst_uuids:
1705       raise errors.OpPrereqError(
1706         "Missing lock for instances: %s" %
1707         utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1708         errors.ECODE_STATE)
1709
1710     self.all_node_info = self.cfg.GetAllNodesInfo()
1711     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1712
1713     self.my_node_uuids = group_node_uuids
1714     self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1715                              for node_uuid in group_node_uuids)
1716
1717     self.my_inst_uuids = group_inst_uuids
1718     self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1719                              for inst_uuid in group_inst_uuids)
1720
1721     # We detect here the nodes that will need the extra RPC calls for verifying
1722     # split LV volumes; they should be locked.
1723     extra_lv_nodes = set()
1724
1725     for inst in self.my_inst_info.values():
1726       if inst.disk_template in constants.DTS_INT_MIRROR:
1727         for nuuid in inst.all_nodes:
1728           if self.all_node_info[nuuid].group != self.group_uuid:
1729             extra_lv_nodes.add(nuuid)
1730
1731     unlocked_lv_nodes = \
1732         extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1733
1734     if unlocked_lv_nodes:
1735       raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1736                                  utils.CommaJoin(unlocked_lv_nodes),
1737                                  errors.ECODE_STATE)
1738     self.extra_lv_nodes = list(extra_lv_nodes)
1739
1740   def _VerifyNode(self, ninfo, nresult):
1741     """Perform some basic validation on data returned from a node.
1742
1743       - check the result data structure is well formed and has all the
1744         mandatory fields
1745       - check ganeti version
1746
1747     @type ninfo: L{objects.Node}
1748     @param ninfo: the node to check
1749     @param nresult: the results from the node
1750     @rtype: boolean
1751     @return: whether overall this call was successful (and we can expect
1752          reasonable values in the respose)
1753
1754     """
1755     # main result, nresult should be a non-empty dict
1756     test = not nresult or not isinstance(nresult, dict)
1757     self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1758                   "unable to verify node: no data returned")
1759     if test:
1760       return False
1761
1762     # compares ganeti version
1763     local_version = constants.PROTOCOL_VERSION
1764     remote_version = nresult.get("version", None)
1765     test = not (remote_version and
1766                 isinstance(remote_version, (list, tuple)) and
1767                 len(remote_version) == 2)
1768     self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1769                   "connection to node returned invalid data")
1770     if test:
1771       return False
1772
1773     test = local_version != remote_version[0]
1774     self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1775                   "incompatible protocol versions: master %s,"
1776                   " node %s", local_version, remote_version[0])
1777     if test:
1778       return False
1779
1780     # node seems compatible, we can actually try to look into its results
1781
1782     # full package version
1783     self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1784                   constants.CV_ENODEVERSION, ninfo.name,
1785                   "software version mismatch: master %s, node %s",
1786                   constants.RELEASE_VERSION, remote_version[1],
1787                   code=self.ETYPE_WARNING)
1788
1789     hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1790     if ninfo.vm_capable and isinstance(hyp_result, dict):
1791       for hv_name, hv_result in hyp_result.iteritems():
1792         test = hv_result is not None
1793         self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1794                       "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1795
1796     hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1797     if ninfo.vm_capable and isinstance(hvp_result, list):
1798       for item, hv_name, hv_result in hvp_result:
1799         self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1800                       "hypervisor %s parameter verify failure (source %s): %s",
1801                       hv_name, item, hv_result)
1802
1803     test = nresult.get(constants.NV_NODESETUP,
1804                        ["Missing NODESETUP results"])
1805     self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1806                   "node setup error: %s", "; ".join(test))
1807
1808     return True
1809
1810   def _VerifyNodeTime(self, ninfo, nresult,
1811                       nvinfo_starttime, nvinfo_endtime):
1812     """Check the node time.
1813
1814     @type ninfo: L{objects.Node}
1815     @param ninfo: the node to check
1816     @param nresult: the remote results for the node
1817     @param nvinfo_starttime: the start time of the RPC call
1818     @param nvinfo_endtime: the end time of the RPC call
1819
1820     """
1821     ntime = nresult.get(constants.NV_TIME, None)
1822     try:
1823       ntime_merged = utils.MergeTime(ntime)
1824     except (ValueError, TypeError):
1825       self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1826                     "Node returned invalid time")
1827       return
1828
1829     if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1830       ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1831     elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1832       ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1833     else:
1834       ntime_diff = None
1835
1836     self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1837                   "Node time diverges by at least %s from master node time",
1838                   ntime_diff)
1839
1840   def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1841     """Check the node LVM results and update info for cross-node checks.
1842
1843     @type ninfo: L{objects.Node}
1844     @param ninfo: the node to check
1845     @param nresult: the remote results for the node
1846     @param vg_name: the configured VG name
1847     @type nimg: L{NodeImage}
1848     @param nimg: node image
1849
1850     """
1851     if vg_name is None:
1852       return
1853
1854     # checks vg existence and size > 20G
1855     vglist = nresult.get(constants.NV_VGLIST, None)
1856     test = not vglist
1857     self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1858                   "unable to check volume groups")
1859     if not test:
1860       vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1861                                             constants.MIN_VG_SIZE)
1862       self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1863
1864     # Check PVs
1865     (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1866     for em in errmsgs:
1867       self._Error(constants.CV_ENODELVM, ninfo.name, em)
1868     if pvminmax is not None:
1869       (nimg.pv_min, nimg.pv_max) = pvminmax
1870
1871   def _VerifyGroupDRBDVersion(self, node_verify_infos):
1872     """Check cross-node DRBD version consistency.
1873
1874     @type node_verify_infos: dict
1875     @param node_verify_infos: infos about nodes as returned from the
1876       node_verify call.
1877
1878     """
1879     node_versions = {}
1880     for node_uuid, ndata in node_verify_infos.items():
1881       nresult = ndata.payload
1882       version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1883       node_versions[node_uuid] = version
1884
1885     if len(set(node_versions.values())) > 1:
1886       for node_uuid, version in sorted(node_versions.items()):
1887         msg = "DRBD version mismatch: %s" % version
1888         self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1889                     code=self.ETYPE_WARNING)
1890
1891   def _VerifyGroupLVM(self, node_image, vg_name):
1892     """Check cross-node consistency in LVM.
1893
1894     @type node_image: dict
1895     @param node_image: info about nodes, mapping from node to names to
1896       L{NodeImage} objects
1897     @param vg_name: the configured VG name
1898
1899     """
1900     if vg_name is None:
1901       return
1902
1903     # Only exclusive storage needs this kind of checks
1904     if not self._exclusive_storage:
1905       return
1906
1907     # exclusive_storage wants all PVs to have the same size (approximately),
1908     # if the smallest and the biggest ones are okay, everything is fine.
1909     # pv_min is None iff pv_max is None
1910     vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1911     if not vals:
1912       return
1913     (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1914     (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1915     bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1916     self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1917                   "PV sizes differ too much in the group; smallest (%s MB) is"
1918                   " on %s, biggest (%s MB) is on %s",
1919                   pvmin, self.cfg.GetNodeName(minnode_uuid),
1920                   pvmax, self.cfg.GetNodeName(maxnode_uuid))
1921
1922   def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1923     """Check the node bridges.
1924
1925     @type ninfo: L{objects.Node}
1926     @param ninfo: the node to check
1927     @param nresult: the remote results for the node
1928     @param bridges: the expected list of bridges
1929
1930     """
1931     if not bridges:
1932       return
1933
1934     missing = nresult.get(constants.NV_BRIDGES, None)
1935     test = not isinstance(missing, list)
1936     self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1937                   "did not return valid bridge information")
1938     if not test:
1939       self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1940                     "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1941
1942   def _VerifyNodeUserScripts(self, ninfo, nresult):
1943     """Check the results of user scripts presence and executability on the node
1944
1945     @type ninfo: L{objects.Node}
1946     @param ninfo: the node to check
1947     @param nresult: the remote results for the node
1948
1949     """
1950     test = not constants.NV_USERSCRIPTS in nresult
1951     self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1952                   "did not return user scripts information")
1953
1954     broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1955     if not test:
1956       self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1957                     "user scripts not present or not executable: %s" %
1958                     utils.CommaJoin(sorted(broken_scripts)))
1959
1960   def _VerifyNodeNetwork(self, ninfo, nresult):
1961     """Check the node network connectivity results.
1962
1963     @type ninfo: L{objects.Node}
1964     @param ninfo: the node to check
1965     @param nresult: the remote results for the node
1966
1967     """
1968     test = constants.NV_NODELIST not in nresult
1969     self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
1970                   "node hasn't returned node ssh connectivity data")
1971     if not test:
1972       if nresult[constants.NV_NODELIST]:
1973         for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1974           self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
1975                         "ssh communication with node '%s': %s", a_node, a_msg)
1976
1977     test = constants.NV_NODENETTEST not in nresult
1978     self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1979                   "node hasn't returned node tcp connectivity data")
1980     if not test:
1981       if nresult[constants.NV_NODENETTEST]:
1982         nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1983         for anode in nlist:
1984           self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
1985                         "tcp communication with node '%s': %s",
1986                         anode, nresult[constants.NV_NODENETTEST][anode])
1987
1988     test = constants.NV_MASTERIP not in nresult
1989     self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1990                   "node hasn't returned node master IP reachability data")
1991     if not test:
1992       if not nresult[constants.NV_MASTERIP]:
1993         if ninfo.uuid == self.master_node:
1994           msg = "the master node cannot reach the master IP (not configured?)"
1995         else:
1996           msg = "cannot reach the master IP"
1997         self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
1998
1999   def _VerifyInstance(self, instance, node_image, diskstatus):
2000     """Verify an instance.
2001
2002     This function checks to see if the required block devices are
2003     available on the instance's node, and that the nodes are in the correct
2004     state.
2005
2006     """
2007     pnode_uuid = instance.primary_node
2008     pnode_img = node_image[pnode_uuid]
2009     groupinfo = self.cfg.GetAllNodeGroupsInfo()
2010
2011     node_vol_should = {}
2012     instance.MapLVsByNode(node_vol_should)
2013
2014     cluster = self.cfg.GetClusterInfo()
2015     ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2016                                                             self.group_info)
2017     err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
2018     self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
2019                   utils.CommaJoin(err), code=self.ETYPE_WARNING)
2020
2021     for node_uuid in node_vol_should:
2022       n_img = node_image[node_uuid]
2023       if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2024         # ignore missing volumes on offline or broken nodes
2025         continue
2026       for volume in node_vol_should[node_uuid]:
2027         test = volume not in n_img.volumes
2028         self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
2029                       "volume %s missing on node %s", volume,
2030                       self.cfg.GetNodeName(node_uuid))
2031
2032     if instance.admin_state == constants.ADMINST_UP:
2033       test = instance.uuid not in pnode_img.instances and not pnode_img.offline
2034       self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
2035                     "instance not running on its primary node %s",
2036                      self.cfg.GetNodeName(pnode_uuid))
2037       self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
2038                     instance.name, "instance is marked as running and lives on"
2039                     " offline node %s", self.cfg.GetNodeName(pnode_uuid))
2040
2041     diskdata = [(nname, success, status, idx)
2042                 for (nname, disks) in diskstatus.items()
2043                 for idx, (success, status) in enumerate(disks)]
2044
2045     for nname, success, bdev_status, idx in diskdata:
2046       # the 'ghost node' construction in Exec() ensures that we have a
2047       # node here
2048       snode = node_image[nname]
2049       bad_snode = snode.ghost or snode.offline
2050       self._ErrorIf(instance.disks_active and
2051                     not success and not bad_snode,
2052                     constants.CV_EINSTANCEFAULTYDISK, instance.name,
2053                     "couldn't retrieve status for disk/%s on %s: %s",
2054                     idx, self.cfg.GetNodeName(nname), bdev_status)
2055
2056       if instance.disks_active and success and \
2057          (bdev_status.is_degraded or
2058           bdev_status.ldisk_status != constants.LDS_OKAY):
2059         msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2060         if bdev_status.is_degraded:
2061           msg += " is degraded"
2062         if bdev_status.ldisk_status != constants.LDS_OKAY:
2063           msg += "; state is '%s'" % \
2064                  constants.LDS_NAMES[bdev_status.ldisk_status]
2065
2066         self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2067
2068     self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2069                   constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2070                   "instance %s, connection to primary node failed",
2071                   instance.name)
2072
2073     self._ErrorIf(len(instance.secondary_nodes) > 1,
2074                   constants.CV_EINSTANCELAYOUT, instance.name,
2075                   "instance has multiple secondary nodes: %s",
2076                   utils.CommaJoin(instance.secondary_nodes),
2077                   code=self.ETYPE_WARNING)
2078
2079     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2080     if any(es_flags.values()):
2081       if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2082         # Disk template not compatible with exclusive_storage: no instance
2083         # node should have the flag set
2084         es_nodes = [n
2085                     for (n, es) in es_flags.items()
2086                     if es]
2087         self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2088                     "instance has template %s, which is not supported on nodes"
2089                     " that have exclusive storage set: %s",
2090                     instance.disk_template,
2091                     utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2092       for (idx, disk) in enumerate(instance.disks):
2093         self._ErrorIf(disk.spindles is None,
2094                       constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2095                       "number of spindles not configured for disk %s while"
2096                       " exclusive storage is enabled, try running"
2097                       " gnt-cluster repair-disk-sizes", idx)
2098
2099     if instance.disk_template in constants.DTS_INT_MIRROR:
2100       instance_nodes = utils.NiceSort(instance.all_nodes)
2101       instance_groups = {}
2102
2103       for node_uuid in instance_nodes:
2104         instance_groups.setdefault(self.all_node_info[node_uuid].group,
2105                                    []).append(node_uuid)
2106
2107       pretty_list = [
2108         "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2109                            groupinfo[group].name)
2110         # Sort so that we always list the primary node first.
2111         for group, nodes in sorted(instance_groups.items(),
2112                                    key=lambda (_, nodes): pnode_uuid in nodes,
2113                                    reverse=True)]
2114
2115       self._ErrorIf(len(instance_groups) > 1,
2116                     constants.CV_EINSTANCESPLITGROUPS,
2117                     instance.name, "instance has primary and secondary nodes in"
2118                     " different groups: %s", utils.CommaJoin(pretty_list),
2119                     code=self.ETYPE_WARNING)
2120
2121     inst_nodes_offline = []
2122     for snode in instance.secondary_nodes:
2123       s_img = node_image[snode]
2124       self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2125                     self.cfg.GetNodeName(snode),
2126                     "instance %s, connection to secondary node failed",
2127                     instance.name)
2128
2129       if s_img.offline:
2130         inst_nodes_offline.append(snode)
2131
2132     # warn that the instance lives on offline nodes
2133     self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2134                   instance.name, "instance has offline secondary node(s) %s",
2135                   utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2136     # ... or ghost/non-vm_capable nodes
2137     for node_uuid in instance.all_nodes:
2138       self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2139                     instance.name, "instance lives on ghost node %s",
2140                     self.cfg.GetNodeName(node_uuid))
2141       self._ErrorIf(not node_image[node_uuid].vm_capable,
2142                     constants.CV_EINSTANCEBADNODE, instance.name,
2143                     "instance lives on non-vm_capable node %s",
2144                     self.cfg.GetNodeName(node_uuid))
2145
2146   def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2147     """Verify if there are any unknown volumes in the cluster.
2148
2149     The .os, .swap and backup volumes are ignored. All other volumes are
2150     reported as unknown.
2151
2152     @type reserved: L{ganeti.utils.FieldSet}
2153     @param reserved: a FieldSet of reserved volume names
2154
2155     """
2156     for node_uuid, n_img in node_image.items():
2157       if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2158           self.all_node_info[node_uuid].group != self.group_uuid):
2159         # skip non-healthy nodes
2160         continue
2161       for volume in n_img.volumes:
2162         test = ((node_uuid not in node_vol_should or
2163                 volume not in node_vol_should[node_uuid]) and
2164                 not reserved.Matches(volume))
2165         self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2166                       self.cfg.GetNodeName(node_uuid),
2167                       "volume %s is unknown", volume)
2168
2169   def _VerifyNPlusOneMemory(self, node_image, all_insts):
2170     """Verify N+1 Memory Resilience.
2171
2172     Check that if one single node dies we can still start all the
2173     instances it was primary for.
2174
2175     """
2176     cluster_info = self.cfg.GetClusterInfo()
2177     for node_uuid, n_img in node_image.items():
2178       # This code checks that every node which is now listed as
2179       # secondary has enough memory to host all instances it is
2180       # supposed to should a single other node in the cluster fail.
2181       # FIXME: not ready for failover to an arbitrary node
2182       # FIXME: does not support file-backed instances
2183       # WARNING: we currently take into account down instances as well
2184       # as up ones, considering that even if they're down someone
2185       # might want to start them even in the event of a node failure.
2186       if n_img.offline or \
2187          self.all_node_info[node_uuid].group != self.group_uuid:
2188         # we're skipping nodes marked offline and nodes in other groups from
2189         # the N+1 warning, since most likely we don't have good memory
2190         # information from them; we already list instances living on such
2191         # nodes, and that's enough warning
2192         continue
2193       #TODO(dynmem): also consider ballooning out other instances
2194       for prinode, inst_uuids in n_img.sbp.items():
2195         needed_mem = 0
2196         for inst_uuid in inst_uuids:
2197           bep = cluster_info.FillBE(all_insts[inst_uuid])
2198           if bep[constants.BE_AUTO_BALANCE]:
2199             needed_mem += bep[constants.BE_MINMEM]
2200         test = n_img.mfree < needed_mem
2201         self._ErrorIf(test, constants.CV_ENODEN1,
2202                       self.cfg.GetNodeName(node_uuid),
2203                       "not enough memory to accomodate instance failovers"
2204                       " should node %s fail (%dMiB needed, %dMiB available)",
2205                       self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2206
2207   def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2208                    (files_all, files_opt, files_mc, files_vm)):
2209     """Verifies file checksums collected from all nodes.
2210
2211     @param nodes: List of L{objects.Node} objects
2212     @param master_node_uuid: UUID of master node
2213     @param all_nvinfo: RPC results
2214
2215     """
2216     # Define functions determining which nodes to consider for a file
2217     files2nodefn = [
2218       (files_all, None),
2219       (files_mc, lambda node: (node.master_candidate or
2220                                node.uuid == master_node_uuid)),
2221       (files_vm, lambda node: node.vm_capable),
2222       ]
2223
2224     # Build mapping from filename to list of nodes which should have the file
2225     nodefiles = {}
2226     for (files, fn) in files2nodefn:
2227       if fn is None:
2228         filenodes = nodes
2229       else:
2230         filenodes = filter(fn, nodes)
2231       nodefiles.update((filename,
2232                         frozenset(map(operator.attrgetter("uuid"), filenodes)))
2233                        for filename in files)
2234
2235     assert set(nodefiles) == (files_all | files_mc | files_vm)
2236
2237     fileinfo = dict((filename, {}) for filename in nodefiles)
2238     ignore_nodes = set()
2239
2240     for node in nodes:
2241       if node.offline:
2242         ignore_nodes.add(node.uuid)
2243         continue
2244
2245       nresult = all_nvinfo[node.uuid]
2246
2247       if nresult.fail_msg or not nresult.payload:
2248         node_files = None
2249       else:
2250         fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2251         node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2252                           for (key, value) in fingerprints.items())
2253         del fingerprints
2254
2255       test = not (node_files and isinstance(node_files, dict))
2256       self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2257                     "Node did not return file checksum data")
2258       if test:
2259         ignore_nodes.add(node.uuid)
2260         continue
2261
2262       # Build per-checksum mapping from filename to nodes having it
2263       for (filename, checksum) in node_files.items():
2264         assert filename in nodefiles
2265         fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2266
2267     for (filename, checksums) in fileinfo.items():
2268       assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2269
2270       # Nodes having the file
2271       with_file = frozenset(node_uuid
2272                             for node_uuids in fileinfo[filename].values()
2273                             for node_uuid in node_uuids) - ignore_nodes
2274
2275       expected_nodes = nodefiles[filename] - ignore_nodes
2276
2277       # Nodes missing file
2278       missing_file = expected_nodes - with_file
2279
2280       if filename in files_opt:
2281         # All or no nodes
2282         self._ErrorIf(missing_file and missing_file != expected_nodes,
2283                       constants.CV_ECLUSTERFILECHECK, None,
2284                       "File %s is optional, but it must exist on all or no"
2285                       " nodes (not found on %s)",
2286                       filename,
2287                       utils.CommaJoin(
2288                         utils.NiceSort(
2289                           map(self.cfg.GetNodeName, missing_file))))
2290       else:
2291         self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2292                       "File %s is missing from node(s) %s", filename,
2293                       utils.CommaJoin(
2294                         utils.NiceSort(
2295                           map(self.cfg.GetNodeName, missing_file))))
2296
2297         # Warn if a node has a file it shouldn't
2298         unexpected = with_file - expected_nodes
2299         self._ErrorIf(unexpected,
2300                       constants.CV_ECLUSTERFILECHECK, None,
2301                       "File %s should not exist on node(s) %s",
2302                       filename, utils.CommaJoin(
2303                         utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2304
2305       # See if there are multiple versions of the file
2306       test = len(checksums) > 1
2307       if test:
2308         variants = ["variant %s on %s" %
2309                     (idx + 1,
2310                      utils.CommaJoin(utils.NiceSort(
2311                        map(self.cfg.GetNodeName, node_uuids))))
2312                     for (idx, (checksum, node_uuids)) in
2313                       enumerate(sorted(checksums.items()))]
2314       else:
2315         variants = []
2316
2317       self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2318                     "File %s found with %s different checksums (%s)",
2319                     filename, len(checksums), "; ".join(variants))
2320
2321   def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
2322     """Verify the drbd helper.
2323
2324     """
2325     if drbd_helper:
2326       helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2327       test = (helper_result is None)
2328       self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2329                     "no drbd usermode helper returned")
2330       if helper_result:
2331         status, payload = helper_result
2332         test = not status
2333         self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2334                       "drbd usermode helper check unsuccessful: %s", payload)
2335         test = status and (payload != drbd_helper)
2336         self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2337                       "wrong drbd usermode helper: %s", payload)
2338
2339   def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2340                       drbd_map):
2341     """Verifies and the node DRBD status.
2342
2343     @type ninfo: L{objects.Node}
2344     @param ninfo: the node to check
2345     @param nresult: the remote results for the node
2346     @param instanceinfo: the dict of instances
2347     @param drbd_helper: the configured DRBD usermode helper
2348     @param drbd_map: the DRBD map as returned by
2349         L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2350
2351     """
2352     self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)
2353
2354     # compute the DRBD minors
2355     node_drbd = {}
2356     for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2357       test = inst_uuid not in instanceinfo
2358       self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2359                     "ghost instance '%s' in temporary DRBD map", inst_uuid)
2360         # ghost instance should not be running, but otherwise we
2361         # don't give double warnings (both ghost instance and
2362         # unallocated minor in use)
2363       if test:
2364         node_drbd[minor] = (inst_uuid, False)
2365       else:
2366         instance = instanceinfo[inst_uuid]
2367         node_drbd[minor] = (inst_uuid, instance.disks_active)
2368
2369     # and now check them
2370     used_minors = nresult.get(constants.NV_DRBDLIST, [])
2371     test = not isinstance(used_minors, (tuple, list))
2372     self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2373                   "cannot parse drbd status file: %s", str(used_minors))
2374     if test:
2375       # we cannot check drbd status
2376       return
2377
2378     for minor, (inst_uuid, must_exist) in node_drbd.items():
2379       test = minor not in used_minors and must_exist
2380       self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2381                     "drbd minor %d of instance %s is not active", minor,
2382                     self.cfg.GetInstanceName(inst_uuid))
2383     for minor in used_minors:
2384       test = minor not in node_drbd
2385       self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2386                     "unallocated drbd minor %d is in use", minor)
2387
2388   def _UpdateNodeOS(self, ninfo, nresult, nimg):
2389     """Builds the node OS structures.
2390
2391     @type ninfo: L{objects.Node}
2392     @param ninfo: the node to check
2393     @param nresult: the remote results for the node
2394     @param nimg: the node image object
2395
2396     """
2397     remote_os = nresult.get(constants.NV_OSLIST, None)
2398     test = (not isinstance(remote_os, list) or
2399             not compat.all(isinstance(v, list) and len(v) == 7
2400                            for v in remote_os))
2401
2402     self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2403                   "node hasn't returned valid OS data")
2404
2405     nimg.os_fail = test
2406
2407     if test:
2408       return
2409
2410     os_dict = {}
2411
2412     for (name, os_path, status, diagnose,
2413          variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2414
2415       if name not in os_dict:
2416         os_dict[name] = []
2417
2418       # parameters is a list of lists instead of list of tuples due to
2419       # JSON lacking a real tuple type, fix it:
2420       parameters = [tuple(v) for v in parameters]
2421       os_dict[name].append((os_path, status, diagnose,
2422                             set(variants), set(parameters), set(api_ver)))
2423
2424     nimg.oslist = os_dict
2425
2426   def _VerifyNodeOS(self, ninfo, nimg, base):
2427     """Verifies the node OS list.
2428
2429     @type ninfo: L{objects.Node}
2430     @param ninfo: the node to check
2431     @param nimg: the node image object
2432     @param base: the 'template' node we match against (e.g. from the master)
2433
2434     """
2435     assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2436
2437     beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2438     for os_name, os_data in nimg.oslist.items():
2439       assert os_data, "Empty OS status for OS %s?!" % os_name
2440       f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2441       self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2442                     "Invalid OS %s (located at %s): %s",
2443                     os_name, f_path, f_diag)
2444       self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2445                     "OS '%s' has multiple entries"
2446                     " (first one shadows the rest): %s",
2447                     os_name, utils.CommaJoin([v[0] for v in os_data]))
2448       # comparisons with the 'base' image
2449       test = os_name not in base.oslist
2450       self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2451                     "Extra OS %s not present on reference node (%s)",
2452                     os_name, self.cfg.GetNodeName(base.uuid))
2453       if test:
2454         continue
2455       assert base.oslist[os_name], "Base node has empty OS status?"
2456       _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2457       if not b_status:
2458         # base OS is invalid, skipping
2459         continue
2460       for kind, a, b in [("API version", f_api, b_api),
2461                          ("variants list", f_var, b_var),
2462                          ("parameters", beautify_params(f_param),
2463                           beautify_params(b_param))]:
2464         self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2465                       "OS %s for %s differs from reference node %s:"
2466                       " [%s] vs. [%s]", kind, os_name,
2467                       self.cfg.GetNodeName(base.uuid),
2468                       utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2469
2470     # check any missing OSes
2471     missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2472     self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2473                   "OSes present on reference node %s"
2474                   " but missing on this node: %s",
2475                   self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2476
2477   def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2478     """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2479
2480     @type ninfo: L{objects.Node}
2481     @param ninfo: the node to check
2482     @param nresult: the remote results for the node
2483     @type is_master: bool
2484     @param is_master: Whether node is the master node
2485
2486     """
2487     cluster = self.cfg.GetClusterInfo()
2488     if (is_master and
2489         (cluster.IsFileStorageEnabled() or
2490          cluster.IsSharedFileStorageEnabled())):
2491       try:
2492         fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2493       except KeyError:
2494         # This should never happen
2495         self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2496                       "Node did not return forbidden file storage paths")
2497       else:
2498         self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2499                       "Found forbidden file storage paths: %s",
2500                       utils.CommaJoin(fspaths))
2501     else:
2502       self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2503                     constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2504                     "Node should not have returned forbidden file storage"
2505                     " paths")
2506
2507   def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2508                           verify_key, error_key):
2509     """Verifies (file) storage paths.
2510
2511     @type ninfo: L{objects.Node}
2512     @param ninfo: the node to check
2513     @param nresult: the remote results for the node
2514     @type file_disk_template: string
2515     @param file_disk_template: file-based disk template, whose directory
2516         is supposed to be verified
2517     @type verify_key: string
2518     @param verify_key: key for the verification map of this file
2519         verification step
2520     @param error_key: error key to be added to the verification results
2521         in case something goes wrong in this verification step
2522
2523     """
2524     assert (file_disk_template in
2525             utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2526     cluster = self.cfg.GetClusterInfo()
2527     if cluster.IsDiskTemplateEnabled(file_disk_template):
2528       self._ErrorIf(
2529           verify_key in nresult,
2530           error_key, ninfo.name,
2531           "The configured %s storage path is unusable: %s" %
2532           (file_disk_template, nresult.get(verify_key)))
2533
2534   def _VerifyFileStoragePaths(self, ninfo, nresult):
2535     """Verifies (file) storage paths.
2536
2537     @see: C{_VerifyStoragePaths}
2538
2539     """
2540     self._VerifyStoragePaths(
2541         ninfo, nresult, constants.DT_FILE,
2542         constants.NV_FILE_STORAGE_PATH,
2543         constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2544
2545   def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2546     """Verifies (file) storage paths.
2547
2548     @see: C{_VerifyStoragePaths}
2549
2550     """
2551     self._VerifyStoragePaths(
2552         ninfo, nresult, constants.DT_SHARED_FILE,
2553         constants.NV_SHARED_FILE_STORAGE_PATH,
2554         constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2555
2556   def _VerifyOob(self, ninfo, nresult):
2557     """Verifies out of band functionality of a node.
2558
2559     @type ninfo: L{objects.Node}
2560     @param ninfo: the node to check
2561     @param nresult: the remote results for the node
2562
2563     """
2564     # We just have to verify the paths on master and/or master candidates
2565     # as the oob helper is invoked on the master
2566     if ((ninfo.master_candidate or ninfo.master_capable) and
2567         constants.NV_OOB_PATHS in nresult):
2568       for path_result in nresult[constants.NV_OOB_PATHS]:
2569         self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2570                       ninfo.name, path_result)
2571
2572   def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2573     """Verifies and updates the node volume data.
2574
2575     This function will update a L{NodeImage}'s internal structures
2576     with data from the remote call.
2577
2578     @type ninfo: L{objects.Node}
2579     @param ninfo: the node to check
2580     @param nresult: the remote results for the node
2581     @param nimg: the node image object
2582     @param vg_name: the configured VG name
2583
2584     """
2585     nimg.lvm_fail = True
2586     lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2587     if vg_name is None:
2588       pass
2589     elif isinstance(lvdata, basestring):
2590       self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2591                     "LVM problem on node: %s", utils.SafeEncode(lvdata))
2592     elif not isinstance(lvdata, dict):
2593       self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2594                     "rpc call to node failed (lvlist)")
2595     else:
2596       nimg.volumes = lvdata
2597       nimg.lvm_fail = False
2598
2599   def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2600     """Verifies and updates the node instance list.
2601
2602     If the listing was successful, then updates this node's instance
2603     list. Otherwise, it marks the RPC call as failed for the instance
2604     list key.
2605
2606     @type ninfo: L{objects.Node}
2607     @param ninfo: the node to check
2608     @param nresult: the remote results for the node
2609     @param nimg: the node image object
2610
2611     """
2612     idata = nresult.get(constants.NV_INSTANCELIST, None)
2613     test = not isinstance(idata, list)
2614     self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2615                   "rpc call to node failed (instancelist): %s",
2616                   utils.SafeEncode(str(idata)))
2617     if test:
2618       nimg.hyp_fail = True
2619     else:
2620       nimg.instances = [inst.uuid for (_, inst) in
2621                         self.cfg.GetMultiInstanceInfoByName(idata)]
2622
2623   def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2624     """Verifies and computes a node information map
2625
2626     @type ninfo: L{objects.Node}
2627     @param ninfo: the node to check
2628     @param nresult: the remote results for the node
2629     @param nimg: the node image object
2630     @param vg_name: the configured VG name
2631
2632     """
2633     # try to read free memory (from the hypervisor)
2634     hv_info = nresult.get(constants.NV_HVINFO, None)
2635     test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2636     self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2637                   "rpc call to node failed (hvinfo)")
2638     if not test:
2639       try:
2640         nimg.mfree = int(hv_info["memory_free"])
2641       except (ValueError, TypeError):
2642         self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2643                       "node returned invalid nodeinfo, check hypervisor")
2644
2645     # FIXME: devise a free space model for file based instances as well
2646     if vg_name is not None:
2647       test = (constants.NV_VGLIST not in nresult or
2648               vg_name not in nresult[constants.NV_VGLIST])
2649       self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2650                     "node didn't return data for the volume group '%s'"
2651                     " - it is either missing or broken", vg_name)
2652       if not test:
2653         try:
2654           nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2655         except (ValueError, TypeError):
2656           self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2657                         "node returned invalid LVM info, check LVM status")
2658
2659   def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2660     """Gets per-disk status information for all instances.
2661
2662     @type node_uuids: list of strings
2663     @param node_uuids: Node UUIDs
2664     @type node_image: dict of (UUID, L{objects.Node})
2665     @param node_image: Node objects
2666     @type instanceinfo: dict of (UUID, L{objects.Instance})
2667     @param instanceinfo: Instance objects
2668     @rtype: {instance: {node: [(succes, payload)]}}
2669     @return: a dictionary of per-instance dictionaries with nodes as
2670         keys and disk information as values; the disk information is a
2671         list of tuples (success, payload)
2672
2673     """
2674     node_disks = {}
2675     node_disks_dev_inst_only = {}
2676     diskless_instances = set()
2677     diskless = constants.DT_DISKLESS
2678
2679     for nuuid in node_uuids:
2680       node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
2681                                              node_image[nuuid].sinst))
2682       diskless_instances.update(uuid for uuid in node_inst_uuids
2683                                 if instanceinfo[uuid].disk_template == diskless)
2684       disks = [(inst_uuid, disk)
2685                for inst_uuid in node_inst_uuids
2686                for disk in instanceinfo[inst_uuid].disks]
2687
2688       if not disks:
2689         # No need to collect data
2690         continue
2691
2692       node_disks[nuuid] = disks
2693
2694       # _AnnotateDiskParams makes already copies of the disks
2695       dev_inst_only = []
2696       for (inst_uuid, dev) in disks:
2697         (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
2698                                           self.cfg)
2699         dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))
2700
2701       node_disks_dev_inst_only[nuuid] = dev_inst_only
2702
2703     assert len(node_disks) == len(node_disks_dev_inst_only)
2704
2705     # Collect data from all nodes with disks
2706     result = self.rpc.call_blockdev_getmirrorstatus_multi(
2707                node_disks.keys(), node_disks_dev_inst_only)
2708
2709     assert len(result) == len(node_disks)
2710
2711     instdisk = {}
2712
2713     for (nuuid, nres) in result.items():
2714       node = self.cfg.GetNodeInfo(nuuid)
2715       disks = node_disks[node.uuid]
2716
2717       if nres.offline:
2718         # No data from this node
2719         data = len(disks) * [(False, "node offline")]
2720       else:
2721         msg = nres.fail_msg
2722         self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
2723                       "while getting disk information: %s", msg)
2724         if msg:
2725           # No data from this node
2726           data = len(disks) * [(False, msg)]
2727         else:
2728           data = []
2729           for idx, i in enumerate(nres.payload):
2730             if isinstance(i, (tuple, list)) and len(i) == 2:
2731               data.append(i)
2732             else:
2733               logging.warning("Invalid result from node %s, entry %d: %s",
2734                               node.name, idx, i)
2735               data.append((False, "Invalid result from the remote node"))
2736
2737       for ((inst_uuid, _), status) in zip(disks, data):
2738         instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
2739           .append(status)
2740
2741     # Add empty entries for diskless instances.
2742     for inst_uuid in diskless_instances:
2743       assert inst_uuid not in instdisk
2744       instdisk[inst_uuid] = {}
2745
2746     assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2747                       len(nuuids) <= len(instanceinfo[inst].all_nodes) and
2748                       compat.all(isinstance(s, (tuple, list)) and
2749                                  len(s) == 2 for s in statuses)
2750                       for inst, nuuids in instdisk.items()
2751                       for nuuid, statuses in nuuids.items())
2752     if __debug__:
2753       instdisk_keys = set(instdisk)
2754       instanceinfo_keys = set(instanceinfo)
2755       assert instdisk_keys == instanceinfo_keys, \
2756         ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2757          (instdisk_keys, instanceinfo_keys))
2758
2759     return instdisk
2760
2761   @staticmethod
2762   def _SshNodeSelector(group_uuid, all_nodes):
2763     """Create endless iterators for all potential SSH check hosts.
2764
2765     """
2766     nodes = [node for node in all_nodes
2767              if (node.group != group_uuid and
2768                  not node.offline)]
2769     keyfunc = operator.attrgetter("group")
2770
2771     return map(itertools.cycle,
2772                [sorted(map(operator.attrgetter("name"), names))
2773                 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2774                                                   keyfunc)])
2775
2776   @classmethod
2777   def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2778     """Choose which nodes should talk to which other nodes.
2779
2780     We will make nodes contact all nodes in their group, and one node from
2781     every other group.
2782
2783     @warning: This algorithm has a known issue if one node group is much
2784       smaller than others (e.g. just one node). In such a case all other
2785       nodes will talk to the single node.
2786
2787     """
2788     online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2789     sel = cls._SshNodeSelector(group_uuid, all_nodes)
2790
2791     return (online_nodes,
2792             dict((name, sorted([i.next() for i in sel]))
2793                  for name in online_nodes))
2794
2795   def BuildHooksEnv(self):
2796     """Build hooks env.
2797
2798     Cluster-Verify hooks just ran in the post phase and their failure makes
2799     the output be logged in the verify output and the verification to fail.
2800
2801     """
2802     env = {
2803       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2804       }
2805
2806     env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2807                for node in self.my_node_info.values())
2808
2809     return env
2810
2811   def BuildHooksNodes(self):
2812     """Build hooks nodes.
2813
2814     """
2815     return ([], list(self.my_node_info.keys()))
2816
2817   def Exec(self, feedback_fn):
2818     """Verify integrity of the node group, performing various test on nodes.
2819
2820     """
2821     # This method has too many local variables. pylint: disable=R0914
2822     feedback_fn("* Verifying group '%s'" % self.group_info.name)
2823
2824     if not self.my_node_uuids:
2825       # empty node group
2826       feedback_fn("* Empty node group, skipping verification")
2827       return True
2828
2829     self.bad = False
2830     verbose = self.op.verbose
2831     self._feedback_fn = feedback_fn
2832
2833     vg_name = self.cfg.GetVGName()
2834     drbd_helper = self.cfg.GetDRBDHelper()
2835     cluster = self.cfg.GetClusterInfo()
2836     hypervisors = cluster.enabled_hypervisors
2837     node_data_list = self.my_node_info.values()
2838
2839     i_non_redundant = [] # Non redundant instances
2840     i_non_a_balanced = [] # Non auto-balanced instances
2841     i_offline = 0 # Count of offline instances
2842     n_offline = 0 # Count of offline nodes
2843     n_drained = 0 # Count of nodes being drained
2844     node_vol_should = {}
2845
2846     # FIXME: verify OS list
2847
2848     # File verification
2849     filemap = ComputeAncillaryFiles(cluster, False)
2850
2851     # do local checksums
2852     master_node_uuid = self.master_node = self.cfg.GetMasterNode()
2853     master_ip = self.cfg.GetMasterIP()
2854
2855     feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))
2856
2857     user_scripts = []
2858     if self.cfg.GetUseExternalMipScript():
2859       user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2860
2861     node_verify_param = {
2862       constants.NV_FILELIST:
2863         map(vcluster.MakeVirtualPath,
2864             utils.UniqueSequence(filename
2865                                  for files in filemap
2866                                  for filename in files)),
2867       constants.NV_NODELIST:
2868         self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2869                                   self.all_node_info.values()),
2870       constants.NV_HYPERVISOR: hypervisors,
2871       constants.NV_HVPARAMS:
2872         _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2873       constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2874                                  for node in node_data_list
2875                                  if not node.offline],
2876       constants.NV_INSTANCELIST: hypervisors,
2877       constants.NV_VERSION: None,
2878       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2879       constants.NV_NODESETUP: None,
2880       constants.NV_TIME: None,
2881       constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
2882       constants.NV_OSLIST: None,
2883       constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2884       constants.NV_USERSCRIPTS: user_scripts,
2885       }
2886
2887     if vg_name is not None:
2888       node_verify_param[constants.NV_VGLIST] = None
2889       node_verify_param[constants.NV_LVLIST] = vg_name
2890       node_verify_param[constants.NV_PVLIST] = [vg_name]
2891
2892     if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
2893       if drbd_helper:
2894         node_verify_param[constants.NV_DRBDVERSION] = None
2895         node_verify_param[constants.NV_DRBDLIST] = None
2896         node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2897
2898     if cluster.IsFileStorageEnabled() or \
2899         cluster.IsSharedFileStorageEnabled():
2900       # Load file storage paths only from master node
2901       node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
2902         self.cfg.GetMasterNodeName()
2903       if cluster.IsFileStorageEnabled():
2904         node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
2905           cluster.file_storage_dir
2906
2907     # bridge checks
2908     # FIXME: this needs to be changed per node-group, not cluster-wide
2909     bridges = set()
2910     default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2911     if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2912       bridges.add(default_nicpp[constants.NIC_LINK])
2913     for inst_uuid in self.my_inst_info.values():
2914       for nic in inst_uuid.nics:
2915         full_nic = cluster.SimpleFillNIC(nic.nicparams)
2916         if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2917           bridges.add(full_nic[constants.NIC_LINK])
2918
2919     if bridges:
2920       node_verify_param[constants.NV_BRIDGES] = list(bridges)
2921
2922     # Build our expected cluster state
2923     node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
2924                                                  uuid=node.uuid,
2925                                                  vm_capable=node.vm_capable))
2926                       for node in node_data_list)
2927
2928     # Gather OOB paths
2929     oob_paths = []
2930     for node in self.all_node_info.values():
2931       path = SupportsOob(self.cfg, node)
2932       if path and path not in oob_paths:
2933         oob_paths.append(path)
2934
2935     if oob_paths:
2936       node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2937
2938     for inst_uuid in self.my_inst_uuids:
2939       instance = self.my_inst_info[inst_uuid]
2940       if instance.admin_state == constants.ADMINST_OFFLINE:
2941         i_offline += 1
2942
2943       for nuuid in instance.all_nodes:
2944         if nuuid not in node_image:
2945           gnode = self.NodeImage(uuid=nuuid)
2946           gnode.ghost = (nuuid not in self.all_node_info)
2947           node_image[nuuid] = gnode
2948
2949       instance.MapLVsByNode(node_vol_should)
2950
2951       pnode = instance.primary_node
2952       node_image[pnode].pinst.append(instance.uuid)
2953
2954       for snode in instance.secondary_nodes:
2955         nimg = node_image[snode]
2956         nimg.sinst.append(instance.uuid)
2957         if pnode not in nimg.sbp:
2958           nimg.sbp[pnode] = []
2959         nimg.sbp[pnode].append(instance.uuid)
2960
2961     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
2962                                                self.my_node_info.keys())
2963     # The value of exclusive_storage should be the same across the group, so if
2964     # it's True for at least a node, we act as if it were set for all the nodes
2965     self._exclusive_storage = compat.any(es_flags.values())
2966     if self._exclusive_storage:
2967       node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2968
2969     # At this point, we have the in-memory data structures complete,
2970     # except for the runtime information, which we'll gather next
2971
2972     # Due to the way our RPC system works, exact response times cannot be
2973     # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2974     # time before and after executing the request, we can at least have a time
2975     # window.
2976     nvinfo_starttime = time.time()
2977     all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
2978                                            node_verify_param,
2979                                            self.cfg.GetClusterName(),
2980                                            self.cfg.GetClusterInfo().hvparams)
2981     nvinfo_endtime = time.time()
2982
2983     if self.extra_lv_nodes and vg_name is not None:
2984       extra_lv_nvinfo = \
2985           self.rpc.call_node_verify(self.extra_lv_nodes,
2986                                     {constants.NV_LVLIST: vg_name},
2987                                     self.cfg.GetClusterName(),
2988                                     self.cfg.GetClusterInfo().hvparams)
2989     else:
2990       extra_lv_nvinfo = {}
2991
2992     all_drbd_map = self.cfg.ComputeDRBDMap()
2993
2994     feedback_fn("* Gathering disk information (%s nodes)" %
2995                 len(self.my_node_uuids))
2996     instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
2997                                      self.my_inst_info)
2998
2999     feedback_fn("* Verifying configuration file consistency")
3000
3001     # If not all nodes are being checked, we need to make sure the master node
3002     # and a non-checked vm_capable node are in the list.
3003     absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
3004     if absent_node_uuids:
3005       vf_nvinfo = all_nvinfo.copy()
3006       vf_node_info = list(self.my_node_info.values())
3007       additional_node_uuids = []
3008       if master_node_uuid not in self.my_node_info:
3009         additional_node_uuids.append(master_node_uuid)
3010         vf_node_info.append(self.all_node_info[master_node_uuid])
3011       # Add the first vm_capable node we find which is not included,
3012       # excluding the master node (which we already have)
3013       for node_uuid in absent_node_uuids:
3014         nodeinfo = self.all_node_info[node_uuid]
3015         if (nodeinfo.vm_capable and not nodeinfo.offline and
3016             node_uuid != master_node_uuid):
3017           additional_node_uuids.append(node_uuid)
3018           vf_node_info.append(self.all_node_info[node_uuid])
3019           break
3020       key = constants.NV_FILELIST
3021       vf_nvinfo.update(self.rpc.call_node_verify(
3022          additional_node_uuids, {key: node_verify_param[key]},
3023          self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
3024     else:
3025       vf_nvinfo = all_nvinfo
3026       vf_node_info = self.my_node_info.values()
3027
3028     self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)
3029
3030     feedback_fn("* Verifying node status")
3031
3032     refos_img = None
3033
3034     for node_i in node_data_list:
3035       nimg = node_image[node_i.uuid]
3036
3037       if node_i.offline:
3038         if verbose:
3039           feedback_fn("* Skipping offline node %s" % (node_i.name,))
3040         n_offline += 1
3041         continue
3042
3043       if node_i.uuid == master_node_uuid:
3044         ntype = "master"
3045       elif node_i.master_candidate:
3046         ntype = "master candidate"
3047       elif node_i.drained:
3048         ntype = "drained"
3049         n_drained += 1
3050       else:
3051         ntype = "regular"
3052       if verbose:
3053         feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))
3054
3055       msg = all_nvinfo[node_i.uuid].fail_msg
3056       self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
3057                     "while contacting node: %s", msg)
3058       if msg:
3059         nimg.rpc_fail = True
3060         continue
3061
3062       nresult = all_nvinfo[node_i.uuid].payload
3063
3064       nimg.call_ok = self._VerifyNode(node_i, nresult)
3065       self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
3066       self._VerifyNodeNetwork(node_i, nresult)
3067       self._VerifyNodeUserScripts(node_i, nresult)
3068       self._VerifyOob(node_i, nresult)
3069       self._VerifyAcceptedFileStoragePaths(node_i, nresult,
3070                                            node_i.uuid == master_node_uuid)
3071       self._VerifyFileStoragePaths(node_i, nresult)
3072       self._VerifySharedFileStoragePaths(node_i, nresult)
3073
3074       if nimg.vm_capable:
3075         self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
3076         self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
3077                              all_drbd_map)
3078
3079         self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
3080         self._UpdateNodeInstances(node_i, nresult, nimg)
3081         self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
3082         self._UpdateNodeOS(node_i, nresult, nimg)
3083
3084         if not nimg.os_fail:
3085           if refos_img is None:
3086             refos_img = nimg
3087           self._VerifyNodeOS(node_i, nimg, refos_img)
3088         self._VerifyNodeBridges(node_i, nresult, bridges)
3089
3090         # Check whether all running instances are primary for the node. (This
3091         # can no longer be done from _VerifyInstance below, since some of the
3092         # wrong instances could be from other node groups.)
3093         non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)
3094
3095         for inst_uuid in non_primary_inst_uuids:
3096           test = inst_uuid in self.all_inst_info
3097           self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
3098                         self.cfg.GetInstanceName(inst_uuid),
3099                         "instance should not run on node %s", node_i.name)
3100           self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
3101                         "node is running unknown instance %s", inst_uuid)
3102
3103     self._VerifyGroupDRBDVersion(all_nvinfo)
3104     self._VerifyGroupLVM(node_image, vg_name)
3105
3106     for node_uuid, result in extra_lv_nvinfo.items():
3107       self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
3108                               node_image[node_uuid], vg_name)
3109
3110     feedback_fn("* Verifying instance status")
3111     for inst_uuid in self.my_inst_uuids:
3112       instance = self.my_inst_info[inst_uuid]
3113       if verbose:
3114         feedback_fn("* Verifying instance %s" % instance.name)
3115       self._VerifyInstance(instance, node_image, instdisk[inst_uuid])
3116
3117       # If the instance is non-redundant we cannot survive losing its primary
3118       # node, so we are not N+1 compliant.
3119       if instance.disk_template not in constants.DTS_MIRRORED:
3120         i_non_redundant.append(instance)
3121
3122       if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
3123         i_non_a_balanced.append(instance)
3124
3125     feedback_fn("* Verifying orphan volumes")
3126     reserved = utils.FieldSet(*cluster.reserved_lvs)
3127
3128     # We will get spurious "unknown volume" warnings if any node of this group
3129     # is secondary for an instance whose primary is in another group. To avoid
3130     # them, we find these instances and add their volumes to node_vol_should.
3131     for instance in self.all_inst_info.values():
3132       for secondary in instance.secondary_nodes:
3133         if (secondary in self.my_node_info
3134             and instance.name not in self.my_inst_info):
3135           instance.MapLVsByNode(node_vol_should)
3136           break
3137
3138     self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
3139
3140     if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
3141       feedback_fn("* Verifying N+1 Memory redundancy")
3142       self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
3143
3144     feedback_fn("* Other Notes")
3145     if i_non_redundant:
3146       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
3147                   % len(i_non_redundant))
3148
3149     if i_non_a_balanced:
3150       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
3151                   % len(i_non_a_balanced))
3152
3153     if i_offline:
3154       feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
3155
3156     if n_offline:
3157       feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
3158
3159     if n_drained:
3160       feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
3161
3162     return not self.bad
3163
3164   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
3165     """Analyze the post-hooks' result
3166
3167     This method analyses the hook result, handles it, and sends some
3168     nicely-formatted feedback back to the user.
3169
3170     @param phase: one of L{constants.HOOKS_PHASE_POST} or
3171         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
3172     @param hooks_results: the results of the multi-node hooks rpc call
3173     @param feedback_fn: function used send feedback back to the caller
3174     @param lu_result: previous Exec result
3175     @return: the new Exec result, based on the previous result
3176         and hook results
3177
3178     """
3179     # We only really run POST phase hooks, only for non-empty groups,
3180     # and are only interested in their results
3181     if not self.my_node_uuids:
3182       # empty node group
3183       pass
3184     elif phase == constants.HOOKS_PHASE_POST:
3185       # Used to change hooks' output to proper indentation
3186       feedback_fn("* Hooks Results")
3187       assert hooks_results, "invalid result from hooks"
3188
3189       for node_name in hooks_results:
3190         res = hooks_results[node_name]
3191         msg = res.fail_msg
3192         test = msg and not res.offline
3193         self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3194                       "Communication failure in hooks execution: %s", msg)
3195         if res.offline or msg:
3196           # No need to investigate payload if node is offline or gave
3197           # an error.
3198           continue
3199         for script, hkr, output in res.payload:
3200           test = hkr == constants.HKR_FAIL
3201           self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3202                         "Script %s failed, output:", script)
3203           if test:
3204             output = self._HOOKS_INDENT_RE.sub("      ", output)
3205             feedback_fn("%s" % output)
3206             lu_result = False
3207
3208     return lu_result
3209
3210
3211 class LUClusterVerifyDisks(NoHooksLU):
3212   """Verifies the cluster disks status.
3213
3214   """
3215   REQ_BGL = False
3216
3217   def ExpandNames(self):
3218     self.share_locks = ShareAll()
3219     self.needed_locks = {
3220       locking.LEVEL_NODEGROUP: locking.ALL_SET,
3221       }
3222
3223   def Exec(self, feedback_fn):
3224     group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
3225
3226     # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
3227     return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
3228                            for group in group_names])