Merge branch 'stable-2.9' into master
[ganeti-local] / lib / cmdlib / cluster.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with the cluster."""
23
24 import OpenSSL
25
26 import copy
27 import itertools
28 import logging
29 import operator
30 import os
31 import re
32 import time
33
34 from ganeti import compat
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import masterd
40 from ganeti import netutils
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 from ganeti import query
45 from ganeti import rpc
46 from ganeti import runtime
47 from ganeti import ssh
48 from ganeti import uidpool
49 from ganeti import utils
50 from ganeti import vcluster
51
52 from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53   ResultWithJobs
54 from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55   ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56   GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57   GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58   CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59   ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
60   CheckIpolicyVsDiskTemplates
61
62 import ganeti.masterd.instance
63
64
65 class LUClusterActivateMasterIp(NoHooksLU):
66   """Activate the master IP on the master node.
67
68   """
69   def Exec(self, feedback_fn):
70     """Activate the master IP.
71
72     """
73     master_params = self.cfg.GetMasterNetworkParameters()
74     ems = self.cfg.GetUseExternalMipScript()
75     result = self.rpc.call_node_activate_master_ip(master_params.uuid,
76                                                    master_params, ems)
77     result.Raise("Could not activate the master IP")
78
79
80 class LUClusterDeactivateMasterIp(NoHooksLU):
81   """Deactivate the master IP on the master node.
82
83   """
84   def Exec(self, feedback_fn):
85     """Deactivate the master IP.
86
87     """
88     master_params = self.cfg.GetMasterNetworkParameters()
89     ems = self.cfg.GetUseExternalMipScript()
90     result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
91                                                      master_params, ems)
92     result.Raise("Could not deactivate the master IP")
93
94
95 class LUClusterConfigQuery(NoHooksLU):
96   """Return configuration values.
97
98   """
99   REQ_BGL = False
100
101   def CheckArguments(self):
102     self.cq = ClusterQuery(None, self.op.output_fields, False)
103
104   def ExpandNames(self):
105     self.cq.ExpandNames(self)
106
107   def DeclareLocks(self, level):
108     self.cq.DeclareLocks(self, level)
109
110   def Exec(self, feedback_fn):
111     result = self.cq.OldStyleQuery(self)
112
113     assert len(result) == 1
114
115     return result[0]
116
117
118 class LUClusterDestroy(LogicalUnit):
119   """Logical unit for destroying the cluster.
120
121   """
122   HPATH = "cluster-destroy"
123   HTYPE = constants.HTYPE_CLUSTER
124
125   def BuildHooksEnv(self):
126     """Build hooks env.
127
128     """
129     return {
130       "OP_TARGET": self.cfg.GetClusterName(),
131       }
132
133   def BuildHooksNodes(self):
134     """Build hooks nodes.
135
136     """
137     return ([], [])
138
139   def CheckPrereq(self):
140     """Check prerequisites.
141
142     This checks whether the cluster is empty.
143
144     Any errors are signaled by raising errors.OpPrereqError.
145
146     """
147     master = self.cfg.GetMasterNode()
148
149     nodelist = self.cfg.GetNodeList()
150     if len(nodelist) != 1 or nodelist[0] != master:
151       raise errors.OpPrereqError("There are still %d node(s) in"
152                                  " this cluster." % (len(nodelist) - 1),
153                                  errors.ECODE_INVAL)
154     instancelist = self.cfg.GetInstanceList()
155     if instancelist:
156       raise errors.OpPrereqError("There are still %d instance(s) in"
157                                  " this cluster." % len(instancelist),
158                                  errors.ECODE_INVAL)
159
160   def Exec(self, feedback_fn):
161     """Destroys the cluster.
162
163     """
164     master_params = self.cfg.GetMasterNetworkParameters()
165
166     # Run post hooks on master node before it's removed
167     RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
168
169     ems = self.cfg.GetUseExternalMipScript()
170     result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
171                                                      master_params, ems)
172     result.Warn("Error disabling the master IP address", self.LogWarning)
173     return master_params.uuid
174
175
176 class LUClusterPostInit(LogicalUnit):
177   """Logical unit for running hooks after cluster initialization.
178
179   """
180   HPATH = "cluster-init"
181   HTYPE = constants.HTYPE_CLUSTER
182
183   def BuildHooksEnv(self):
184     """Build hooks env.
185
186     """
187     return {
188       "OP_TARGET": self.cfg.GetClusterName(),
189       }
190
191   def BuildHooksNodes(self):
192     """Build hooks nodes.
193
194     """
195     return ([], [self.cfg.GetMasterNode()])
196
197   def Exec(self, feedback_fn):
198     """Nothing to do.
199
200     """
201     return True
202
203
204 class ClusterQuery(QueryBase):
205   FIELDS = query.CLUSTER_FIELDS
206
207   #: Do not sort (there is only one item)
208   SORT_FIELD = None
209
210   def ExpandNames(self, lu):
211     lu.needed_locks = {}
212
213     # The following variables interact with _QueryBase._GetNames
214     self.wanted = locking.ALL_SET
215     self.do_locking = self.use_locking
216
217     if self.do_locking:
218       raise errors.OpPrereqError("Can not use locking for cluster queries",
219                                  errors.ECODE_INVAL)
220
221   def DeclareLocks(self, lu, level):
222     pass
223
224   def _GetQueryData(self, lu):
225     """Computes the list of nodes and their attributes.
226
227     """
228     # Locking is not used
229     assert not (compat.any(lu.glm.is_owned(level)
230                            for level in locking.LEVELS
231                            if level != locking.LEVEL_CLUSTER) or
232                 self.do_locking or self.use_locking)
233
234     if query.CQ_CONFIG in self.requested_data:
235       cluster = lu.cfg.GetClusterInfo()
236       nodes = lu.cfg.GetAllNodesInfo()
237     else:
238       cluster = NotImplemented
239       nodes = NotImplemented
240
241     if query.CQ_QUEUE_DRAINED in self.requested_data:
242       drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
243     else:
244       drain_flag = NotImplemented
245
246     if query.CQ_WATCHER_PAUSE in self.requested_data:
247       master_node_uuid = lu.cfg.GetMasterNode()
248
249       result = lu.rpc.call_get_watcher_pause(master_node_uuid)
250       result.Raise("Can't retrieve watcher pause from master node '%s'" %
251                    lu.cfg.GetMasterNodeName())
252
253       watcher_pause = result.payload
254     else:
255       watcher_pause = NotImplemented
256
257     return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
258
259
260 class LUClusterQuery(NoHooksLU):
261   """Query cluster configuration.
262
263   """
264   REQ_BGL = False
265
266   def ExpandNames(self):
267     self.needed_locks = {}
268
269   def Exec(self, feedback_fn):
270     """Return cluster config.
271
272     """
273     cluster = self.cfg.GetClusterInfo()
274     os_hvp = {}
275
276     # Filter just for enabled hypervisors
277     for os_name, hv_dict in cluster.os_hvp.items():
278       os_hvp[os_name] = {}
279       for hv_name, hv_params in hv_dict.items():
280         if hv_name in cluster.enabled_hypervisors:
281           os_hvp[os_name][hv_name] = hv_params
282
283     # Convert ip_family to ip_version
284     primary_ip_version = constants.IP4_VERSION
285     if cluster.primary_ip_family == netutils.IP6Address.family:
286       primary_ip_version = constants.IP6_VERSION
287
288     result = {
289       "software_version": constants.RELEASE_VERSION,
290       "protocol_version": constants.PROTOCOL_VERSION,
291       "config_version": constants.CONFIG_VERSION,
292       "os_api_version": max(constants.OS_API_VERSIONS),
293       "export_version": constants.EXPORT_VERSION,
294       "vcs_version": constants.VCS_VERSION,
295       "architecture": runtime.GetArchInfo(),
296       "name": cluster.cluster_name,
297       "master": self.cfg.GetMasterNodeName(),
298       "default_hypervisor": cluster.primary_hypervisor,
299       "enabled_hypervisors": cluster.enabled_hypervisors,
300       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
301                         for hypervisor_name in cluster.enabled_hypervisors]),
302       "os_hvp": os_hvp,
303       "beparams": cluster.beparams,
304       "osparams": cluster.osparams,
305       "ipolicy": cluster.ipolicy,
306       "nicparams": cluster.nicparams,
307       "ndparams": cluster.ndparams,
308       "diskparams": cluster.diskparams,
309       "candidate_pool_size": cluster.candidate_pool_size,
310       "master_netdev": cluster.master_netdev,
311       "master_netmask": cluster.master_netmask,
312       "use_external_mip_script": cluster.use_external_mip_script,
313       "volume_group_name": cluster.volume_group_name,
314       "drbd_usermode_helper": cluster.drbd_usermode_helper,
315       "file_storage_dir": cluster.file_storage_dir,
316       "shared_file_storage_dir": cluster.shared_file_storage_dir,
317       "maintain_node_health": cluster.maintain_node_health,
318       "ctime": cluster.ctime,
319       "mtime": cluster.mtime,
320       "uuid": cluster.uuid,
321       "tags": list(cluster.GetTags()),
322       "uid_pool": cluster.uid_pool,
323       "default_iallocator": cluster.default_iallocator,
324       "reserved_lvs": cluster.reserved_lvs,
325       "primary_ip_version": primary_ip_version,
326       "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
327       "hidden_os": cluster.hidden_os,
328       "blacklisted_os": cluster.blacklisted_os,
329       "enabled_disk_templates": cluster.enabled_disk_templates,
330       }
331
332     return result
333
334
335 class LUClusterRedistConf(NoHooksLU):
336   """Force the redistribution of cluster configuration.
337
338   This is a very simple LU.
339
340   """
341   REQ_BGL = False
342
343   def ExpandNames(self):
344     self.needed_locks = {
345       locking.LEVEL_NODE: locking.ALL_SET,
346       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
347     }
348     self.share_locks = ShareAll()
349
350   def Exec(self, feedback_fn):
351     """Redistribute the configuration.
352
353     """
354     self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
355     RedistributeAncillaryFiles(self)
356
357
358 class LUClusterRename(LogicalUnit):
359   """Rename the cluster.
360
361   """
362   HPATH = "cluster-rename"
363   HTYPE = constants.HTYPE_CLUSTER
364
365   def BuildHooksEnv(self):
366     """Build hooks env.
367
368     """
369     return {
370       "OP_TARGET": self.cfg.GetClusterName(),
371       "NEW_NAME": self.op.name,
372       }
373
374   def BuildHooksNodes(self):
375     """Build hooks nodes.
376
377     """
378     return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
379
380   def CheckPrereq(self):
381     """Verify that the passed name is a valid one.
382
383     """
384     hostname = netutils.GetHostname(name=self.op.name,
385                                     family=self.cfg.GetPrimaryIPFamily())
386
387     new_name = hostname.name
388     self.ip = new_ip = hostname.ip
389     old_name = self.cfg.GetClusterName()
390     old_ip = self.cfg.GetMasterIP()
391     if new_name == old_name and new_ip == old_ip:
392       raise errors.OpPrereqError("Neither the name nor the IP address of the"
393                                  " cluster has changed",
394                                  errors.ECODE_INVAL)
395     if new_ip != old_ip:
396       if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
397         raise errors.OpPrereqError("The given cluster IP address (%s) is"
398                                    " reachable on the network" %
399                                    new_ip, errors.ECODE_NOTUNIQUE)
400
401     self.op.name = new_name
402
403   def Exec(self, feedback_fn):
404     """Rename the cluster.
405
406     """
407     clustername = self.op.name
408     new_ip = self.ip
409
410     # shutdown the master IP
411     master_params = self.cfg.GetMasterNetworkParameters()
412     ems = self.cfg.GetUseExternalMipScript()
413     result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
414                                                      master_params, ems)
415     result.Raise("Could not disable the master role")
416
417     try:
418       cluster = self.cfg.GetClusterInfo()
419       cluster.cluster_name = clustername
420       cluster.master_ip = new_ip
421       self.cfg.Update(cluster, feedback_fn)
422
423       # update the known hosts file
424       ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
425       node_list = self.cfg.GetOnlineNodeList()
426       try:
427         node_list.remove(master_params.uuid)
428       except ValueError:
429         pass
430       UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
431     finally:
432       master_params.ip = new_ip
433       result = self.rpc.call_node_activate_master_ip(master_params.uuid,
434                                                      master_params, ems)
435       result.Warn("Could not re-enable the master role on the master,"
436                   " please restart manually", self.LogWarning)
437
438     return clustername
439
440
441 class LUClusterRepairDiskSizes(NoHooksLU):
442   """Verifies the cluster disks sizes.
443
444   """
445   REQ_BGL = False
446
447   def ExpandNames(self):
448     if self.op.instances:
449       (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
450       # Not getting the node allocation lock as only a specific set of
451       # instances (and their nodes) is going to be acquired
452       self.needed_locks = {
453         locking.LEVEL_NODE_RES: [],
454         locking.LEVEL_INSTANCE: self.wanted_names,
455         }
456       self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
457     else:
458       self.wanted_names = None
459       self.needed_locks = {
460         locking.LEVEL_NODE_RES: locking.ALL_SET,
461         locking.LEVEL_INSTANCE: locking.ALL_SET,
462
463         # This opcode is acquires the node locks for all instances
464         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
465         }
466
467     self.share_locks = {
468       locking.LEVEL_NODE_RES: 1,
469       locking.LEVEL_INSTANCE: 0,
470       locking.LEVEL_NODE_ALLOC: 1,
471       }
472
473   def DeclareLocks(self, level):
474     if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
475       self._LockInstancesNodes(primary_only=True, level=level)
476
477   def CheckPrereq(self):
478     """Check prerequisites.
479
480     This only checks the optional instance list against the existing names.
481
482     """
483     if self.wanted_names is None:
484       self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
485
486     self.wanted_instances = \
487         map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
488
489   def _EnsureChildSizes(self, disk):
490     """Ensure children of the disk have the needed disk size.
491
492     This is valid mainly for DRBD8 and fixes an issue where the
493     children have smaller disk size.
494
495     @param disk: an L{ganeti.objects.Disk} object
496
497     """
498     if disk.dev_type == constants.DT_DRBD8:
499       assert disk.children, "Empty children for DRBD8?"
500       fchild = disk.children[0]
501       mismatch = fchild.size < disk.size
502       if mismatch:
503         self.LogInfo("Child disk has size %d, parent %d, fixing",
504                      fchild.size, disk.size)
505         fchild.size = disk.size
506
507       # and we recurse on this child only, not on the metadev
508       return self._EnsureChildSizes(fchild) or mismatch
509     else:
510       return False
511
512   def Exec(self, feedback_fn):
513     """Verify the size of cluster disks.
514
515     """
516     # TODO: check child disks too
517     # TODO: check differences in size between primary/secondary nodes
518     per_node_disks = {}
519     for instance in self.wanted_instances:
520       pnode = instance.primary_node
521       if pnode not in per_node_disks:
522         per_node_disks[pnode] = []
523       for idx, disk in enumerate(instance.disks):
524         per_node_disks[pnode].append((instance, idx, disk))
525
526     assert not (frozenset(per_node_disks.keys()) -
527                 self.owned_locks(locking.LEVEL_NODE_RES)), \
528       "Not owning correct locks"
529     assert not self.owned_locks(locking.LEVEL_NODE)
530
531     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
532                                                per_node_disks.keys())
533
534     changed = []
535     for node_uuid, dskl in per_node_disks.items():
536       newl = [v[2].Copy() for v in dskl]
537       for dsk in newl:
538         self.cfg.SetDiskID(dsk, node_uuid)
539       node_name = self.cfg.GetNodeName(node_uuid)
540       result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
541       if result.fail_msg:
542         self.LogWarning("Failure in blockdev_getdimensions call to node"
543                         " %s, ignoring", node_name)
544         continue
545       if len(result.payload) != len(dskl):
546         logging.warning("Invalid result from node %s: len(dksl)=%d,"
547                         " result.payload=%s", node_name, len(dskl),
548                         result.payload)
549         self.LogWarning("Invalid result from node %s, ignoring node results",
550                         node_name)
551         continue
552       for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
553         if dimensions is None:
554           self.LogWarning("Disk %d of instance %s did not return size"
555                           " information, ignoring", idx, instance.name)
556           continue
557         if not isinstance(dimensions, (tuple, list)):
558           self.LogWarning("Disk %d of instance %s did not return valid"
559                           " dimension information, ignoring", idx,
560                           instance.name)
561           continue
562         (size, spindles) = dimensions
563         if not isinstance(size, (int, long)):
564           self.LogWarning("Disk %d of instance %s did not return valid"
565                           " size information, ignoring", idx, instance.name)
566           continue
567         size = size >> 20
568         if size != disk.size:
569           self.LogInfo("Disk %d of instance %s has mismatched size,"
570                        " correcting: recorded %d, actual %d", idx,
571                        instance.name, disk.size, size)
572           disk.size = size
573           self.cfg.Update(instance, feedback_fn)
574           changed.append((instance.name, idx, "size", size))
575         if es_flags[node_uuid]:
576           if spindles is None:
577             self.LogWarning("Disk %d of instance %s did not return valid"
578                             " spindles information, ignoring", idx,
579                             instance.name)
580           elif disk.spindles is None or disk.spindles != spindles:
581             self.LogInfo("Disk %d of instance %s has mismatched spindles,"
582                          " correcting: recorded %s, actual %s",
583                          idx, instance.name, disk.spindles, spindles)
584             disk.spindles = spindles
585             self.cfg.Update(instance, feedback_fn)
586             changed.append((instance.name, idx, "spindles", disk.spindles))
587         if self._EnsureChildSizes(disk):
588           self.cfg.Update(instance, feedback_fn)
589           changed.append((instance.name, idx, "size", disk.size))
590     return changed
591
592
593 def _ValidateNetmask(cfg, netmask):
594   """Checks if a netmask is valid.
595
596   @type cfg: L{config.ConfigWriter}
597   @param cfg: The cluster configuration
598   @type netmask: int
599   @param netmask: the netmask to be verified
600   @raise errors.OpPrereqError: if the validation fails
601
602   """
603   ip_family = cfg.GetPrimaryIPFamily()
604   try:
605     ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
606   except errors.ProgrammerError:
607     raise errors.OpPrereqError("Invalid primary ip family: %s." %
608                                ip_family, errors.ECODE_INVAL)
609   if not ipcls.ValidateNetmask(netmask):
610     raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
611                                (netmask), errors.ECODE_INVAL)
612
613
614 def CheckFileBasedStoragePathVsEnabledDiskTemplates(
615     logging_warn_fn, file_storage_dir, enabled_disk_templates,
616     file_disk_template):
617   """Checks whether the given file-based storage directory is acceptable.
618
619   Note: This function is public, because it is also used in bootstrap.py.
620
621   @type logging_warn_fn: function
622   @param logging_warn_fn: function which accepts a string and logs it
623   @type file_storage_dir: string
624   @param file_storage_dir: the directory to be used for file-based instances
625   @type enabled_disk_templates: list of string
626   @param enabled_disk_templates: the list of enabled disk templates
627   @type file_disk_template: string
628   @param file_disk_template: the file-based disk template for which the
629       path should be checked
630
631   """
632   assert (file_disk_template in
633           utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
634   file_storage_enabled = file_disk_template in enabled_disk_templates
635   if file_storage_dir is not None:
636     if file_storage_dir == "":
637       if file_storage_enabled:
638         raise errors.OpPrereqError(
639             "Unsetting the '%s' storage directory while having '%s' storage"
640             " enabled is not permitted." %
641             (file_disk_template, file_disk_template))
642     else:
643       if not file_storage_enabled:
644         logging_warn_fn(
645             "Specified a %s storage directory, although %s storage is not"
646             " enabled." % (file_disk_template, file_disk_template))
647   else:
648     raise errors.ProgrammerError("Received %s storage dir with value"
649                                  " 'None'." % file_disk_template)
650
651
652 def CheckFileStoragePathVsEnabledDiskTemplates(
653     logging_warn_fn, file_storage_dir, enabled_disk_templates):
654   """Checks whether the given file storage directory is acceptable.
655
656   @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
657
658   """
659   CheckFileBasedStoragePathVsEnabledDiskTemplates(
660       logging_warn_fn, file_storage_dir, enabled_disk_templates,
661       constants.DT_FILE)
662
663
664 def CheckSharedFileStoragePathVsEnabledDiskTemplates(
665     logging_warn_fn, file_storage_dir, enabled_disk_templates):
666   """Checks whether the given shared file storage directory is acceptable.
667
668   @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
669
670   """
671   CheckFileBasedStoragePathVsEnabledDiskTemplates(
672       logging_warn_fn, file_storage_dir, enabled_disk_templates,
673       constants.DT_SHARED_FILE)
674
675
676 class LUClusterSetParams(LogicalUnit):
677   """Change the parameters of the cluster.
678
679   """
680   HPATH = "cluster-modify"
681   HTYPE = constants.HTYPE_CLUSTER
682   REQ_BGL = False
683
684   def CheckArguments(self):
685     """Check parameters
686
687     """
688     if self.op.uid_pool:
689       uidpool.CheckUidPool(self.op.uid_pool)
690
691     if self.op.add_uids:
692       uidpool.CheckUidPool(self.op.add_uids)
693
694     if self.op.remove_uids:
695       uidpool.CheckUidPool(self.op.remove_uids)
696
697     if self.op.master_netmask is not None:
698       _ValidateNetmask(self.cfg, self.op.master_netmask)
699
700     if self.op.diskparams:
701       for dt_params in self.op.diskparams.values():
702         utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
703       try:
704         utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
705       except errors.OpPrereqError, err:
706         raise errors.OpPrereqError("While verify diskparams options: %s" % err,
707                                    errors.ECODE_INVAL)
708
709   def ExpandNames(self):
710     # FIXME: in the future maybe other cluster params won't require checking on
711     # all nodes to be modified.
712     # FIXME: This opcode changes cluster-wide settings. Is acquiring all
713     # resource locks the right thing, shouldn't it be the BGL instead?
714     self.needed_locks = {
715       locking.LEVEL_NODE: locking.ALL_SET,
716       locking.LEVEL_INSTANCE: locking.ALL_SET,
717       locking.LEVEL_NODEGROUP: locking.ALL_SET,
718       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
719     }
720     self.share_locks = ShareAll()
721
722   def BuildHooksEnv(self):
723     """Build hooks env.
724
725     """
726     return {
727       "OP_TARGET": self.cfg.GetClusterName(),
728       "NEW_VG_NAME": self.op.vg_name,
729       }
730
731   def BuildHooksNodes(self):
732     """Build hooks nodes.
733
734     """
735     mn = self.cfg.GetMasterNode()
736     return ([mn], [mn])
737
738   def _CheckVgName(self, node_uuids, enabled_disk_templates,
739                    new_enabled_disk_templates):
740     """Check the consistency of the vg name on all nodes and in case it gets
741        unset whether there are instances still using it.
742
743     """
744     lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
745     lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
746                                             new_enabled_disk_templates)
747     current_vg_name = self.cfg.GetVGName()
748
749     if self.op.vg_name == '':
750       if lvm_is_enabled:
751         raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
752                                    " disk templates are or get enabled.")
753
754     if self.op.vg_name is None:
755       if current_vg_name is None and lvm_is_enabled:
756         raise errors.OpPrereqError("Please specify a volume group when"
757                                    " enabling lvm-based disk-templates.")
758
759     if self.op.vg_name is not None and not self.op.vg_name:
760       if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
761         raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
762                                    " instances exist", errors.ECODE_INVAL)
763
764     if (self.op.vg_name is not None and lvm_is_enabled) or \
765         (self.cfg.GetVGName() is not None and lvm_gets_enabled):
766       self._CheckVgNameOnNodes(node_uuids)
767
768   def _CheckVgNameOnNodes(self, node_uuids):
769     """Check the status of the volume group on each node.
770
771     """
772     vglist = self.rpc.call_vg_list(node_uuids)
773     for node_uuid in node_uuids:
774       msg = vglist[node_uuid].fail_msg
775       if msg:
776         # ignoring down node
777         self.LogWarning("Error while gathering data on node %s"
778                         " (ignoring node): %s",
779                         self.cfg.GetNodeName(node_uuid), msg)
780         continue
781       vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
782                                             self.op.vg_name,
783                                             constants.MIN_VG_SIZE)
784       if vgstatus:
785         raise errors.OpPrereqError("Error on node '%s': %s" %
786                                    (self.cfg.GetNodeName(node_uuid), vgstatus),
787                                    errors.ECODE_ENVIRON)
788
789   @staticmethod
790   def _GetEnabledDiskTemplatesInner(op_enabled_disk_templates,
791                                     old_enabled_disk_templates):
792     """Determines the enabled disk templates and the subset of disk templates
793        that are newly enabled by this operation.
794
795     """
796     enabled_disk_templates = None
797     new_enabled_disk_templates = []
798     if op_enabled_disk_templates:
799       enabled_disk_templates = op_enabled_disk_templates
800       new_enabled_disk_templates = \
801         list(set(enabled_disk_templates)
802              - set(old_enabled_disk_templates))
803     else:
804       enabled_disk_templates = old_enabled_disk_templates
805     return (enabled_disk_templates, new_enabled_disk_templates)
806
807   def _GetEnabledDiskTemplates(self, cluster):
808     """Determines the enabled disk templates and the subset of disk templates
809        that are newly enabled by this operation.
810
811     """
812     return self._GetEnabledDiskTemplatesInner(self.op.enabled_disk_templates,
813                                               cluster.enabled_disk_templates)
814
815   def _CheckIpolicy(self, cluster, enabled_disk_templates):
816     """Checks the ipolicy.
817
818     @type cluster: C{objects.Cluster}
819     @param cluster: the cluster's configuration
820     @type enabled_disk_templates: list of string
821     @param enabled_disk_templates: list of (possibly newly) enabled disk
822       templates
823
824     """
825     # FIXME: write unit tests for this
826     if self.op.ipolicy:
827       self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
828                                            group_policy=False)
829
830       CheckIpolicyVsDiskTemplates(self.new_ipolicy,
831                                   enabled_disk_templates)
832
833       all_instances = self.cfg.GetAllInstancesInfo().values()
834       violations = set()
835       for group in self.cfg.GetAllNodeGroupsInfo().values():
836         instances = frozenset([inst for inst in all_instances
837                                if compat.any(nuuid in group.members
838                                              for nuuid in inst.all_nodes)])
839         new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
840         ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
841         new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
842                                            self.cfg)
843         if new:
844           violations.update(new)
845
846       if violations:
847         self.LogWarning("After the ipolicy change the following instances"
848                         " violate them: %s",
849                         utils.CommaJoin(utils.NiceSort(violations)))
850     else:
851       CheckIpolicyVsDiskTemplates(cluster.ipolicy,
852                                   enabled_disk_templates)
853
854   def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
855     """Checks whether the set DRBD helper actually exists on the nodes.
856
857     @type drbd_helper: string
858     @param drbd_helper: path of the drbd usermode helper binary
859     @type node_uuids: list of strings
860     @param node_uuids: list of node UUIDs to check for the helper
861
862     """
863     # checks given drbd helper on all nodes
864     helpers = self.rpc.call_drbd_helper(node_uuids)
865     for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
866       if ninfo.offline:
867         self.LogInfo("Not checking drbd helper on offline node %s",
868                      ninfo.name)
869         continue
870       msg = helpers[ninfo.uuid].fail_msg
871       if msg:
872         raise errors.OpPrereqError("Error checking drbd helper on node"
873                                    " '%s': %s" % (ninfo.name, msg),
874                                    errors.ECODE_ENVIRON)
875       node_helper = helpers[ninfo.uuid].payload
876       if node_helper != drbd_helper:
877         raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
878                                    (ninfo.name, node_helper),
879                                    errors.ECODE_ENVIRON)
880
881   def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
882     """Check the DRBD usermode helper.
883
884     @type node_uuids: list of strings
885     @param node_uuids: a list of nodes' UUIDs
886     @type drbd_enabled: boolean
887     @param drbd_enabled: whether DRBD will be enabled after this operation
888       (no matter if it was disabled before or not)
889     @type drbd_gets_enabled: boolen
890     @param drbd_gets_enabled: true if DRBD was disabled before this
891       operation, but will be enabled afterwards
892
893     """
894     if self.op.drbd_helper == '':
895       if drbd_enabled:
896         raise errors.OpPrereqError("Cannot disable drbd helper while"
897                                    " DRBD is enabled.")
898       if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
899         raise errors.OpPrereqError("Cannot disable drbd helper while"
900                                    " drbd-based instances exist",
901                                    errors.ECODE_INVAL)
902
903     else:
904       if self.op.drbd_helper is not None and drbd_enabled:
905         self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
906       else:
907         if drbd_gets_enabled:
908           current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
909           if current_drbd_helper is not None:
910             self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
911           else:
912             raise errors.OpPrereqError("Cannot enable DRBD without a"
913                                        " DRBD usermode helper set.")
914
915   def CheckPrereq(self):
916     """Check prerequisites.
917
918     This checks whether the given params don't conflict and
919     if the given volume group is valid.
920
921     """
922     node_uuids = self.owned_locks(locking.LEVEL_NODE)
923     self.cluster = cluster = self.cfg.GetClusterInfo()
924
925     vm_capable_node_uuids = [node.uuid
926                              for node in self.cfg.GetAllNodesInfo().values()
927                              if node.uuid in node_uuids and node.vm_capable]
928
929     (enabled_disk_templates, new_enabled_disk_templates) = \
930       self._GetEnabledDiskTemplates(cluster)
931
932     self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
933                       new_enabled_disk_templates)
934
935     if self.op.file_storage_dir is not None:
936       CheckFileStoragePathVsEnabledDiskTemplates(
937           self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
938
939     if self.op.shared_file_storage_dir is not None:
940       CheckSharedFileStoragePathVsEnabledDiskTemplates(
941           self.LogWarning, self.op.shared_file_storage_dir,
942           enabled_disk_templates)
943
944     drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
945     drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
946     self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)
947
948     # validate params changes
949     if self.op.beparams:
950       objects.UpgradeBeParams(self.op.beparams)
951       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
952       self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
953
954     if self.op.ndparams:
955       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
956       self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
957
958       # TODO: we need a more general way to handle resetting
959       # cluster-level parameters to default values
960       if self.new_ndparams["oob_program"] == "":
961         self.new_ndparams["oob_program"] = \
962             constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
963
964     if self.op.hv_state:
965       new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
966                                            self.cluster.hv_state_static)
967       self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
968                                for hv, values in new_hv_state.items())
969
970     if self.op.disk_state:
971       new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
972                                                self.cluster.disk_state_static)
973       self.new_disk_state = \
974         dict((storage, dict((name, cluster.SimpleFillDiskState(values))
975                             for name, values in svalues.items()))
976              for storage, svalues in new_disk_state.items())
977
978     self._CheckIpolicy(cluster, enabled_disk_templates)
979
980     if self.op.nicparams:
981       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
982       self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
983       objects.NIC.CheckParameterSyntax(self.new_nicparams)
984       nic_errors = []
985
986       # check all instances for consistency
987       for instance in self.cfg.GetAllInstancesInfo().values():
988         for nic_idx, nic in enumerate(instance.nics):
989           params_copy = copy.deepcopy(nic.nicparams)
990           params_filled = objects.FillDict(self.new_nicparams, params_copy)
991
992           # check parameter syntax
993           try:
994             objects.NIC.CheckParameterSyntax(params_filled)
995           except errors.ConfigurationError, err:
996             nic_errors.append("Instance %s, nic/%d: %s" %
997                               (instance.name, nic_idx, err))
998
999           # if we're moving instances to routed, check that they have an ip
1000           target_mode = params_filled[constants.NIC_MODE]
1001           if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
1002             nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
1003                               " address" % (instance.name, nic_idx))
1004       if nic_errors:
1005         raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
1006                                    "\n".join(nic_errors), errors.ECODE_INVAL)
1007
1008     # hypervisor list/parameters
1009     self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
1010     if self.op.hvparams:
1011       for hv_name, hv_dict in self.op.hvparams.items():
1012         if hv_name not in self.new_hvparams:
1013           self.new_hvparams[hv_name] = hv_dict
1014         else:
1015           self.new_hvparams[hv_name].update(hv_dict)
1016
1017     # disk template parameters
1018     self.new_diskparams = objects.FillDict(cluster.diskparams, {})
1019     if self.op.diskparams:
1020       for dt_name, dt_params in self.op.diskparams.items():
1021         if dt_name not in self.new_diskparams:
1022           self.new_diskparams[dt_name] = dt_params
1023         else:
1024           self.new_diskparams[dt_name].update(dt_params)
1025
1026     # os hypervisor parameters
1027     self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
1028     if self.op.os_hvp:
1029       for os_name, hvs in self.op.os_hvp.items():
1030         if os_name not in self.new_os_hvp:
1031           self.new_os_hvp[os_name] = hvs
1032         else:
1033           for hv_name, hv_dict in hvs.items():
1034             if hv_dict is None:
1035               # Delete if it exists
1036               self.new_os_hvp[os_name].pop(hv_name, None)
1037             elif hv_name not in self.new_os_hvp[os_name]:
1038               self.new_os_hvp[os_name][hv_name] = hv_dict
1039             else:
1040               self.new_os_hvp[os_name][hv_name].update(hv_dict)
1041
1042     # os parameters
1043     self.new_osp = objects.FillDict(cluster.osparams, {})
1044     if self.op.osparams:
1045       for os_name, osp in self.op.osparams.items():
1046         if os_name not in self.new_osp:
1047           self.new_osp[os_name] = {}
1048
1049         self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
1050                                                  use_none=True)
1051
1052         if not self.new_osp[os_name]:
1053           # we removed all parameters
1054           del self.new_osp[os_name]
1055         else:
1056           # check the parameter validity (remote check)
1057           CheckOSParams(self, False, [self.cfg.GetMasterNode()],
1058                         os_name, self.new_osp[os_name])
1059
1060     # changes to the hypervisor list
1061     if self.op.enabled_hypervisors is not None:
1062       self.hv_list = self.op.enabled_hypervisors
1063       for hv in self.hv_list:
1064         # if the hypervisor doesn't already exist in the cluster
1065         # hvparams, we initialize it to empty, and then (in both
1066         # cases) we make sure to fill the defaults, as we might not
1067         # have a complete defaults list if the hypervisor wasn't
1068         # enabled before
1069         if hv not in new_hvp:
1070           new_hvp[hv] = {}
1071         new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1072         utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1073     else:
1074       self.hv_list = cluster.enabled_hypervisors
1075
1076     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1077       # either the enabled list has changed, or the parameters have, validate
1078       for hv_name, hv_params in self.new_hvparams.items():
1079         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1080             (self.op.enabled_hypervisors and
1081              hv_name in self.op.enabled_hypervisors)):
1082           # either this is a new hypervisor, or its parameters have changed
1083           hv_class = hypervisor.GetHypervisorClass(hv_name)
1084           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1085           hv_class.CheckParameterSyntax(hv_params)
1086           CheckHVParams(self, node_uuids, hv_name, hv_params)
1087
1088     self._CheckDiskTemplateConsistency()
1089
1090     if self.op.os_hvp:
1091       # no need to check any newly-enabled hypervisors, since the
1092       # defaults have already been checked in the above code-block
1093       for os_name, os_hvp in self.new_os_hvp.items():
1094         for hv_name, hv_params in os_hvp.items():
1095           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1096           # we need to fill in the new os_hvp on top of the actual hv_p
1097           cluster_defaults = self.new_hvparams.get(hv_name, {})
1098           new_osp = objects.FillDict(cluster_defaults, hv_params)
1099           hv_class = hypervisor.GetHypervisorClass(hv_name)
1100           hv_class.CheckParameterSyntax(new_osp)
1101           CheckHVParams(self, node_uuids, hv_name, new_osp)
1102
1103     if self.op.default_iallocator:
1104       alloc_script = utils.FindFile(self.op.default_iallocator,
1105                                     constants.IALLOCATOR_SEARCH_PATH,
1106                                     os.path.isfile)
1107       if alloc_script is None:
1108         raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1109                                    " specified" % self.op.default_iallocator,
1110                                    errors.ECODE_INVAL)
1111
1112   def _CheckDiskTemplateConsistency(self):
1113     """Check whether the disk templates that are going to be disabled
1114        are still in use by some instances.
1115
1116     """
1117     if self.op.enabled_disk_templates:
1118       cluster = self.cfg.GetClusterInfo()
1119       instances = self.cfg.GetAllInstancesInfo()
1120
1121       disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1122         - set(self.op.enabled_disk_templates)
1123       for instance in instances.itervalues():
1124         if instance.disk_template in disk_templates_to_remove:
1125           raise errors.OpPrereqError("Cannot disable disk template '%s',"
1126                                      " because instance '%s' is using it." %
1127                                      (instance.disk_template, instance.name))
1128
1129   def _SetVgName(self, feedback_fn):
1130     """Determines and sets the new volume group name.
1131
1132     """
1133     if self.op.vg_name is not None:
1134       new_volume = self.op.vg_name
1135       if not new_volume:
1136         new_volume = None
1137       if new_volume != self.cfg.GetVGName():
1138         self.cfg.SetVGName(new_volume)
1139       else:
1140         feedback_fn("Cluster LVM configuration already in desired"
1141                     " state, not changing")
1142
1143   def _SetFileStorageDir(self, feedback_fn):
1144     """Set the file storage directory.
1145
1146     """
1147     if self.op.file_storage_dir is not None:
1148       if self.cluster.file_storage_dir == self.op.file_storage_dir:
1149         feedback_fn("Global file storage dir already set to value '%s'"
1150                     % self.cluster.file_storage_dir)
1151       else:
1152         self.cluster.file_storage_dir = self.op.file_storage_dir
1153
1154   def _SetDrbdHelper(self, feedback_fn):
1155     """Set the DRBD usermode helper.
1156
1157     """
1158     if self.op.drbd_helper is not None:
1159       if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1160         feedback_fn("Note that you specified a drbd user helper, but did not"
1161                     " enable the drbd disk template.")
1162       new_helper = self.op.drbd_helper
1163       if not new_helper:
1164         new_helper = None
1165       if new_helper != self.cfg.GetDRBDHelper():
1166         self.cfg.SetDRBDHelper(new_helper)
1167       else:
1168         feedback_fn("Cluster DRBD helper already in desired state,"
1169                     " not changing")
1170
1171   def Exec(self, feedback_fn):
1172     """Change the parameters of the cluster.
1173
1174     """
1175     if self.op.enabled_disk_templates:
1176       self.cluster.enabled_disk_templates = \
1177         list(set(self.op.enabled_disk_templates))
1178
1179     self._SetVgName(feedback_fn)
1180     self._SetFileStorageDir(feedback_fn)
1181     self._SetDrbdHelper(feedback_fn)
1182
1183     if self.op.hvparams:
1184       self.cluster.hvparams = self.new_hvparams
1185     if self.op.os_hvp:
1186       self.cluster.os_hvp = self.new_os_hvp
1187     if self.op.enabled_hypervisors is not None:
1188       self.cluster.hvparams = self.new_hvparams
1189       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1190     if self.op.beparams:
1191       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1192     if self.op.nicparams:
1193       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1194     if self.op.ipolicy:
1195       self.cluster.ipolicy = self.new_ipolicy
1196     if self.op.osparams:
1197       self.cluster.osparams = self.new_osp
1198     if self.op.ndparams:
1199       self.cluster.ndparams = self.new_ndparams
1200     if self.op.diskparams:
1201       self.cluster.diskparams = self.new_diskparams
1202     if self.op.hv_state:
1203       self.cluster.hv_state_static = self.new_hv_state
1204     if self.op.disk_state:
1205       self.cluster.disk_state_static = self.new_disk_state
1206
1207     if self.op.candidate_pool_size is not None:
1208       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1209       # we need to update the pool size here, otherwise the save will fail
1210       AdjustCandidatePool(self, [])
1211
1212     if self.op.maintain_node_health is not None:
1213       if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1214         feedback_fn("Note: CONFD was disabled at build time, node health"
1215                     " maintenance is not useful (still enabling it)")
1216       self.cluster.maintain_node_health = self.op.maintain_node_health
1217
1218     if self.op.modify_etc_hosts is not None:
1219       self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1220
1221     if self.op.prealloc_wipe_disks is not None:
1222       self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1223
1224     if self.op.add_uids is not None:
1225       uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1226
1227     if self.op.remove_uids is not None:
1228       uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1229
1230     if self.op.uid_pool is not None:
1231       self.cluster.uid_pool = self.op.uid_pool
1232
1233     if self.op.default_iallocator is not None:
1234       self.cluster.default_iallocator = self.op.default_iallocator
1235
1236     if self.op.reserved_lvs is not None:
1237       self.cluster.reserved_lvs = self.op.reserved_lvs
1238
1239     if self.op.use_external_mip_script is not None:
1240       self.cluster.use_external_mip_script = self.op.use_external_mip_script
1241
1242     def helper_os(aname, mods, desc):
1243       desc += " OS list"
1244       lst = getattr(self.cluster, aname)
1245       for key, val in mods:
1246         if key == constants.DDM_ADD:
1247           if val in lst:
1248             feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1249           else:
1250             lst.append(val)
1251         elif key == constants.DDM_REMOVE:
1252           if val in lst:
1253             lst.remove(val)
1254           else:
1255             feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1256         else:
1257           raise errors.ProgrammerError("Invalid modification '%s'" % key)
1258
1259     if self.op.hidden_os:
1260       helper_os("hidden_os", self.op.hidden_os, "hidden")
1261
1262     if self.op.blacklisted_os:
1263       helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1264
1265     if self.op.master_netdev:
1266       master_params = self.cfg.GetMasterNetworkParameters()
1267       ems = self.cfg.GetUseExternalMipScript()
1268       feedback_fn("Shutting down master ip on the current netdev (%s)" %
1269                   self.cluster.master_netdev)
1270       result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1271                                                        master_params, ems)
1272       if not self.op.force:
1273         result.Raise("Could not disable the master ip")
1274       else:
1275         if result.fail_msg:
1276           msg = ("Could not disable the master ip (continuing anyway): %s" %
1277                  result.fail_msg)
1278           feedback_fn(msg)
1279       feedback_fn("Changing master_netdev from %s to %s" %
1280                   (master_params.netdev, self.op.master_netdev))
1281       self.cluster.master_netdev = self.op.master_netdev
1282
1283     if self.op.master_netmask:
1284       master_params = self.cfg.GetMasterNetworkParameters()
1285       feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1286       result = self.rpc.call_node_change_master_netmask(
1287                  master_params.uuid, master_params.netmask,
1288                  self.op.master_netmask, master_params.ip,
1289                  master_params.netdev)
1290       result.Warn("Could not change the master IP netmask", feedback_fn)
1291       self.cluster.master_netmask = self.op.master_netmask
1292
1293     self.cfg.Update(self.cluster, feedback_fn)
1294
1295     if self.op.master_netdev:
1296       master_params = self.cfg.GetMasterNetworkParameters()
1297       feedback_fn("Starting the master ip on the new master netdev (%s)" %
1298                   self.op.master_netdev)
1299       ems = self.cfg.GetUseExternalMipScript()
1300       result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1301                                                      master_params, ems)
1302       result.Warn("Could not re-enable the master ip on the master,"
1303                   " please restart manually", self.LogWarning)
1304
1305
1306 class LUClusterVerify(NoHooksLU):
1307   """Submits all jobs necessary to verify the cluster.
1308
1309   """
1310   REQ_BGL = False
1311
1312   def ExpandNames(self):
1313     self.needed_locks = {}
1314
1315   def Exec(self, feedback_fn):
1316     jobs = []
1317
1318     if self.op.group_name:
1319       groups = [self.op.group_name]
1320       depends_fn = lambda: None
1321     else:
1322       groups = self.cfg.GetNodeGroupList()
1323
1324       # Verify global configuration
1325       jobs.append([
1326         opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1327         ])
1328
1329       # Always depend on global verification
1330       depends_fn = lambda: [(-len(jobs), [])]
1331
1332     jobs.extend(
1333       [opcodes.OpClusterVerifyGroup(group_name=group,
1334                                     ignore_errors=self.op.ignore_errors,
1335                                     depends=depends_fn())]
1336       for group in groups)
1337
1338     # Fix up all parameters
1339     for op in itertools.chain(*jobs): # pylint: disable=W0142
1340       op.debug_simulate_errors = self.op.debug_simulate_errors
1341       op.verbose = self.op.verbose
1342       op.error_codes = self.op.error_codes
1343       try:
1344         op.skip_checks = self.op.skip_checks
1345       except AttributeError:
1346         assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1347
1348     return ResultWithJobs(jobs)
1349
1350
1351 class _VerifyErrors(object):
1352   """Mix-in for cluster/group verify LUs.
1353
1354   It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1355   self.op and self._feedback_fn to be available.)
1356
1357   """
1358
1359   ETYPE_FIELD = "code"
1360   ETYPE_ERROR = "ERROR"
1361   ETYPE_WARNING = "WARNING"
1362
1363   def _Error(self, ecode, item, msg, *args, **kwargs):
1364     """Format an error message.
1365
1366     Based on the opcode's error_codes parameter, either format a
1367     parseable error code, or a simpler error string.
1368
1369     This must be called only from Exec and functions called from Exec.
1370
1371     """
1372     ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1373     itype, etxt, _ = ecode
1374     # If the error code is in the list of ignored errors, demote the error to a
1375     # warning
1376     if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1377       ltype = self.ETYPE_WARNING
1378     # first complete the msg
1379     if args:
1380       msg = msg % args
1381     # then format the whole message
1382     if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1383       msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1384     else:
1385       if item:
1386         item = " " + item
1387       else:
1388         item = ""
1389       msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1390     # and finally report it via the feedback_fn
1391     self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1392     # do not mark the operation as failed for WARN cases only
1393     if ltype == self.ETYPE_ERROR:
1394       self.bad = True
1395
1396   def _ErrorIf(self, cond, *args, **kwargs):
1397     """Log an error message if the passed condition is True.
1398
1399     """
1400     if (bool(cond)
1401         or self.op.debug_simulate_errors): # pylint: disable=E1101
1402       self._Error(*args, **kwargs)
1403
1404
1405 def _VerifyCertificate(filename):
1406   """Verifies a certificate for L{LUClusterVerifyConfig}.
1407
1408   @type filename: string
1409   @param filename: Path to PEM file
1410
1411   """
1412   try:
1413     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1414                                            utils.ReadFile(filename))
1415   except Exception, err: # pylint: disable=W0703
1416     return (LUClusterVerifyConfig.ETYPE_ERROR,
1417             "Failed to load X509 certificate %s: %s" % (filename, err))
1418
1419   (errcode, msg) = \
1420     utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1421                                 constants.SSL_CERT_EXPIRATION_ERROR)
1422
1423   if msg:
1424     fnamemsg = "While verifying %s: %s" % (filename, msg)
1425   else:
1426     fnamemsg = None
1427
1428   if errcode is None:
1429     return (None, fnamemsg)
1430   elif errcode == utils.CERT_WARNING:
1431     return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1432   elif errcode == utils.CERT_ERROR:
1433     return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1434
1435   raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1436
1437
1438 def _GetAllHypervisorParameters(cluster, instances):
1439   """Compute the set of all hypervisor parameters.
1440
1441   @type cluster: L{objects.Cluster}
1442   @param cluster: the cluster object
1443   @param instances: list of L{objects.Instance}
1444   @param instances: additional instances from which to obtain parameters
1445   @rtype: list of (origin, hypervisor, parameters)
1446   @return: a list with all parameters found, indicating the hypervisor they
1447        apply to, and the origin (can be "cluster", "os X", or "instance Y")
1448
1449   """
1450   hvp_data = []
1451
1452   for hv_name in cluster.enabled_hypervisors:
1453     hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1454
1455   for os_name, os_hvp in cluster.os_hvp.items():
1456     for hv_name, hv_params in os_hvp.items():
1457       if hv_params:
1458         full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1459         hvp_data.append(("os %s" % os_name, hv_name, full_params))
1460
1461   # TODO: collapse identical parameter values in a single one
1462   for instance in instances:
1463     if instance.hvparams:
1464       hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1465                        cluster.FillHV(instance)))
1466
1467   return hvp_data
1468
1469
1470 class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1471   """Verifies the cluster config.
1472
1473   """
1474   REQ_BGL = False
1475
1476   def _VerifyHVP(self, hvp_data):
1477     """Verifies locally the syntax of the hypervisor parameters.
1478
1479     """
1480     for item, hv_name, hv_params in hvp_data:
1481       msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1482              (item, hv_name))
1483       try:
1484         hv_class = hypervisor.GetHypervisorClass(hv_name)
1485         utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1486         hv_class.CheckParameterSyntax(hv_params)
1487       except errors.GenericError, err:
1488         self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1489
1490   def ExpandNames(self):
1491     self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1492     self.share_locks = ShareAll()
1493
1494   def CheckPrereq(self):
1495     """Check prerequisites.
1496
1497     """
1498     # Retrieve all information
1499     self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1500     self.all_node_info = self.cfg.GetAllNodesInfo()
1501     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1502
1503   def Exec(self, feedback_fn):
1504     """Verify integrity of cluster, performing various test on nodes.
1505
1506     """
1507     self.bad = False
1508     self._feedback_fn = feedback_fn
1509
1510     feedback_fn("* Verifying cluster config")
1511
1512     for msg in self.cfg.VerifyConfig():
1513       self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1514
1515     feedback_fn("* Verifying cluster certificate files")
1516
1517     for cert_filename in pathutils.ALL_CERT_FILES:
1518       (errcode, msg) = _VerifyCertificate(cert_filename)
1519       self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1520
1521     self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
1522                                     pathutils.NODED_CERT_FILE),
1523                   constants.CV_ECLUSTERCERT,
1524                   None,
1525                   pathutils.NODED_CERT_FILE + " must be accessible by the " +
1526                     constants.LUXID_USER + " user")
1527
1528     feedback_fn("* Verifying hypervisor parameters")
1529
1530     self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1531                                                 self.all_inst_info.values()))
1532
1533     feedback_fn("* Verifying all nodes belong to an existing group")
1534
1535     # We do this verification here because, should this bogus circumstance
1536     # occur, it would never be caught by VerifyGroup, which only acts on
1537     # nodes/instances reachable from existing node groups.
1538
1539     dangling_nodes = set(node for node in self.all_node_info.values()
1540                          if node.group not in self.all_group_info)
1541
1542     dangling_instances = {}
1543     no_node_instances = []
1544
1545     for inst in self.all_inst_info.values():
1546       if inst.primary_node in [node.uuid for node in dangling_nodes]:
1547         dangling_instances.setdefault(inst.primary_node, []).append(inst)
1548       elif inst.primary_node not in self.all_node_info:
1549         no_node_instances.append(inst)
1550
1551     pretty_dangling = [
1552         "%s (%s)" %
1553         (node.name,
1554          utils.CommaJoin(inst.name for
1555                          inst in dangling_instances.get(node.uuid, [])))
1556         for node in dangling_nodes]
1557
1558     self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1559                   None,
1560                   "the following nodes (and their instances) belong to a non"
1561                   " existing group: %s", utils.CommaJoin(pretty_dangling))
1562
1563     self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1564                   None,
1565                   "the following instances have a non-existing primary-node:"
1566                   " %s", utils.CommaJoin(inst.name for
1567                                          inst in no_node_instances))
1568
1569     return not self.bad
1570
1571
1572 class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1573   """Verifies the status of a node group.
1574
1575   """
1576   HPATH = "cluster-verify"
1577   HTYPE = constants.HTYPE_CLUSTER
1578   REQ_BGL = False
1579
1580   _HOOKS_INDENT_RE = re.compile("^", re.M)
1581
1582   class NodeImage(object):
1583     """A class representing the logical and physical status of a node.
1584
1585     @type uuid: string
1586     @ivar uuid: the node UUID to which this object refers
1587     @ivar volumes: a structure as returned from
1588         L{ganeti.backend.GetVolumeList} (runtime)
1589     @ivar instances: a list of running instances (runtime)
1590     @ivar pinst: list of configured primary instances (config)
1591     @ivar sinst: list of configured secondary instances (config)
1592     @ivar sbp: dictionary of {primary-node: list of instances} for all
1593         instances for which this node is secondary (config)
1594     @ivar mfree: free memory, as reported by hypervisor (runtime)
1595     @ivar dfree: free disk, as reported by the node (runtime)
1596     @ivar offline: the offline status (config)
1597     @type rpc_fail: boolean
1598     @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1599         not whether the individual keys were correct) (runtime)
1600     @type lvm_fail: boolean
1601     @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1602     @type hyp_fail: boolean
1603     @ivar hyp_fail: whether the RPC call didn't return the instance list
1604     @type ghost: boolean
1605     @ivar ghost: whether this is a known node or not (config)
1606     @type os_fail: boolean
1607     @ivar os_fail: whether the RPC call didn't return valid OS data
1608     @type oslist: list
1609     @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1610     @type vm_capable: boolean
1611     @ivar vm_capable: whether the node can host instances
1612     @type pv_min: float
1613     @ivar pv_min: size in MiB of the smallest PVs
1614     @type pv_max: float
1615     @ivar pv_max: size in MiB of the biggest PVs
1616
1617     """
1618     def __init__(self, offline=False, uuid=None, vm_capable=True):
1619       self.uuid = uuid
1620       self.volumes = {}
1621       self.instances = []
1622       self.pinst = []
1623       self.sinst = []
1624       self.sbp = {}
1625       self.mfree = 0
1626       self.dfree = 0
1627       self.offline = offline
1628       self.vm_capable = vm_capable
1629       self.rpc_fail = False
1630       self.lvm_fail = False
1631       self.hyp_fail = False
1632       self.ghost = False
1633       self.os_fail = False
1634       self.oslist = {}
1635       self.pv_min = None
1636       self.pv_max = None
1637
1638   def ExpandNames(self):
1639     # This raises errors.OpPrereqError on its own:
1640     self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1641
1642     # Get instances in node group; this is unsafe and needs verification later
1643     inst_uuids = \
1644       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1645
1646     self.needed_locks = {
1647       locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1648       locking.LEVEL_NODEGROUP: [self.group_uuid],
1649       locking.LEVEL_NODE: [],
1650
1651       # This opcode is run by watcher every five minutes and acquires all nodes
1652       # for a group. It doesn't run for a long time, so it's better to acquire
1653       # the node allocation lock as well.
1654       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1655       }
1656
1657     self.share_locks = ShareAll()
1658
1659   def DeclareLocks(self, level):
1660     if level == locking.LEVEL_NODE:
1661       # Get members of node group; this is unsafe and needs verification later
1662       nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1663
1664       # In Exec(), we warn about mirrored instances that have primary and
1665       # secondary living in separate node groups. To fully verify that
1666       # volumes for these instances are healthy, we will need to do an
1667       # extra call to their secondaries. We ensure here those nodes will
1668       # be locked.
1669       for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1670         # Important: access only the instances whose lock is owned
1671         instance = self.cfg.GetInstanceInfoByName(inst_name)
1672         if instance.disk_template in constants.DTS_INT_MIRROR:
1673           nodes.update(instance.secondary_nodes)
1674
1675       self.needed_locks[locking.LEVEL_NODE] = nodes
1676
1677   def CheckPrereq(self):
1678     assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1679     self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1680
1681     group_node_uuids = set(self.group_info.members)
1682     group_inst_uuids = \
1683       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1684
1685     unlocked_node_uuids = \
1686         group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1687
1688     unlocked_inst_uuids = \
1689         group_inst_uuids.difference(
1690           [self.cfg.GetInstanceInfoByName(name).uuid
1691            for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1692
1693     if unlocked_node_uuids:
1694       raise errors.OpPrereqError(
1695         "Missing lock for nodes: %s" %
1696         utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1697         errors.ECODE_STATE)
1698
1699     if unlocked_inst_uuids:
1700       raise errors.OpPrereqError(
1701         "Missing lock for instances: %s" %
1702         utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1703         errors.ECODE_STATE)
1704
1705     self.all_node_info = self.cfg.GetAllNodesInfo()
1706     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1707
1708     self.my_node_uuids = group_node_uuids
1709     self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1710                              for node_uuid in group_node_uuids)
1711
1712     self.my_inst_uuids = group_inst_uuids
1713     self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1714                              for inst_uuid in group_inst_uuids)
1715
1716     # We detect here the nodes that will need the extra RPC calls for verifying
1717     # split LV volumes; they should be locked.
1718     extra_lv_nodes = set()
1719
1720     for inst in self.my_inst_info.values():
1721       if inst.disk_template in constants.DTS_INT_MIRROR:
1722         for nuuid in inst.all_nodes:
1723           if self.all_node_info[nuuid].group != self.group_uuid:
1724             extra_lv_nodes.add(nuuid)
1725
1726     unlocked_lv_nodes = \
1727         extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1728
1729     if unlocked_lv_nodes:
1730       raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1731                                  utils.CommaJoin(unlocked_lv_nodes),
1732                                  errors.ECODE_STATE)
1733     self.extra_lv_nodes = list(extra_lv_nodes)
1734
1735   def _VerifyNode(self, ninfo, nresult):
1736     """Perform some basic validation on data returned from a node.
1737
1738       - check the result data structure is well formed and has all the
1739         mandatory fields
1740       - check ganeti version
1741
1742     @type ninfo: L{objects.Node}
1743     @param ninfo: the node to check
1744     @param nresult: the results from the node
1745     @rtype: boolean
1746     @return: whether overall this call was successful (and we can expect
1747          reasonable values in the respose)
1748
1749     """
1750     # main result, nresult should be a non-empty dict
1751     test = not nresult or not isinstance(nresult, dict)
1752     self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1753                   "unable to verify node: no data returned")
1754     if test:
1755       return False
1756
1757     # compares ganeti version
1758     local_version = constants.PROTOCOL_VERSION
1759     remote_version = nresult.get("version", None)
1760     test = not (remote_version and
1761                 isinstance(remote_version, (list, tuple)) and
1762                 len(remote_version) == 2)
1763     self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1764                   "connection to node returned invalid data")
1765     if test:
1766       return False
1767
1768     test = local_version != remote_version[0]
1769     self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1770                   "incompatible protocol versions: master %s,"
1771                   " node %s", local_version, remote_version[0])
1772     if test:
1773       return False
1774
1775     # node seems compatible, we can actually try to look into its results
1776
1777     # full package version
1778     self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1779                   constants.CV_ENODEVERSION, ninfo.name,
1780                   "software version mismatch: master %s, node %s",
1781                   constants.RELEASE_VERSION, remote_version[1],
1782                   code=self.ETYPE_WARNING)
1783
1784     hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1785     if ninfo.vm_capable and isinstance(hyp_result, dict):
1786       for hv_name, hv_result in hyp_result.iteritems():
1787         test = hv_result is not None
1788         self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1789                       "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1790
1791     hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1792     if ninfo.vm_capable and isinstance(hvp_result, list):
1793       for item, hv_name, hv_result in hvp_result:
1794         self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1795                       "hypervisor %s parameter verify failure (source %s): %s",
1796                       hv_name, item, hv_result)
1797
1798     test = nresult.get(constants.NV_NODESETUP,
1799                        ["Missing NODESETUP results"])
1800     self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1801                   "node setup error: %s", "; ".join(test))
1802
1803     return True
1804
1805   def _VerifyNodeTime(self, ninfo, nresult,
1806                       nvinfo_starttime, nvinfo_endtime):
1807     """Check the node time.
1808
1809     @type ninfo: L{objects.Node}
1810     @param ninfo: the node to check
1811     @param nresult: the remote results for the node
1812     @param nvinfo_starttime: the start time of the RPC call
1813     @param nvinfo_endtime: the end time of the RPC call
1814
1815     """
1816     ntime = nresult.get(constants.NV_TIME, None)
1817     try:
1818       ntime_merged = utils.MergeTime(ntime)
1819     except (ValueError, TypeError):
1820       self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1821                     "Node returned invalid time")
1822       return
1823
1824     if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1825       ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1826     elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1827       ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1828     else:
1829       ntime_diff = None
1830
1831     self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1832                   "Node time diverges by at least %s from master node time",
1833                   ntime_diff)
1834
1835   def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1836     """Check the node LVM results and update info for cross-node checks.
1837
1838     @type ninfo: L{objects.Node}
1839     @param ninfo: the node to check
1840     @param nresult: the remote results for the node
1841     @param vg_name: the configured VG name
1842     @type nimg: L{NodeImage}
1843     @param nimg: node image
1844
1845     """
1846     if vg_name is None:
1847       return
1848
1849     # checks vg existence and size > 20G
1850     vglist = nresult.get(constants.NV_VGLIST, None)
1851     test = not vglist
1852     self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1853                   "unable to check volume groups")
1854     if not test:
1855       vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1856                                             constants.MIN_VG_SIZE)
1857       self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1858
1859     # Check PVs
1860     (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1861     for em in errmsgs:
1862       self._Error(constants.CV_ENODELVM, ninfo.name, em)
1863     if pvminmax is not None:
1864       (nimg.pv_min, nimg.pv_max) = pvminmax
1865
1866   def _VerifyGroupDRBDVersion(self, node_verify_infos):
1867     """Check cross-node DRBD version consistency.
1868
1869     @type node_verify_infos: dict
1870     @param node_verify_infos: infos about nodes as returned from the
1871       node_verify call.
1872
1873     """
1874     node_versions = {}
1875     for node_uuid, ndata in node_verify_infos.items():
1876       nresult = ndata.payload
1877       version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1878       node_versions[node_uuid] = version
1879
1880     if len(set(node_versions.values())) > 1:
1881       for node_uuid, version in sorted(node_versions.items()):
1882         msg = "DRBD version mismatch: %s" % version
1883         self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1884                     code=self.ETYPE_WARNING)
1885
1886   def _VerifyGroupLVM(self, node_image, vg_name):
1887     """Check cross-node consistency in LVM.
1888
1889     @type node_image: dict
1890     @param node_image: info about nodes, mapping from node to names to
1891       L{NodeImage} objects
1892     @param vg_name: the configured VG name
1893
1894     """
1895     if vg_name is None:
1896       return
1897
1898     # Only exclusive storage needs this kind of checks
1899     if not self._exclusive_storage:
1900       return
1901
1902     # exclusive_storage wants all PVs to have the same size (approximately),
1903     # if the smallest and the biggest ones are okay, everything is fine.
1904     # pv_min is None iff pv_max is None
1905     vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1906     if not vals:
1907       return
1908     (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1909     (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1910     bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1911     self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1912                   "PV sizes differ too much in the group; smallest (%s MB) is"
1913                   " on %s, biggest (%s MB) is on %s",
1914                   pvmin, self.cfg.GetNodeName(minnode_uuid),
1915                   pvmax, self.cfg.GetNodeName(maxnode_uuid))
1916
1917   def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1918     """Check the node bridges.
1919
1920     @type ninfo: L{objects.Node}
1921     @param ninfo: the node to check
1922     @param nresult: the remote results for the node
1923     @param bridges: the expected list of bridges
1924
1925     """
1926     if not bridges:
1927       return
1928
1929     missing = nresult.get(constants.NV_BRIDGES, None)
1930     test = not isinstance(missing, list)
1931     self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1932                   "did not return valid bridge information")
1933     if not test:
1934       self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1935                     "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1936
1937   def _VerifyNodeUserScripts(self, ninfo, nresult):
1938     """Check the results of user scripts presence and executability on the node
1939
1940     @type ninfo: L{objects.Node}
1941     @param ninfo: the node to check
1942     @param nresult: the remote results for the node
1943
1944     """
1945     test = not constants.NV_USERSCRIPTS in nresult
1946     self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1947                   "did not return user scripts information")
1948
1949     broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1950     if not test:
1951       self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1952                     "user scripts not present or not executable: %s" %
1953                     utils.CommaJoin(sorted(broken_scripts)))
1954
1955   def _VerifyNodeNetwork(self, ninfo, nresult):
1956     """Check the node network connectivity results.
1957
1958     @type ninfo: L{objects.Node}
1959     @param ninfo: the node to check
1960     @param nresult: the remote results for the node
1961
1962     """
1963     test = constants.NV_NODELIST not in nresult
1964     self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
1965                   "node hasn't returned node ssh connectivity data")
1966     if not test:
1967       if nresult[constants.NV_NODELIST]:
1968         for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1969           self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
1970                         "ssh communication with node '%s': %s", a_node, a_msg)
1971
1972     test = constants.NV_NODENETTEST not in nresult
1973     self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1974                   "node hasn't returned node tcp connectivity data")
1975     if not test:
1976       if nresult[constants.NV_NODENETTEST]:
1977         nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1978         for anode in nlist:
1979           self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
1980                         "tcp communication with node '%s': %s",
1981                         anode, nresult[constants.NV_NODENETTEST][anode])
1982
1983     test = constants.NV_MASTERIP not in nresult
1984     self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1985                   "node hasn't returned node master IP reachability data")
1986     if not test:
1987       if not nresult[constants.NV_MASTERIP]:
1988         if ninfo.uuid == self.master_node:
1989           msg = "the master node cannot reach the master IP (not configured?)"
1990         else:
1991           msg = "cannot reach the master IP"
1992         self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
1993
1994   def _VerifyInstance(self, instance, node_image, diskstatus):
1995     """Verify an instance.
1996
1997     This function checks to see if the required block devices are
1998     available on the instance's node, and that the nodes are in the correct
1999     state.
2000
2001     """
2002     pnode_uuid = instance.primary_node
2003     pnode_img = node_image[pnode_uuid]
2004     groupinfo = self.cfg.GetAllNodeGroupsInfo()
2005
2006     node_vol_should = {}
2007     instance.MapLVsByNode(node_vol_should)
2008
2009     cluster = self.cfg.GetClusterInfo()
2010     ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2011                                                             self.group_info)
2012     err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
2013     self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
2014                   utils.CommaJoin(err), code=self.ETYPE_WARNING)
2015
2016     for node_uuid in node_vol_should:
2017       n_img = node_image[node_uuid]
2018       if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2019         # ignore missing volumes on offline or broken nodes
2020         continue
2021       for volume in node_vol_should[node_uuid]:
2022         test = volume not in n_img.volumes
2023         self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
2024                       "volume %s missing on node %s", volume,
2025                       self.cfg.GetNodeName(node_uuid))
2026
2027     if instance.admin_state == constants.ADMINST_UP:
2028       test = instance.uuid not in pnode_img.instances and not pnode_img.offline
2029       self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
2030                     "instance not running on its primary node %s",
2031                      self.cfg.GetNodeName(pnode_uuid))
2032       self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
2033                     instance.name, "instance is marked as running and lives on"
2034                     " offline node %s", self.cfg.GetNodeName(pnode_uuid))
2035
2036     diskdata = [(nname, success, status, idx)
2037                 for (nname, disks) in diskstatus.items()
2038                 for idx, (success, status) in enumerate(disks)]
2039
2040     for nname, success, bdev_status, idx in diskdata:
2041       # the 'ghost node' construction in Exec() ensures that we have a
2042       # node here
2043       snode = node_image[nname]
2044       bad_snode = snode.ghost or snode.offline
2045       self._ErrorIf(instance.disks_active and
2046                     not success and not bad_snode,
2047                     constants.CV_EINSTANCEFAULTYDISK, instance.name,
2048                     "couldn't retrieve status for disk/%s on %s: %s",
2049                     idx, self.cfg.GetNodeName(nname), bdev_status)
2050
2051       if instance.disks_active and success and \
2052          (bdev_status.is_degraded or
2053           bdev_status.ldisk_status != constants.LDS_OKAY):
2054         msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2055         if bdev_status.is_degraded:
2056           msg += " is degraded"
2057         if bdev_status.ldisk_status != constants.LDS_OKAY:
2058           msg += "; state is '%s'" % \
2059                  constants.LDS_NAMES[bdev_status.ldisk_status]
2060
2061         self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2062
2063     self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2064                   constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2065                   "instance %s, connection to primary node failed",
2066                   instance.name)
2067
2068     self._ErrorIf(len(instance.secondary_nodes) > 1,
2069                   constants.CV_EINSTANCELAYOUT, instance.name,
2070                   "instance has multiple secondary nodes: %s",
2071                   utils.CommaJoin(instance.secondary_nodes),
2072                   code=self.ETYPE_WARNING)
2073
2074     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2075     if any(es_flags.values()):
2076       if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2077         # Disk template not compatible with exclusive_storage: no instance
2078         # node should have the flag set
2079         es_nodes = [n
2080                     for (n, es) in es_flags.items()
2081                     if es]
2082         self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2083                     "instance has template %s, which is not supported on nodes"
2084                     " that have exclusive storage set: %s",
2085                     instance.disk_template,
2086                     utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2087       for (idx, disk) in enumerate(instance.disks):
2088         self._ErrorIf(disk.spindles is None,
2089                       constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2090                       "number of spindles not configured for disk %s while"
2091                       " exclusive storage is enabled, try running"
2092                       " gnt-cluster repair-disk-sizes", idx)
2093
2094     if instance.disk_template in constants.DTS_INT_MIRROR:
2095       instance_nodes = utils.NiceSort(instance.all_nodes)
2096       instance_groups = {}
2097
2098       for node_uuid in instance_nodes:
2099         instance_groups.setdefault(self.all_node_info[node_uuid].group,
2100                                    []).append(node_uuid)
2101
2102       pretty_list = [
2103         "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2104                            groupinfo[group].name)
2105         # Sort so that we always list the primary node first.
2106         for group, nodes in sorted(instance_groups.items(),
2107                                    key=lambda (_, nodes): pnode_uuid in nodes,
2108                                    reverse=True)]
2109
2110       self._ErrorIf(len(instance_groups) > 1,
2111                     constants.CV_EINSTANCESPLITGROUPS,
2112                     instance.name, "instance has primary and secondary nodes in"
2113                     " different groups: %s", utils.CommaJoin(pretty_list),
2114                     code=self.ETYPE_WARNING)
2115
2116     inst_nodes_offline = []
2117     for snode in instance.secondary_nodes:
2118       s_img = node_image[snode]
2119       self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2120                     self.cfg.GetNodeName(snode),
2121                     "instance %s, connection to secondary node failed",
2122                     instance.name)
2123
2124       if s_img.offline:
2125         inst_nodes_offline.append(snode)
2126
2127     # warn that the instance lives on offline nodes
2128     self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2129                   instance.name, "instance has offline secondary node(s) %s",
2130                   utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2131     # ... or ghost/non-vm_capable nodes
2132     for node_uuid in instance.all_nodes:
2133       self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2134                     instance.name, "instance lives on ghost node %s",
2135                     self.cfg.GetNodeName(node_uuid))
2136       self._ErrorIf(not node_image[node_uuid].vm_capable,
2137                     constants.CV_EINSTANCEBADNODE, instance.name,
2138                     "instance lives on non-vm_capable node %s",
2139                     self.cfg.GetNodeName(node_uuid))
2140
2141   def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2142     """Verify if there are any unknown volumes in the cluster.
2143
2144     The .os, .swap and backup volumes are ignored. All other volumes are
2145     reported as unknown.
2146
2147     @type reserved: L{ganeti.utils.FieldSet}
2148     @param reserved: a FieldSet of reserved volume names
2149
2150     """
2151     for node_uuid, n_img in node_image.items():
2152       if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2153           self.all_node_info[node_uuid].group != self.group_uuid):
2154         # skip non-healthy nodes
2155         continue
2156       for volume in n_img.volumes:
2157         test = ((node_uuid not in node_vol_should or
2158                 volume not in node_vol_should[node_uuid]) and
2159                 not reserved.Matches(volume))
2160         self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2161                       self.cfg.GetNodeName(node_uuid),
2162                       "volume %s is unknown", volume)
2163
2164   def _VerifyNPlusOneMemory(self, node_image, all_insts):
2165     """Verify N+1 Memory Resilience.
2166
2167     Check that if one single node dies we can still start all the
2168     instances it was primary for.
2169
2170     """
2171     cluster_info = self.cfg.GetClusterInfo()
2172     for node_uuid, n_img in node_image.items():
2173       # This code checks that every node which is now listed as
2174       # secondary has enough memory to host all instances it is
2175       # supposed to should a single other node in the cluster fail.
2176       # FIXME: not ready for failover to an arbitrary node
2177       # FIXME: does not support file-backed instances
2178       # WARNING: we currently take into account down instances as well
2179       # as up ones, considering that even if they're down someone
2180       # might want to start them even in the event of a node failure.
2181       if n_img.offline or \
2182          self.all_node_info[node_uuid].group != self.group_uuid:
2183         # we're skipping nodes marked offline and nodes in other groups from
2184         # the N+1 warning, since most likely we don't have good memory
2185         # information from them; we already list instances living on such
2186         # nodes, and that's enough warning
2187         continue
2188       #TODO(dynmem): also consider ballooning out other instances
2189       for prinode, inst_uuids in n_img.sbp.items():
2190         needed_mem = 0
2191         for inst_uuid in inst_uuids:
2192           bep = cluster_info.FillBE(all_insts[inst_uuid])
2193           if bep[constants.BE_AUTO_BALANCE]:
2194             needed_mem += bep[constants.BE_MINMEM]
2195         test = n_img.mfree < needed_mem
2196         self._ErrorIf(test, constants.CV_ENODEN1,
2197                       self.cfg.GetNodeName(node_uuid),
2198                       "not enough memory to accomodate instance failovers"
2199                       " should node %s fail (%dMiB needed, %dMiB available)",
2200                       self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2201
2202   def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2203                    (files_all, files_opt, files_mc, files_vm)):
2204     """Verifies file checksums collected from all nodes.
2205
2206     @param nodes: List of L{objects.Node} objects
2207     @param master_node_uuid: UUID of master node
2208     @param all_nvinfo: RPC results
2209
2210     """
2211     # Define functions determining which nodes to consider for a file
2212     files2nodefn = [
2213       (files_all, None),
2214       (files_mc, lambda node: (node.master_candidate or
2215                                node.uuid == master_node_uuid)),
2216       (files_vm, lambda node: node.vm_capable),
2217       ]
2218
2219     # Build mapping from filename to list of nodes which should have the file
2220     nodefiles = {}
2221     for (files, fn) in files2nodefn:
2222       if fn is None:
2223         filenodes = nodes
2224       else:
2225         filenodes = filter(fn, nodes)
2226       nodefiles.update((filename,
2227                         frozenset(map(operator.attrgetter("uuid"), filenodes)))
2228                        for filename in files)
2229
2230     assert set(nodefiles) == (files_all | files_mc | files_vm)
2231
2232     fileinfo = dict((filename, {}) for filename in nodefiles)
2233     ignore_nodes = set()
2234
2235     for node in nodes:
2236       if node.offline:
2237         ignore_nodes.add(node.uuid)
2238         continue
2239
2240       nresult = all_nvinfo[node.uuid]
2241
2242       if nresult.fail_msg or not nresult.payload:
2243         node_files = None
2244       else:
2245         fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2246         node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2247                           for (key, value) in fingerprints.items())
2248         del fingerprints
2249
2250       test = not (node_files and isinstance(node_files, dict))
2251       self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2252                     "Node did not return file checksum data")
2253       if test:
2254         ignore_nodes.add(node.uuid)
2255         continue
2256
2257       # Build per-checksum mapping from filename to nodes having it
2258       for (filename, checksum) in node_files.items():
2259         assert filename in nodefiles
2260         fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2261
2262     for (filename, checksums) in fileinfo.items():
2263       assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2264
2265       # Nodes having the file
2266       with_file = frozenset(node_uuid
2267                             for node_uuids in fileinfo[filename].values()
2268                             for node_uuid in node_uuids) - ignore_nodes
2269
2270       expected_nodes = nodefiles[filename] - ignore_nodes
2271
2272       # Nodes missing file
2273       missing_file = expected_nodes - with_file
2274
2275       if filename in files_opt:
2276         # All or no nodes
2277         self._ErrorIf(missing_file and missing_file != expected_nodes,
2278                       constants.CV_ECLUSTERFILECHECK, None,
2279                       "File %s is optional, but it must exist on all or no"
2280                       " nodes (not found on %s)",
2281                       filename,
2282                       utils.CommaJoin(
2283                         utils.NiceSort(
2284                           map(self.cfg.GetNodeName, missing_file))))
2285       else:
2286         self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2287                       "File %s is missing from node(s) %s", filename,
2288                       utils.CommaJoin(
2289                         utils.NiceSort(
2290                           map(self.cfg.GetNodeName, missing_file))))
2291
2292         # Warn if a node has a file it shouldn't
2293         unexpected = with_file - expected_nodes
2294         self._ErrorIf(unexpected,
2295                       constants.CV_ECLUSTERFILECHECK, None,
2296                       "File %s should not exist on node(s) %s",
2297                       filename, utils.CommaJoin(
2298                         utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2299
2300       # See if there are multiple versions of the file
2301       test = len(checksums) > 1
2302       if test:
2303         variants = ["variant %s on %s" %
2304                     (idx + 1,
2305                      utils.CommaJoin(utils.NiceSort(
2306                        map(self.cfg.GetNodeName, node_uuids))))
2307                     for (idx, (checksum, node_uuids)) in
2308                       enumerate(sorted(checksums.items()))]
2309       else:
2310         variants = []
2311
2312       self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2313                     "File %s found with %s different checksums (%s)",
2314                     filename, len(checksums), "; ".join(variants))
2315
2316   def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
2317     """Verify the drbd helper.
2318
2319     """
2320     if drbd_helper:
2321       helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2322       test = (helper_result is None)
2323       self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2324                     "no drbd usermode helper returned")
2325       if helper_result:
2326         status, payload = helper_result
2327         test = not status
2328         self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2329                       "drbd usermode helper check unsuccessful: %s", payload)
2330         test = status and (payload != drbd_helper)
2331         self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2332                       "wrong drbd usermode helper: %s", payload)
2333
2334   def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2335                       drbd_map):
2336     """Verifies and the node DRBD status.
2337
2338     @type ninfo: L{objects.Node}
2339     @param ninfo: the node to check
2340     @param nresult: the remote results for the node
2341     @param instanceinfo: the dict of instances
2342     @param drbd_helper: the configured DRBD usermode helper
2343     @param drbd_map: the DRBD map as returned by
2344         L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2345
2346     """
2347     self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)
2348
2349     # compute the DRBD minors
2350     node_drbd = {}
2351     for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2352       test = inst_uuid not in instanceinfo
2353       self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2354                     "ghost instance '%s' in temporary DRBD map", inst_uuid)
2355         # ghost instance should not be running, but otherwise we
2356         # don't give double warnings (both ghost instance and
2357         # unallocated minor in use)
2358       if test:
2359         node_drbd[minor] = (inst_uuid, False)
2360       else:
2361         instance = instanceinfo[inst_uuid]
2362         node_drbd[minor] = (inst_uuid, instance.disks_active)
2363
2364     # and now check them
2365     used_minors = nresult.get(constants.NV_DRBDLIST, [])
2366     test = not isinstance(used_minors, (tuple, list))
2367     self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2368                   "cannot parse drbd status file: %s", str(used_minors))
2369     if test:
2370       # we cannot check drbd status
2371       return
2372
2373     for minor, (inst_uuid, must_exist) in node_drbd.items():
2374       test = minor not in used_minors and must_exist
2375       self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2376                     "drbd minor %d of instance %s is not active", minor,
2377                     self.cfg.GetInstanceName(inst_uuid))
2378     for minor in used_minors:
2379       test = minor not in node_drbd
2380       self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2381                     "unallocated drbd minor %d is in use", minor)
2382
2383   def _UpdateNodeOS(self, ninfo, nresult, nimg):
2384     """Builds the node OS structures.
2385
2386     @type ninfo: L{objects.Node}
2387     @param ninfo: the node to check
2388     @param nresult: the remote results for the node
2389     @param nimg: the node image object
2390
2391     """
2392     remote_os = nresult.get(constants.NV_OSLIST, None)
2393     test = (not isinstance(remote_os, list) or
2394             not compat.all(isinstance(v, list) and len(v) == 7
2395                            for v in remote_os))
2396
2397     self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2398                   "node hasn't returned valid OS data")
2399
2400     nimg.os_fail = test
2401
2402     if test:
2403       return
2404
2405     os_dict = {}
2406
2407     for (name, os_path, status, diagnose,
2408          variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2409
2410       if name not in os_dict:
2411         os_dict[name] = []
2412
2413       # parameters is a list of lists instead of list of tuples due to
2414       # JSON lacking a real tuple type, fix it:
2415       parameters = [tuple(v) for v in parameters]
2416       os_dict[name].append((os_path, status, diagnose,
2417                             set(variants), set(parameters), set(api_ver)))
2418
2419     nimg.oslist = os_dict
2420
2421   def _VerifyNodeOS(self, ninfo, nimg, base):
2422     """Verifies the node OS list.
2423
2424     @type ninfo: L{objects.Node}
2425     @param ninfo: the node to check
2426     @param nimg: the node image object
2427     @param base: the 'template' node we match against (e.g. from the master)
2428
2429     """
2430     assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2431
2432     beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2433     for os_name, os_data in nimg.oslist.items():
2434       assert os_data, "Empty OS status for OS %s?!" % os_name
2435       f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2436       self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2437                     "Invalid OS %s (located at %s): %s",
2438                     os_name, f_path, f_diag)
2439       self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2440                     "OS '%s' has multiple entries"
2441                     " (first one shadows the rest): %s",
2442                     os_name, utils.CommaJoin([v[0] for v in os_data]))
2443       # comparisons with the 'base' image
2444       test = os_name not in base.oslist
2445       self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2446                     "Extra OS %s not present on reference node (%s)",
2447                     os_name, self.cfg.GetNodeName(base.uuid))
2448       if test:
2449         continue
2450       assert base.oslist[os_name], "Base node has empty OS status?"
2451       _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2452       if not b_status:
2453         # base OS is invalid, skipping
2454         continue
2455       for kind, a, b in [("API version", f_api, b_api),
2456                          ("variants list", f_var, b_var),
2457                          ("parameters", beautify_params(f_param),
2458                           beautify_params(b_param))]:
2459         self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2460                       "OS %s for %s differs from reference node %s:"
2461                       " [%s] vs. [%s]", kind, os_name,
2462                       self.cfg.GetNodeName(base.uuid),
2463                       utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2464
2465     # check any missing OSes
2466     missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2467     self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2468                   "OSes present on reference node %s"
2469                   " but missing on this node: %s",
2470                   self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2471
2472   def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2473     """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2474
2475     @type ninfo: L{objects.Node}
2476     @param ninfo: the node to check
2477     @param nresult: the remote results for the node
2478     @type is_master: bool
2479     @param is_master: Whether node is the master node
2480
2481     """
2482     cluster = self.cfg.GetClusterInfo()
2483     if (is_master and
2484         (cluster.IsFileStorageEnabled() or
2485          cluster.IsSharedFileStorageEnabled())):
2486       try:
2487         fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2488       except KeyError:
2489         # This should never happen
2490         self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2491                       "Node did not return forbidden file storage paths")
2492       else:
2493         self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2494                       "Found forbidden file storage paths: %s",
2495                       utils.CommaJoin(fspaths))
2496     else:
2497       self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2498                     constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2499                     "Node should not have returned forbidden file storage"
2500                     " paths")
2501
2502   def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2503                           verify_key, error_key):
2504     """Verifies (file) storage paths.
2505
2506     @type ninfo: L{objects.Node}
2507     @param ninfo: the node to check
2508     @param nresult: the remote results for the node
2509     @type file_disk_template: string
2510     @param file_disk_template: file-based disk template, whose directory
2511         is supposed to be verified
2512     @type verify_key: string
2513     @param verify_key: key for the verification map of this file
2514         verification step
2515     @param error_key: error key to be added to the verification results
2516         in case something goes wrong in this verification step
2517
2518     """
2519     assert (file_disk_template in
2520             utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2521     cluster = self.cfg.GetClusterInfo()
2522     if cluster.IsDiskTemplateEnabled(file_disk_template):
2523       self._ErrorIf(
2524           verify_key in nresult,
2525           error_key, ninfo.name,
2526           "The configured %s storage path is unusable: %s" %
2527           (file_disk_template, nresult.get(verify_key)))
2528
2529   def _VerifyFileStoragePaths(self, ninfo, nresult):
2530     """Verifies (file) storage paths.
2531
2532     @see: C{_VerifyStoragePaths}
2533
2534     """
2535     self._VerifyStoragePaths(
2536         ninfo, nresult, constants.DT_FILE,
2537         constants.NV_FILE_STORAGE_PATH,
2538         constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2539
2540   def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2541     """Verifies (file) storage paths.
2542
2543     @see: C{_VerifyStoragePaths}
2544
2545     """
2546     self._VerifyStoragePaths(
2547         ninfo, nresult, constants.DT_SHARED_FILE,
2548         constants.NV_SHARED_FILE_STORAGE_PATH,
2549         constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2550
2551   def _VerifyOob(self, ninfo, nresult):
2552     """Verifies out of band functionality of a node.
2553
2554     @type ninfo: L{objects.Node}
2555     @param ninfo: the node to check
2556     @param nresult: the remote results for the node
2557
2558     """
2559     # We just have to verify the paths on master and/or master candidates
2560     # as the oob helper is invoked on the master
2561     if ((ninfo.master_candidate or ninfo.master_capable) and
2562         constants.NV_OOB_PATHS in nresult):
2563       for path_result in nresult[constants.NV_OOB_PATHS]:
2564         self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2565                       ninfo.name, path_result)
2566
2567   def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2568     """Verifies and updates the node volume data.
2569
2570     This function will update a L{NodeImage}'s internal structures
2571     with data from the remote call.
2572
2573     @type ninfo: L{objects.Node}
2574     @param ninfo: the node to check
2575     @param nresult: the remote results for the node
2576     @param nimg: the node image object
2577     @param vg_name: the configured VG name
2578
2579     """
2580     nimg.lvm_fail = True
2581     lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2582     if vg_name is None:
2583       pass
2584     elif isinstance(lvdata, basestring):
2585       self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2586                     "LVM problem on node: %s", utils.SafeEncode(lvdata))
2587     elif not isinstance(lvdata, dict):
2588       self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2589                     "rpc call to node failed (lvlist)")
2590     else:
2591       nimg.volumes = lvdata
2592       nimg.lvm_fail = False
2593
2594   def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2595     """Verifies and updates the node instance list.
2596
2597     If the listing was successful, then updates this node's instance
2598     list. Otherwise, it marks the RPC call as failed for the instance
2599     list key.
2600
2601     @type ninfo: L{objects.Node}
2602     @param ninfo: the node to check
2603     @param nresult: the remote results for the node
2604     @param nimg: the node image object
2605
2606     """
2607     idata = nresult.get(constants.NV_INSTANCELIST, None)
2608     test = not isinstance(idata, list)
2609     self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2610                   "rpc call to node failed (instancelist): %s",
2611                   utils.SafeEncode(str(idata)))
2612     if test:
2613       nimg.hyp_fail = True
2614     else:
2615       nimg.instances = [inst.uuid for (_, inst) in
2616                         self.cfg.GetMultiInstanceInfoByName(idata)]
2617
2618   def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2619     """Verifies and computes a node information map
2620
2621     @type ninfo: L{objects.Node}
2622     @param ninfo: the node to check
2623     @param nresult: the remote results for the node
2624     @param nimg: the node image object
2625     @param vg_name: the configured VG name
2626
2627     """
2628     # try to read free memory (from the hypervisor)
2629     hv_info = nresult.get(constants.NV_HVINFO, None)
2630     test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2631     self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2632                   "rpc call to node failed (hvinfo)")
2633     if not test:
2634       try:
2635         nimg.mfree = int(hv_info["memory_free"])
2636       except (ValueError, TypeError):
2637         self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2638                       "node returned invalid nodeinfo, check hypervisor")
2639
2640     # FIXME: devise a free space model for file based instances as well
2641     if vg_name is not None:
2642       test = (constants.NV_VGLIST not in nresult or
2643               vg_name not in nresult[constants.NV_VGLIST])
2644       self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2645                     "node didn't return data for the volume group '%s'"
2646                     " - it is either missing or broken", vg_name)
2647       if not test:
2648         try:
2649           nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2650         except (ValueError, TypeError):
2651           self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2652                         "node returned invalid LVM info, check LVM status")
2653
2654   def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2655     """Gets per-disk status information for all instances.
2656
2657     @type node_uuids: list of strings
2658     @param node_uuids: Node UUIDs
2659     @type node_image: dict of (UUID, L{objects.Node})
2660     @param node_image: Node objects
2661     @type instanceinfo: dict of (UUID, L{objects.Instance})
2662     @param instanceinfo: Instance objects
2663     @rtype: {instance: {node: [(succes, payload)]}}
2664     @return: a dictionary of per-instance dictionaries with nodes as
2665         keys and disk information as values; the disk information is a
2666         list of tuples (success, payload)
2667
2668     """
2669     node_disks = {}
2670     node_disks_devonly = {}
2671     diskless_instances = set()
2672     diskless = constants.DT_DISKLESS
2673
2674     for nuuid in node_uuids:
2675       node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
2676                                              node_image[nuuid].sinst))
2677       diskless_instances.update(uuid for uuid in node_inst_uuids
2678                                 if instanceinfo[uuid].disk_template == diskless)
2679       disks = [(inst_uuid, disk)
2680                for inst_uuid in node_inst_uuids
2681                for disk in instanceinfo[inst_uuid].disks]
2682
2683       if not disks:
2684         # No need to collect data
2685         continue
2686
2687       node_disks[nuuid] = disks
2688
2689       # _AnnotateDiskParams makes already copies of the disks
2690       devonly = []
2691       for (inst_uuid, dev) in disks:
2692         (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
2693                                           self.cfg)
2694         self.cfg.SetDiskID(anno_disk, nuuid)
2695         devonly.append(anno_disk)
2696
2697       node_disks_devonly[nuuid] = devonly
2698
2699     assert len(node_disks) == len(node_disks_devonly)
2700
2701     # Collect data from all nodes with disks
2702     result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2703                                                           node_disks_devonly)
2704
2705     assert len(result) == len(node_disks)
2706
2707     instdisk = {}
2708
2709     for (nuuid, nres) in result.items():
2710       node = self.cfg.GetNodeInfo(nuuid)
2711       disks = node_disks[node.uuid]
2712
2713       if nres.offline:
2714         # No data from this node
2715         data = len(disks) * [(False, "node offline")]
2716       else:
2717         msg = nres.fail_msg
2718         self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
2719                       "while getting disk information: %s", msg)
2720         if msg:
2721           # No data from this node
2722           data = len(disks) * [(False, msg)]
2723         else:
2724           data = []
2725           for idx, i in enumerate(nres.payload):
2726             if isinstance(i, (tuple, list)) and len(i) == 2:
2727               data.append(i)
2728             else:
2729               logging.warning("Invalid result from node %s, entry %d: %s",
2730                               node.name, idx, i)
2731               data.append((False, "Invalid result from the remote node"))
2732
2733       for ((inst_uuid, _), status) in zip(disks, data):
2734         instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
2735           .append(status)
2736
2737     # Add empty entries for diskless instances.
2738     for inst_uuid in diskless_instances:
2739       assert inst_uuid not in instdisk
2740       instdisk[inst_uuid] = {}
2741
2742     assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2743                       len(nuuids) <= len(instanceinfo[inst].all_nodes) and
2744                       compat.all(isinstance(s, (tuple, list)) and
2745                                  len(s) == 2 for s in statuses)
2746                       for inst, nuuids in instdisk.items()
2747                       for nuuid, statuses in nuuids.items())
2748     if __debug__:
2749       instdisk_keys = set(instdisk)
2750       instanceinfo_keys = set(instanceinfo)
2751       assert instdisk_keys == instanceinfo_keys, \
2752         ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2753          (instdisk_keys, instanceinfo_keys))
2754
2755     return instdisk
2756
2757   @staticmethod
2758   def _SshNodeSelector(group_uuid, all_nodes):
2759     """Create endless iterators for all potential SSH check hosts.
2760
2761     """
2762     nodes = [node for node in all_nodes
2763              if (node.group != group_uuid and
2764                  not node.offline)]
2765     keyfunc = operator.attrgetter("group")
2766
2767     return map(itertools.cycle,
2768                [sorted(map(operator.attrgetter("name"), names))
2769                 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2770                                                   keyfunc)])
2771
2772   @classmethod
2773   def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2774     """Choose which nodes should talk to which other nodes.
2775
2776     We will make nodes contact all nodes in their group, and one node from
2777     every other group.
2778
2779     @warning: This algorithm has a known issue if one node group is much
2780       smaller than others (e.g. just one node). In such a case all other
2781       nodes will talk to the single node.
2782
2783     """
2784     online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2785     sel = cls._SshNodeSelector(group_uuid, all_nodes)
2786
2787     return (online_nodes,
2788             dict((name, sorted([i.next() for i in sel]))
2789                  for name in online_nodes))
2790
2791   def BuildHooksEnv(self):
2792     """Build hooks env.
2793
2794     Cluster-Verify hooks just ran in the post phase and their failure makes
2795     the output be logged in the verify output and the verification to fail.
2796
2797     """
2798     env = {
2799       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2800       }
2801
2802     env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2803                for node in self.my_node_info.values())
2804
2805     return env
2806
2807   def BuildHooksNodes(self):
2808     """Build hooks nodes.
2809
2810     """
2811     return ([], list(self.my_node_info.keys()))
2812
2813   def Exec(self, feedback_fn):
2814     """Verify integrity of the node group, performing various test on nodes.
2815
2816     """
2817     # This method has too many local variables. pylint: disable=R0914
2818     feedback_fn("* Verifying group '%s'" % self.group_info.name)
2819
2820     if not self.my_node_uuids:
2821       # empty node group
2822       feedback_fn("* Empty node group, skipping verification")
2823       return True
2824
2825     self.bad = False
2826     verbose = self.op.verbose
2827     self._feedback_fn = feedback_fn
2828
2829     vg_name = self.cfg.GetVGName()
2830     drbd_helper = self.cfg.GetDRBDHelper()
2831     cluster = self.cfg.GetClusterInfo()
2832     hypervisors = cluster.enabled_hypervisors
2833     node_data_list = self.my_node_info.values()
2834
2835     i_non_redundant = [] # Non redundant instances
2836     i_non_a_balanced = [] # Non auto-balanced instances
2837     i_offline = 0 # Count of offline instances
2838     n_offline = 0 # Count of offline nodes
2839     n_drained = 0 # Count of nodes being drained
2840     node_vol_should = {}
2841
2842     # FIXME: verify OS list
2843
2844     # File verification
2845     filemap = ComputeAncillaryFiles(cluster, False)
2846
2847     # do local checksums
2848     master_node_uuid = self.master_node = self.cfg.GetMasterNode()
2849     master_ip = self.cfg.GetMasterIP()
2850
2851     feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))
2852
2853     user_scripts = []
2854     if self.cfg.GetUseExternalMipScript():
2855       user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2856
2857     node_verify_param = {
2858       constants.NV_FILELIST:
2859         map(vcluster.MakeVirtualPath,
2860             utils.UniqueSequence(filename
2861                                  for files in filemap
2862                                  for filename in files)),
2863       constants.NV_NODELIST:
2864         self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2865                                   self.all_node_info.values()),
2866       constants.NV_HYPERVISOR: hypervisors,
2867       constants.NV_HVPARAMS:
2868         _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2869       constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2870                                  for node in node_data_list
2871                                  if not node.offline],
2872       constants.NV_INSTANCELIST: hypervisors,
2873       constants.NV_VERSION: None,
2874       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2875       constants.NV_NODESETUP: None,
2876       constants.NV_TIME: None,
2877       constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
2878       constants.NV_OSLIST: None,
2879       constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2880       constants.NV_USERSCRIPTS: user_scripts,
2881       }
2882
2883     if vg_name is not None:
2884       node_verify_param[constants.NV_VGLIST] = None
2885       node_verify_param[constants.NV_LVLIST] = vg_name
2886       node_verify_param[constants.NV_PVLIST] = [vg_name]
2887
2888     if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
2889       if drbd_helper:
2890         node_verify_param[constants.NV_DRBDVERSION] = None
2891         node_verify_param[constants.NV_DRBDLIST] = None
2892         node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2893
2894     if cluster.IsFileStorageEnabled() or \
2895         cluster.IsSharedFileStorageEnabled():
2896       # Load file storage paths only from master node
2897       node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
2898         self.cfg.GetMasterNodeName()
2899       if cluster.IsFileStorageEnabled():
2900         node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
2901           cluster.file_storage_dir
2902
2903     # bridge checks
2904     # FIXME: this needs to be changed per node-group, not cluster-wide
2905     bridges = set()
2906     default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2907     if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2908       bridges.add(default_nicpp[constants.NIC_LINK])
2909     for inst_uuid in self.my_inst_info.values():
2910       for nic in inst_uuid.nics:
2911         full_nic = cluster.SimpleFillNIC(nic.nicparams)
2912         if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2913           bridges.add(full_nic[constants.NIC_LINK])
2914
2915     if bridges:
2916       node_verify_param[constants.NV_BRIDGES] = list(bridges)
2917
2918     # Build our expected cluster state
2919     node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
2920                                                  uuid=node.uuid,
2921                                                  vm_capable=node.vm_capable))
2922                       for node in node_data_list)
2923
2924     # Gather OOB paths
2925     oob_paths = []
2926     for node in self.all_node_info.values():
2927       path = SupportsOob(self.cfg, node)
2928       if path and path not in oob_paths:
2929         oob_paths.append(path)
2930
2931     if oob_paths:
2932       node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2933
2934     for inst_uuid in self.my_inst_uuids:
2935       instance = self.my_inst_info[inst_uuid]
2936       if instance.admin_state == constants.ADMINST_OFFLINE:
2937         i_offline += 1
2938
2939       for nuuid in instance.all_nodes:
2940         if nuuid not in node_image:
2941           gnode = self.NodeImage(uuid=nuuid)
2942           gnode.ghost = (nuuid not in self.all_node_info)
2943           node_image[nuuid] = gnode
2944
2945       instance.MapLVsByNode(node_vol_should)
2946
2947       pnode = instance.primary_node
2948       node_image[pnode].pinst.append(instance.uuid)
2949
2950       for snode in instance.secondary_nodes:
2951         nimg = node_image[snode]
2952         nimg.sinst.append(instance.uuid)
2953         if pnode not in nimg.sbp:
2954           nimg.sbp[pnode] = []
2955         nimg.sbp[pnode].append(instance.uuid)
2956
2957     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
2958                                                self.my_node_info.keys())
2959     # The value of exclusive_storage should be the same across the group, so if
2960     # it's True for at least a node, we act as if it were set for all the nodes
2961     self._exclusive_storage = compat.any(es_flags.values())
2962     if self._exclusive_storage:
2963       node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2964
2965     # At this point, we have the in-memory data structures complete,
2966     # except for the runtime information, which we'll gather next
2967
2968     # Due to the way our RPC system works, exact response times cannot be
2969     # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2970     # time before and after executing the request, we can at least have a time
2971     # window.
2972     nvinfo_starttime = time.time()
2973     all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
2974                                            node_verify_param,
2975                                            self.cfg.GetClusterName(),
2976                                            self.cfg.GetClusterInfo().hvparams)
2977     nvinfo_endtime = time.time()
2978
2979     if self.extra_lv_nodes and vg_name is not None:
2980       extra_lv_nvinfo = \
2981           self.rpc.call_node_verify(self.extra_lv_nodes,
2982                                     {constants.NV_LVLIST: vg_name},
2983                                     self.cfg.GetClusterName(),
2984                                     self.cfg.GetClusterInfo().hvparams)
2985     else:
2986       extra_lv_nvinfo = {}
2987
2988     all_drbd_map = self.cfg.ComputeDRBDMap()
2989
2990     feedback_fn("* Gathering disk information (%s nodes)" %
2991                 len(self.my_node_uuids))
2992     instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
2993                                      self.my_inst_info)
2994
2995     feedback_fn("* Verifying configuration file consistency")
2996
2997     # If not all nodes are being checked, we need to make sure the master node
2998     # and a non-checked vm_capable node are in the list.
2999     absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
3000     if absent_node_uuids:
3001       vf_nvinfo = all_nvinfo.copy()
3002       vf_node_info = list(self.my_node_info.values())
3003       additional_node_uuids = []
3004       if master_node_uuid not in self.my_node_info:
3005         additional_node_uuids.append(master_node_uuid)
3006         vf_node_info.append(self.all_node_info[master_node_uuid])
3007       # Add the first vm_capable node we find which is not included,
3008       # excluding the master node (which we already have)
3009       for node_uuid in absent_node_uuids:
3010         nodeinfo = self.all_node_info[node_uuid]
3011         if (nodeinfo.vm_capable and not nodeinfo.offline and
3012             node_uuid != master_node_uuid):
3013           additional_node_uuids.append(node_uuid)
3014           vf_node_info.append(self.all_node_info[node_uuid])
3015           break
3016       key = constants.NV_FILELIST
3017       vf_nvinfo.update(self.rpc.call_node_verify(
3018          additional_node_uuids, {key: node_verify_param[key]},
3019          self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
3020     else:
3021       vf_nvinfo = all_nvinfo
3022       vf_node_info = self.my_node_info.values()
3023
3024     self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)
3025
3026     feedback_fn("* Verifying node status")
3027
3028     refos_img = None
3029
3030     for node_i in node_data_list:
3031       nimg = node_image[node_i.uuid]
3032
3033       if node_i.offline:
3034         if verbose:
3035           feedback_fn("* Skipping offline node %s" % (node_i.name,))
3036         n_offline += 1
3037         continue
3038
3039       if node_i.uuid == master_node_uuid:
3040         ntype = "master"
3041       elif node_i.master_candidate:
3042         ntype = "master candidate"
3043       elif node_i.drained:
3044         ntype = "drained"
3045         n_drained += 1
3046       else:
3047         ntype = "regular"
3048       if verbose:
3049         feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))
3050
3051       msg = all_nvinfo[node_i.uuid].fail_msg
3052       self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
3053                     "while contacting node: %s", msg)
3054       if msg:
3055         nimg.rpc_fail = True
3056         continue
3057
3058       nresult = all_nvinfo[node_i.uuid].payload
3059
3060       nimg.call_ok = self._VerifyNode(node_i, nresult)
3061       self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
3062       self._VerifyNodeNetwork(node_i, nresult)
3063       self._VerifyNodeUserScripts(node_i, nresult)
3064       self._VerifyOob(node_i, nresult)
3065       self._VerifyAcceptedFileStoragePaths(node_i, nresult,
3066                                            node_i.uuid == master_node_uuid)
3067       self._VerifyFileStoragePaths(node_i, nresult)
3068       self._VerifySharedFileStoragePaths(node_i, nresult)
3069
3070       if nimg.vm_capable:
3071         self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
3072         self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
3073                              all_drbd_map)
3074
3075         self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
3076         self._UpdateNodeInstances(node_i, nresult, nimg)
3077         self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
3078         self._UpdateNodeOS(node_i, nresult, nimg)
3079
3080         if not nimg.os_fail:
3081           if refos_img is None:
3082             refos_img = nimg
3083           self._VerifyNodeOS(node_i, nimg, refos_img)
3084         self._VerifyNodeBridges(node_i, nresult, bridges)
3085
3086         # Check whether all running instances are primary for the node. (This
3087         # can no longer be done from _VerifyInstance below, since some of the
3088         # wrong instances could be from other node groups.)
3089         non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)
3090
3091         for inst_uuid in non_primary_inst_uuids:
3092           test = inst_uuid in self.all_inst_info
3093           self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
3094                         self.cfg.GetInstanceName(inst_uuid),
3095                         "instance should not run on node %s", node_i.name)
3096           self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
3097                         "node is running unknown instance %s", inst_uuid)
3098
3099     self._VerifyGroupDRBDVersion(all_nvinfo)
3100     self._VerifyGroupLVM(node_image, vg_name)
3101
3102     for node_uuid, result in extra_lv_nvinfo.items():
3103       self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
3104                               node_image[node_uuid], vg_name)
3105
3106     feedback_fn("* Verifying instance status")
3107     for inst_uuid in self.my_inst_uuids:
3108       instance = self.my_inst_info[inst_uuid]
3109       if verbose:
3110         feedback_fn("* Verifying instance %s" % instance.name)
3111       self._VerifyInstance(instance, node_image, instdisk[inst_uuid])
3112
3113       # If the instance is non-redundant we cannot survive losing its primary
3114       # node, so we are not N+1 compliant.
3115       if instance.disk_template not in constants.DTS_MIRRORED:
3116         i_non_redundant.append(instance)
3117
3118       if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
3119         i_non_a_balanced.append(instance)
3120
3121     feedback_fn("* Verifying orphan volumes")
3122     reserved = utils.FieldSet(*cluster.reserved_lvs)
3123
3124     # We will get spurious "unknown volume" warnings if any node of this group
3125     # is secondary for an instance whose primary is in another group. To avoid
3126     # them, we find these instances and add their volumes to node_vol_should.
3127     for instance in self.all_inst_info.values():
3128       for secondary in instance.secondary_nodes:
3129         if (secondary in self.my_node_info
3130             and instance.name not in self.my_inst_info):
3131           instance.MapLVsByNode(node_vol_should)
3132           break
3133
3134     self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
3135
3136     if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
3137       feedback_fn("* Verifying N+1 Memory redundancy")
3138       self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
3139
3140     feedback_fn("* Other Notes")
3141     if i_non_redundant:
3142       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
3143                   % len(i_non_redundant))
3144
3145     if i_non_a_balanced:
3146       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
3147                   % len(i_non_a_balanced))
3148
3149     if i_offline:
3150       feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
3151
3152     if n_offline:
3153       feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
3154
3155     if n_drained:
3156       feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
3157
3158     return not self.bad
3159
3160   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
3161     """Analyze the post-hooks' result
3162
3163     This method analyses the hook result, handles it, and sends some
3164     nicely-formatted feedback back to the user.
3165
3166     @param phase: one of L{constants.HOOKS_PHASE_POST} or
3167         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
3168     @param hooks_results: the results of the multi-node hooks rpc call
3169     @param feedback_fn: function used send feedback back to the caller
3170     @param lu_result: previous Exec result
3171     @return: the new Exec result, based on the previous result
3172         and hook results
3173
3174     """
3175     # We only really run POST phase hooks, only for non-empty groups,
3176     # and are only interested in their results
3177     if not self.my_node_uuids:
3178       # empty node group
3179       pass
3180     elif phase == constants.HOOKS_PHASE_POST:
3181       # Used to change hooks' output to proper indentation
3182       feedback_fn("* Hooks Results")
3183       assert hooks_results, "invalid result from hooks"
3184
3185       for node_name in hooks_results:
3186         res = hooks_results[node_name]
3187         msg = res.fail_msg
3188         test = msg and not res.offline
3189         self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3190                       "Communication failure in hooks execution: %s", msg)
3191         if res.offline or msg:
3192           # No need to investigate payload if node is offline or gave
3193           # an error.
3194           continue
3195         for script, hkr, output in res.payload:
3196           test = hkr == constants.HKR_FAIL
3197           self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3198                         "Script %s failed, output:", script)
3199           if test:
3200             output = self._HOOKS_INDENT_RE.sub("      ", output)
3201             feedback_fn("%s" % output)
3202             lu_result = False
3203
3204     return lu_result
3205
3206
3207 class LUClusterVerifyDisks(NoHooksLU):
3208   """Verifies the cluster disks status.
3209
3210   """
3211   REQ_BGL = False
3212
3213   def ExpandNames(self):
3214     self.share_locks = ShareAll()
3215     self.needed_locks = {
3216       locking.LEVEL_NODEGROUP: locking.ALL_SET,
3217       }
3218
3219   def Exec(self, feedback_fn):
3220     group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
3221
3222     # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
3223     return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
3224                            for group in group_names])