Remove physical_id field from disk object
[ganeti-local] / lib / cmdlib / cluster.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with the cluster."""
23
24 import OpenSSL
25
26 import copy
27 import itertools
28 import logging
29 import operator
30 import os
31 import re
32 import time
33
34 from ganeti import compat
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import masterd
40 from ganeti import netutils
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 from ganeti import query
45 from ganeti import rpc
46 from ganeti import runtime
47 from ganeti import ssh
48 from ganeti import uidpool
49 from ganeti import utils
50 from ganeti import vcluster
51
52 from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53   ResultWithJobs
54 from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55   ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56   GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57   GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58   CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59   ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
60   CheckIpolicyVsDiskTemplates
61
62 import ganeti.masterd.instance
63
64
65 class LUClusterActivateMasterIp(NoHooksLU):
66   """Activate the master IP on the master node.
67
68   """
69   def Exec(self, feedback_fn):
70     """Activate the master IP.
71
72     """
73     master_params = self.cfg.GetMasterNetworkParameters()
74     ems = self.cfg.GetUseExternalMipScript()
75     result = self.rpc.call_node_activate_master_ip(master_params.uuid,
76                                                    master_params, ems)
77     result.Raise("Could not activate the master IP")
78
79
80 class LUClusterDeactivateMasterIp(NoHooksLU):
81   """Deactivate the master IP on the master node.
82
83   """
84   def Exec(self, feedback_fn):
85     """Deactivate the master IP.
86
87     """
88     master_params = self.cfg.GetMasterNetworkParameters()
89     ems = self.cfg.GetUseExternalMipScript()
90     result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
91                                                      master_params, ems)
92     result.Raise("Could not deactivate the master IP")
93
94
95 class LUClusterConfigQuery(NoHooksLU):
96   """Return configuration values.
97
98   """
99   REQ_BGL = False
100
101   def CheckArguments(self):
102     self.cq = ClusterQuery(None, self.op.output_fields, False)
103
104   def ExpandNames(self):
105     self.cq.ExpandNames(self)
106
107   def DeclareLocks(self, level):
108     self.cq.DeclareLocks(self, level)
109
110   def Exec(self, feedback_fn):
111     result = self.cq.OldStyleQuery(self)
112
113     assert len(result) == 1
114
115     return result[0]
116
117
118 class LUClusterDestroy(LogicalUnit):
119   """Logical unit for destroying the cluster.
120
121   """
122   HPATH = "cluster-destroy"
123   HTYPE = constants.HTYPE_CLUSTER
124
125   def BuildHooksEnv(self):
126     """Build hooks env.
127
128     """
129     return {
130       "OP_TARGET": self.cfg.GetClusterName(),
131       }
132
133   def BuildHooksNodes(self):
134     """Build hooks nodes.
135
136     """
137     return ([], [])
138
139   def CheckPrereq(self):
140     """Check prerequisites.
141
142     This checks whether the cluster is empty.
143
144     Any errors are signaled by raising errors.OpPrereqError.
145
146     """
147     master = self.cfg.GetMasterNode()
148
149     nodelist = self.cfg.GetNodeList()
150     if len(nodelist) != 1 or nodelist[0] != master:
151       raise errors.OpPrereqError("There are still %d node(s) in"
152                                  " this cluster." % (len(nodelist) - 1),
153                                  errors.ECODE_INVAL)
154     instancelist = self.cfg.GetInstanceList()
155     if instancelist:
156       raise errors.OpPrereqError("There are still %d instance(s) in"
157                                  " this cluster." % len(instancelist),
158                                  errors.ECODE_INVAL)
159
160   def Exec(self, feedback_fn):
161     """Destroys the cluster.
162
163     """
164     master_params = self.cfg.GetMasterNetworkParameters()
165
166     # Run post hooks on master node before it's removed
167     RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
168
169     ems = self.cfg.GetUseExternalMipScript()
170     result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
171                                                      master_params, ems)
172     result.Warn("Error disabling the master IP address", self.LogWarning)
173     return master_params.uuid
174
175
176 class LUClusterPostInit(LogicalUnit):
177   """Logical unit for running hooks after cluster initialization.
178
179   """
180   HPATH = "cluster-init"
181   HTYPE = constants.HTYPE_CLUSTER
182
183   def BuildHooksEnv(self):
184     """Build hooks env.
185
186     """
187     return {
188       "OP_TARGET": self.cfg.GetClusterName(),
189       }
190
191   def BuildHooksNodes(self):
192     """Build hooks nodes.
193
194     """
195     return ([], [self.cfg.GetMasterNode()])
196
197   def Exec(self, feedback_fn):
198     """Nothing to do.
199
200     """
201     return True
202
203
204 class ClusterQuery(QueryBase):
205   FIELDS = query.CLUSTER_FIELDS
206
207   #: Do not sort (there is only one item)
208   SORT_FIELD = None
209
210   def ExpandNames(self, lu):
211     lu.needed_locks = {}
212
213     # The following variables interact with _QueryBase._GetNames
214     self.wanted = locking.ALL_SET
215     self.do_locking = self.use_locking
216
217     if self.do_locking:
218       raise errors.OpPrereqError("Can not use locking for cluster queries",
219                                  errors.ECODE_INVAL)
220
221   def DeclareLocks(self, lu, level):
222     pass
223
224   def _GetQueryData(self, lu):
225     """Computes the list of nodes and their attributes.
226
227     """
228     # Locking is not used
229     assert not (compat.any(lu.glm.is_owned(level)
230                            for level in locking.LEVELS
231                            if level != locking.LEVEL_CLUSTER) or
232                 self.do_locking or self.use_locking)
233
234     if query.CQ_CONFIG in self.requested_data:
235       cluster = lu.cfg.GetClusterInfo()
236       nodes = lu.cfg.GetAllNodesInfo()
237     else:
238       cluster = NotImplemented
239       nodes = NotImplemented
240
241     if query.CQ_QUEUE_DRAINED in self.requested_data:
242       drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
243     else:
244       drain_flag = NotImplemented
245
246     if query.CQ_WATCHER_PAUSE in self.requested_data:
247       master_node_uuid = lu.cfg.GetMasterNode()
248
249       result = lu.rpc.call_get_watcher_pause(master_node_uuid)
250       result.Raise("Can't retrieve watcher pause from master node '%s'" %
251                    lu.cfg.GetMasterNodeName())
252
253       watcher_pause = result.payload
254     else:
255       watcher_pause = NotImplemented
256
257     return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
258
259
260 class LUClusterQuery(NoHooksLU):
261   """Query cluster configuration.
262
263   """
264   REQ_BGL = False
265
266   def ExpandNames(self):
267     self.needed_locks = {}
268
269   def Exec(self, feedback_fn):
270     """Return cluster config.
271
272     """
273     cluster = self.cfg.GetClusterInfo()
274     os_hvp = {}
275
276     # Filter just for enabled hypervisors
277     for os_name, hv_dict in cluster.os_hvp.items():
278       os_hvp[os_name] = {}
279       for hv_name, hv_params in hv_dict.items():
280         if hv_name in cluster.enabled_hypervisors:
281           os_hvp[os_name][hv_name] = hv_params
282
283     # Convert ip_family to ip_version
284     primary_ip_version = constants.IP4_VERSION
285     if cluster.primary_ip_family == netutils.IP6Address.family:
286       primary_ip_version = constants.IP6_VERSION
287
288     result = {
289       "software_version": constants.RELEASE_VERSION,
290       "protocol_version": constants.PROTOCOL_VERSION,
291       "config_version": constants.CONFIG_VERSION,
292       "os_api_version": max(constants.OS_API_VERSIONS),
293       "export_version": constants.EXPORT_VERSION,
294       "vcs_version": constants.VCS_VERSION,
295       "architecture": runtime.GetArchInfo(),
296       "name": cluster.cluster_name,
297       "master": self.cfg.GetMasterNodeName(),
298       "default_hypervisor": cluster.primary_hypervisor,
299       "enabled_hypervisors": cluster.enabled_hypervisors,
300       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
301                         for hypervisor_name in cluster.enabled_hypervisors]),
302       "os_hvp": os_hvp,
303       "beparams": cluster.beparams,
304       "osparams": cluster.osparams,
305       "ipolicy": cluster.ipolicy,
306       "nicparams": cluster.nicparams,
307       "ndparams": cluster.ndparams,
308       "diskparams": cluster.diskparams,
309       "candidate_pool_size": cluster.candidate_pool_size,
310       "master_netdev": cluster.master_netdev,
311       "master_netmask": cluster.master_netmask,
312       "use_external_mip_script": cluster.use_external_mip_script,
313       "volume_group_name": cluster.volume_group_name,
314       "drbd_usermode_helper": cluster.drbd_usermode_helper,
315       "file_storage_dir": cluster.file_storage_dir,
316       "shared_file_storage_dir": cluster.shared_file_storage_dir,
317       "maintain_node_health": cluster.maintain_node_health,
318       "ctime": cluster.ctime,
319       "mtime": cluster.mtime,
320       "uuid": cluster.uuid,
321       "tags": list(cluster.GetTags()),
322       "uid_pool": cluster.uid_pool,
323       "default_iallocator": cluster.default_iallocator,
324       "reserved_lvs": cluster.reserved_lvs,
325       "primary_ip_version": primary_ip_version,
326       "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
327       "hidden_os": cluster.hidden_os,
328       "blacklisted_os": cluster.blacklisted_os,
329       "enabled_disk_templates": cluster.enabled_disk_templates,
330       }
331
332     return result
333
334
335 class LUClusterRedistConf(NoHooksLU):
336   """Force the redistribution of cluster configuration.
337
338   This is a very simple LU.
339
340   """
341   REQ_BGL = False
342
343   def ExpandNames(self):
344     self.needed_locks = {
345       locking.LEVEL_NODE: locking.ALL_SET,
346       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
347     }
348     self.share_locks = ShareAll()
349
350   def Exec(self, feedback_fn):
351     """Redistribute the configuration.
352
353     """
354     self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
355     RedistributeAncillaryFiles(self)
356
357
358 class LUClusterRename(LogicalUnit):
359   """Rename the cluster.
360
361   """
362   HPATH = "cluster-rename"
363   HTYPE = constants.HTYPE_CLUSTER
364
365   def BuildHooksEnv(self):
366     """Build hooks env.
367
368     """
369     return {
370       "OP_TARGET": self.cfg.GetClusterName(),
371       "NEW_NAME": self.op.name,
372       }
373
374   def BuildHooksNodes(self):
375     """Build hooks nodes.
376
377     """
378     return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
379
380   def CheckPrereq(self):
381     """Verify that the passed name is a valid one.
382
383     """
384     hostname = netutils.GetHostname(name=self.op.name,
385                                     family=self.cfg.GetPrimaryIPFamily())
386
387     new_name = hostname.name
388     self.ip = new_ip = hostname.ip
389     old_name = self.cfg.GetClusterName()
390     old_ip = self.cfg.GetMasterIP()
391     if new_name == old_name and new_ip == old_ip:
392       raise errors.OpPrereqError("Neither the name nor the IP address of the"
393                                  " cluster has changed",
394                                  errors.ECODE_INVAL)
395     if new_ip != old_ip:
396       if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
397         raise errors.OpPrereqError("The given cluster IP address (%s) is"
398                                    " reachable on the network" %
399                                    new_ip, errors.ECODE_NOTUNIQUE)
400
401     self.op.name = new_name
402
403   def Exec(self, feedback_fn):
404     """Rename the cluster.
405
406     """
407     clustername = self.op.name
408     new_ip = self.ip
409
410     # shutdown the master IP
411     master_params = self.cfg.GetMasterNetworkParameters()
412     ems = self.cfg.GetUseExternalMipScript()
413     result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
414                                                      master_params, ems)
415     result.Raise("Could not disable the master role")
416
417     try:
418       cluster = self.cfg.GetClusterInfo()
419       cluster.cluster_name = clustername
420       cluster.master_ip = new_ip
421       self.cfg.Update(cluster, feedback_fn)
422
423       # update the known hosts file
424       ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
425       node_list = self.cfg.GetOnlineNodeList()
426       try:
427         node_list.remove(master_params.uuid)
428       except ValueError:
429         pass
430       UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
431     finally:
432       master_params.ip = new_ip
433       result = self.rpc.call_node_activate_master_ip(master_params.uuid,
434                                                      master_params, ems)
435       result.Warn("Could not re-enable the master role on the master,"
436                   " please restart manually", self.LogWarning)
437
438     return clustername
439
440
441 class LUClusterRepairDiskSizes(NoHooksLU):
442   """Verifies the cluster disks sizes.
443
444   """
445   REQ_BGL = False
446
447   def ExpandNames(self):
448     if self.op.instances:
449       (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
450       # Not getting the node allocation lock as only a specific set of
451       # instances (and their nodes) is going to be acquired
452       self.needed_locks = {
453         locking.LEVEL_NODE_RES: [],
454         locking.LEVEL_INSTANCE: self.wanted_names,
455         }
456       self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
457     else:
458       self.wanted_names = None
459       self.needed_locks = {
460         locking.LEVEL_NODE_RES: locking.ALL_SET,
461         locking.LEVEL_INSTANCE: locking.ALL_SET,
462
463         # This opcode is acquires the node locks for all instances
464         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
465         }
466
467     self.share_locks = {
468       locking.LEVEL_NODE_RES: 1,
469       locking.LEVEL_INSTANCE: 0,
470       locking.LEVEL_NODE_ALLOC: 1,
471       }
472
473   def DeclareLocks(self, level):
474     if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
475       self._LockInstancesNodes(primary_only=True, level=level)
476
477   def CheckPrereq(self):
478     """Check prerequisites.
479
480     This only checks the optional instance list against the existing names.
481
482     """
483     if self.wanted_names is None:
484       self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
485
486     self.wanted_instances = \
487         map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
488
489   def _EnsureChildSizes(self, disk):
490     """Ensure children of the disk have the needed disk size.
491
492     This is valid mainly for DRBD8 and fixes an issue where the
493     children have smaller disk size.
494
495     @param disk: an L{ganeti.objects.Disk} object
496
497     """
498     if disk.dev_type == constants.DT_DRBD8:
499       assert disk.children, "Empty children for DRBD8?"
500       fchild = disk.children[0]
501       mismatch = fchild.size < disk.size
502       if mismatch:
503         self.LogInfo("Child disk has size %d, parent %d, fixing",
504                      fchild.size, disk.size)
505         fchild.size = disk.size
506
507       # and we recurse on this child only, not on the metadev
508       return self._EnsureChildSizes(fchild) or mismatch
509     else:
510       return False
511
512   def Exec(self, feedback_fn):
513     """Verify the size of cluster disks.
514
515     """
516     # TODO: check child disks too
517     # TODO: check differences in size between primary/secondary nodes
518     per_node_disks = {}
519     for instance in self.wanted_instances:
520       pnode = instance.primary_node
521       if pnode not in per_node_disks:
522         per_node_disks[pnode] = []
523       for idx, disk in enumerate(instance.disks):
524         per_node_disks[pnode].append((instance, idx, disk))
525
526     assert not (frozenset(per_node_disks.keys()) -
527                 self.owned_locks(locking.LEVEL_NODE_RES)), \
528       "Not owning correct locks"
529     assert not self.owned_locks(locking.LEVEL_NODE)
530
531     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
532                                                per_node_disks.keys())
533
534     changed = []
535     for node_uuid, dskl in per_node_disks.items():
536       if not dskl:
537         # no disks on the node
538         continue
539
540       newl = [(v[2].Copy(), v[0]) for v in dskl]
541       node_name = self.cfg.GetNodeName(node_uuid)
542       result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
543       if result.fail_msg:
544         self.LogWarning("Failure in blockdev_getdimensions call to node"
545                         " %s, ignoring", node_name)
546         continue
547       if len(result.payload) != len(dskl):
548         logging.warning("Invalid result from node %s: len(dksl)=%d,"
549                         " result.payload=%s", node_name, len(dskl),
550                         result.payload)
551         self.LogWarning("Invalid result from node %s, ignoring node results",
552                         node_name)
553         continue
554       for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
555         if dimensions is None:
556           self.LogWarning("Disk %d of instance %s did not return size"
557                           " information, ignoring", idx, instance.name)
558           continue
559         if not isinstance(dimensions, (tuple, list)):
560           self.LogWarning("Disk %d of instance %s did not return valid"
561                           " dimension information, ignoring", idx,
562                           instance.name)
563           continue
564         (size, spindles) = dimensions
565         if not isinstance(size, (int, long)):
566           self.LogWarning("Disk %d of instance %s did not return valid"
567                           " size information, ignoring", idx, instance.name)
568           continue
569         size = size >> 20
570         if size != disk.size:
571           self.LogInfo("Disk %d of instance %s has mismatched size,"
572                        " correcting: recorded %d, actual %d", idx,
573                        instance.name, disk.size, size)
574           disk.size = size
575           self.cfg.Update(instance, feedback_fn)
576           changed.append((instance.name, idx, "size", size))
577         if es_flags[node_uuid]:
578           if spindles is None:
579             self.LogWarning("Disk %d of instance %s did not return valid"
580                             " spindles information, ignoring", idx,
581                             instance.name)
582           elif disk.spindles is None or disk.spindles != spindles:
583             self.LogInfo("Disk %d of instance %s has mismatched spindles,"
584                          " correcting: recorded %s, actual %s",
585                          idx, instance.name, disk.spindles, spindles)
586             disk.spindles = spindles
587             self.cfg.Update(instance, feedback_fn)
588             changed.append((instance.name, idx, "spindles", disk.spindles))
589         if self._EnsureChildSizes(disk):
590           self.cfg.Update(instance, feedback_fn)
591           changed.append((instance.name, idx, "size", disk.size))
592     return changed
593
594
595 def _ValidateNetmask(cfg, netmask):
596   """Checks if a netmask is valid.
597
598   @type cfg: L{config.ConfigWriter}
599   @param cfg: The cluster configuration
600   @type netmask: int
601   @param netmask: the netmask to be verified
602   @raise errors.OpPrereqError: if the validation fails
603
604   """
605   ip_family = cfg.GetPrimaryIPFamily()
606   try:
607     ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
608   except errors.ProgrammerError:
609     raise errors.OpPrereqError("Invalid primary ip family: %s." %
610                                ip_family, errors.ECODE_INVAL)
611   if not ipcls.ValidateNetmask(netmask):
612     raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
613                                (netmask), errors.ECODE_INVAL)
614
615
616 def CheckFileBasedStoragePathVsEnabledDiskTemplates(
617     logging_warn_fn, file_storage_dir, enabled_disk_templates,
618     file_disk_template):
619   """Checks whether the given file-based storage directory is acceptable.
620
621   Note: This function is public, because it is also used in bootstrap.py.
622
623   @type logging_warn_fn: function
624   @param logging_warn_fn: function which accepts a string and logs it
625   @type file_storage_dir: string
626   @param file_storage_dir: the directory to be used for file-based instances
627   @type enabled_disk_templates: list of string
628   @param enabled_disk_templates: the list of enabled disk templates
629   @type file_disk_template: string
630   @param file_disk_template: the file-based disk template for which the
631       path should be checked
632
633   """
634   assert (file_disk_template in
635           utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
636   file_storage_enabled = file_disk_template in enabled_disk_templates
637   if file_storage_dir is not None:
638     if file_storage_dir == "":
639       if file_storage_enabled:
640         raise errors.OpPrereqError(
641             "Unsetting the '%s' storage directory while having '%s' storage"
642             " enabled is not permitted." %
643             (file_disk_template, file_disk_template))
644     else:
645       if not file_storage_enabled:
646         logging_warn_fn(
647             "Specified a %s storage directory, although %s storage is not"
648             " enabled." % (file_disk_template, file_disk_template))
649   else:
650     raise errors.ProgrammerError("Received %s storage dir with value"
651                                  " 'None'." % file_disk_template)
652
653
654 def CheckFileStoragePathVsEnabledDiskTemplates(
655     logging_warn_fn, file_storage_dir, enabled_disk_templates):
656   """Checks whether the given file storage directory is acceptable.
657
658   @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
659
660   """
661   CheckFileBasedStoragePathVsEnabledDiskTemplates(
662       logging_warn_fn, file_storage_dir, enabled_disk_templates,
663       constants.DT_FILE)
664
665
666 def CheckSharedFileStoragePathVsEnabledDiskTemplates(
667     logging_warn_fn, file_storage_dir, enabled_disk_templates):
668   """Checks whether the given shared file storage directory is acceptable.
669
670   @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
671
672   """
673   CheckFileBasedStoragePathVsEnabledDiskTemplates(
674       logging_warn_fn, file_storage_dir, enabled_disk_templates,
675       constants.DT_SHARED_FILE)
676
677
678 class LUClusterSetParams(LogicalUnit):
679   """Change the parameters of the cluster.
680
681   """
682   HPATH = "cluster-modify"
683   HTYPE = constants.HTYPE_CLUSTER
684   REQ_BGL = False
685
686   def CheckArguments(self):
687     """Check parameters
688
689     """
690     if self.op.uid_pool:
691       uidpool.CheckUidPool(self.op.uid_pool)
692
693     if self.op.add_uids:
694       uidpool.CheckUidPool(self.op.add_uids)
695
696     if self.op.remove_uids:
697       uidpool.CheckUidPool(self.op.remove_uids)
698
699     if self.op.master_netmask is not None:
700       _ValidateNetmask(self.cfg, self.op.master_netmask)
701
702     if self.op.diskparams:
703       for dt_params in self.op.diskparams.values():
704         utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
705       try:
706         utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
707       except errors.OpPrereqError, err:
708         raise errors.OpPrereqError("While verify diskparams options: %s" % err,
709                                    errors.ECODE_INVAL)
710
711   def ExpandNames(self):
712     # FIXME: in the future maybe other cluster params won't require checking on
713     # all nodes to be modified.
714     # FIXME: This opcode changes cluster-wide settings. Is acquiring all
715     # resource locks the right thing, shouldn't it be the BGL instead?
716     self.needed_locks = {
717       locking.LEVEL_NODE: locking.ALL_SET,
718       locking.LEVEL_INSTANCE: locking.ALL_SET,
719       locking.LEVEL_NODEGROUP: locking.ALL_SET,
720       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
721     }
722     self.share_locks = ShareAll()
723
724   def BuildHooksEnv(self):
725     """Build hooks env.
726
727     """
728     return {
729       "OP_TARGET": self.cfg.GetClusterName(),
730       "NEW_VG_NAME": self.op.vg_name,
731       }
732
733   def BuildHooksNodes(self):
734     """Build hooks nodes.
735
736     """
737     mn = self.cfg.GetMasterNode()
738     return ([mn], [mn])
739
740   def _CheckVgName(self, node_uuids, enabled_disk_templates,
741                    new_enabled_disk_templates):
742     """Check the consistency of the vg name on all nodes and in case it gets
743        unset whether there are instances still using it.
744
745     """
746     lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
747     lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
748                                             new_enabled_disk_templates)
749     current_vg_name = self.cfg.GetVGName()
750
751     if self.op.vg_name == '':
752       if lvm_is_enabled:
753         raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
754                                    " disk templates are or get enabled.")
755
756     if self.op.vg_name is None:
757       if current_vg_name is None and lvm_is_enabled:
758         raise errors.OpPrereqError("Please specify a volume group when"
759                                    " enabling lvm-based disk-templates.")
760
761     if self.op.vg_name is not None and not self.op.vg_name:
762       if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
763         raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
764                                    " instances exist", errors.ECODE_INVAL)
765
766     if (self.op.vg_name is not None and lvm_is_enabled) or \
767         (self.cfg.GetVGName() is not None and lvm_gets_enabled):
768       self._CheckVgNameOnNodes(node_uuids)
769
770   def _CheckVgNameOnNodes(self, node_uuids):
771     """Check the status of the volume group on each node.
772
773     """
774     vglist = self.rpc.call_vg_list(node_uuids)
775     for node_uuid in node_uuids:
776       msg = vglist[node_uuid].fail_msg
777       if msg:
778         # ignoring down node
779         self.LogWarning("Error while gathering data on node %s"
780                         " (ignoring node): %s",
781                         self.cfg.GetNodeName(node_uuid), msg)
782         continue
783       vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
784                                             self.op.vg_name,
785                                             constants.MIN_VG_SIZE)
786       if vgstatus:
787         raise errors.OpPrereqError("Error on node '%s': %s" %
788                                    (self.cfg.GetNodeName(node_uuid), vgstatus),
789                                    errors.ECODE_ENVIRON)
790
791   @staticmethod
792   def _GetEnabledDiskTemplatesInner(op_enabled_disk_templates,
793                                     old_enabled_disk_templates):
794     """Determines the enabled disk templates and the subset of disk templates
795        that are newly enabled by this operation.
796
797     """
798     enabled_disk_templates = None
799     new_enabled_disk_templates = []
800     if op_enabled_disk_templates:
801       enabled_disk_templates = op_enabled_disk_templates
802       new_enabled_disk_templates = \
803         list(set(enabled_disk_templates)
804              - set(old_enabled_disk_templates))
805     else:
806       enabled_disk_templates = old_enabled_disk_templates
807     return (enabled_disk_templates, new_enabled_disk_templates)
808
809   def _GetEnabledDiskTemplates(self, cluster):
810     """Determines the enabled disk templates and the subset of disk templates
811        that are newly enabled by this operation.
812
813     """
814     return self._GetEnabledDiskTemplatesInner(self.op.enabled_disk_templates,
815                                               cluster.enabled_disk_templates)
816
817   def _CheckIpolicy(self, cluster, enabled_disk_templates):
818     """Checks the ipolicy.
819
820     @type cluster: C{objects.Cluster}
821     @param cluster: the cluster's configuration
822     @type enabled_disk_templates: list of string
823     @param enabled_disk_templates: list of (possibly newly) enabled disk
824       templates
825
826     """
827     # FIXME: write unit tests for this
828     if self.op.ipolicy:
829       self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
830                                            group_policy=False)
831
832       CheckIpolicyVsDiskTemplates(self.new_ipolicy,
833                                   enabled_disk_templates)
834
835       all_instances = self.cfg.GetAllInstancesInfo().values()
836       violations = set()
837       for group in self.cfg.GetAllNodeGroupsInfo().values():
838         instances = frozenset([inst for inst in all_instances
839                                if compat.any(nuuid in group.members
840                                              for nuuid in inst.all_nodes)])
841         new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
842         ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
843         new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
844                                            self.cfg)
845         if new:
846           violations.update(new)
847
848       if violations:
849         self.LogWarning("After the ipolicy change the following instances"
850                         " violate them: %s",
851                         utils.CommaJoin(utils.NiceSort(violations)))
852     else:
853       CheckIpolicyVsDiskTemplates(cluster.ipolicy,
854                                   enabled_disk_templates)
855
856   def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
857     """Checks whether the set DRBD helper actually exists on the nodes.
858
859     @type drbd_helper: string
860     @param drbd_helper: path of the drbd usermode helper binary
861     @type node_uuids: list of strings
862     @param node_uuids: list of node UUIDs to check for the helper
863
864     """
865     # checks given drbd helper on all nodes
866     helpers = self.rpc.call_drbd_helper(node_uuids)
867     for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
868       if ninfo.offline:
869         self.LogInfo("Not checking drbd helper on offline node %s",
870                      ninfo.name)
871         continue
872       msg = helpers[ninfo.uuid].fail_msg
873       if msg:
874         raise errors.OpPrereqError("Error checking drbd helper on node"
875                                    " '%s': %s" % (ninfo.name, msg),
876                                    errors.ECODE_ENVIRON)
877       node_helper = helpers[ninfo.uuid].payload
878       if node_helper != drbd_helper:
879         raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
880                                    (ninfo.name, node_helper),
881                                    errors.ECODE_ENVIRON)
882
883   def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
884     """Check the DRBD usermode helper.
885
886     @type node_uuids: list of strings
887     @param node_uuids: a list of nodes' UUIDs
888     @type drbd_enabled: boolean
889     @param drbd_enabled: whether DRBD will be enabled after this operation
890       (no matter if it was disabled before or not)
891     @type drbd_gets_enabled: boolen
892     @param drbd_gets_enabled: true if DRBD was disabled before this
893       operation, but will be enabled afterwards
894
895     """
896     if self.op.drbd_helper == '':
897       if drbd_enabled:
898         raise errors.OpPrereqError("Cannot disable drbd helper while"
899                                    " DRBD is enabled.")
900       if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
901         raise errors.OpPrereqError("Cannot disable drbd helper while"
902                                    " drbd-based instances exist",
903                                    errors.ECODE_INVAL)
904
905     else:
906       if self.op.drbd_helper is not None and drbd_enabled:
907         self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
908       else:
909         if drbd_gets_enabled:
910           current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
911           if current_drbd_helper is not None:
912             self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
913           else:
914             raise errors.OpPrereqError("Cannot enable DRBD without a"
915                                        " DRBD usermode helper set.")
916
917   def CheckPrereq(self):
918     """Check prerequisites.
919
920     This checks whether the given params don't conflict and
921     if the given volume group is valid.
922
923     """
924     node_uuids = self.owned_locks(locking.LEVEL_NODE)
925     self.cluster = cluster = self.cfg.GetClusterInfo()
926
927     vm_capable_node_uuids = [node.uuid
928                              for node in self.cfg.GetAllNodesInfo().values()
929                              if node.uuid in node_uuids and node.vm_capable]
930
931     (enabled_disk_templates, new_enabled_disk_templates) = \
932       self._GetEnabledDiskTemplates(cluster)
933
934     self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
935                       new_enabled_disk_templates)
936
937     if self.op.file_storage_dir is not None:
938       CheckFileStoragePathVsEnabledDiskTemplates(
939           self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
940
941     if self.op.shared_file_storage_dir is not None:
942       CheckSharedFileStoragePathVsEnabledDiskTemplates(
943           self.LogWarning, self.op.shared_file_storage_dir,
944           enabled_disk_templates)
945
946     drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
947     drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
948     self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)
949
950     # validate params changes
951     if self.op.beparams:
952       objects.UpgradeBeParams(self.op.beparams)
953       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
954       self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
955
956     if self.op.ndparams:
957       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
958       self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
959
960       # TODO: we need a more general way to handle resetting
961       # cluster-level parameters to default values
962       if self.new_ndparams["oob_program"] == "":
963         self.new_ndparams["oob_program"] = \
964             constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
965
966     if self.op.hv_state:
967       new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
968                                            self.cluster.hv_state_static)
969       self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
970                                for hv, values in new_hv_state.items())
971
972     if self.op.disk_state:
973       new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
974                                                self.cluster.disk_state_static)
975       self.new_disk_state = \
976         dict((storage, dict((name, cluster.SimpleFillDiskState(values))
977                             for name, values in svalues.items()))
978              for storage, svalues in new_disk_state.items())
979
980     self._CheckIpolicy(cluster, enabled_disk_templates)
981
982     if self.op.nicparams:
983       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
984       self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
985       objects.NIC.CheckParameterSyntax(self.new_nicparams)
986       nic_errors = []
987
988       # check all instances for consistency
989       for instance in self.cfg.GetAllInstancesInfo().values():
990         for nic_idx, nic in enumerate(instance.nics):
991           params_copy = copy.deepcopy(nic.nicparams)
992           params_filled = objects.FillDict(self.new_nicparams, params_copy)
993
994           # check parameter syntax
995           try:
996             objects.NIC.CheckParameterSyntax(params_filled)
997           except errors.ConfigurationError, err:
998             nic_errors.append("Instance %s, nic/%d: %s" %
999                               (instance.name, nic_idx, err))
1000
1001           # if we're moving instances to routed, check that they have an ip
1002           target_mode = params_filled[constants.NIC_MODE]
1003           if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
1004             nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
1005                               " address" % (instance.name, nic_idx))
1006       if nic_errors:
1007         raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
1008                                    "\n".join(nic_errors), errors.ECODE_INVAL)
1009
1010     # hypervisor list/parameters
1011     self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
1012     if self.op.hvparams:
1013       for hv_name, hv_dict in self.op.hvparams.items():
1014         if hv_name not in self.new_hvparams:
1015           self.new_hvparams[hv_name] = hv_dict
1016         else:
1017           self.new_hvparams[hv_name].update(hv_dict)
1018
1019     # disk template parameters
1020     self.new_diskparams = objects.FillDict(cluster.diskparams, {})
1021     if self.op.diskparams:
1022       for dt_name, dt_params in self.op.diskparams.items():
1023         if dt_name not in self.new_diskparams:
1024           self.new_diskparams[dt_name] = dt_params
1025         else:
1026           self.new_diskparams[dt_name].update(dt_params)
1027
1028     # os hypervisor parameters
1029     self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
1030     if self.op.os_hvp:
1031       for os_name, hvs in self.op.os_hvp.items():
1032         if os_name not in self.new_os_hvp:
1033           self.new_os_hvp[os_name] = hvs
1034         else:
1035           for hv_name, hv_dict in hvs.items():
1036             if hv_dict is None:
1037               # Delete if it exists
1038               self.new_os_hvp[os_name].pop(hv_name, None)
1039             elif hv_name not in self.new_os_hvp[os_name]:
1040               self.new_os_hvp[os_name][hv_name] = hv_dict
1041             else:
1042               self.new_os_hvp[os_name][hv_name].update(hv_dict)
1043
1044     # os parameters
1045     self.new_osp = objects.FillDict(cluster.osparams, {})
1046     if self.op.osparams:
1047       for os_name, osp in self.op.osparams.items():
1048         if os_name not in self.new_osp:
1049           self.new_osp[os_name] = {}
1050
1051         self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
1052                                                  use_none=True)
1053
1054         if not self.new_osp[os_name]:
1055           # we removed all parameters
1056           del self.new_osp[os_name]
1057         else:
1058           # check the parameter validity (remote check)
1059           CheckOSParams(self, False, [self.cfg.GetMasterNode()],
1060                         os_name, self.new_osp[os_name])
1061
1062     # changes to the hypervisor list
1063     if self.op.enabled_hypervisors is not None:
1064       self.hv_list = self.op.enabled_hypervisors
1065       for hv in self.hv_list:
1066         # if the hypervisor doesn't already exist in the cluster
1067         # hvparams, we initialize it to empty, and then (in both
1068         # cases) we make sure to fill the defaults, as we might not
1069         # have a complete defaults list if the hypervisor wasn't
1070         # enabled before
1071         if hv not in new_hvp:
1072           new_hvp[hv] = {}
1073         new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1074         utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1075     else:
1076       self.hv_list = cluster.enabled_hypervisors
1077
1078     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1079       # either the enabled list has changed, or the parameters have, validate
1080       for hv_name, hv_params in self.new_hvparams.items():
1081         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1082             (self.op.enabled_hypervisors and
1083              hv_name in self.op.enabled_hypervisors)):
1084           # either this is a new hypervisor, or its parameters have changed
1085           hv_class = hypervisor.GetHypervisorClass(hv_name)
1086           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1087           hv_class.CheckParameterSyntax(hv_params)
1088           CheckHVParams(self, node_uuids, hv_name, hv_params)
1089
1090     self._CheckDiskTemplateConsistency()
1091
1092     if self.op.os_hvp:
1093       # no need to check any newly-enabled hypervisors, since the
1094       # defaults have already been checked in the above code-block
1095       for os_name, os_hvp in self.new_os_hvp.items():
1096         for hv_name, hv_params in os_hvp.items():
1097           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1098           # we need to fill in the new os_hvp on top of the actual hv_p
1099           cluster_defaults = self.new_hvparams.get(hv_name, {})
1100           new_osp = objects.FillDict(cluster_defaults, hv_params)
1101           hv_class = hypervisor.GetHypervisorClass(hv_name)
1102           hv_class.CheckParameterSyntax(new_osp)
1103           CheckHVParams(self, node_uuids, hv_name, new_osp)
1104
1105     if self.op.default_iallocator:
1106       alloc_script = utils.FindFile(self.op.default_iallocator,
1107                                     constants.IALLOCATOR_SEARCH_PATH,
1108                                     os.path.isfile)
1109       if alloc_script is None:
1110         raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1111                                    " specified" % self.op.default_iallocator,
1112                                    errors.ECODE_INVAL)
1113
1114   def _CheckDiskTemplateConsistency(self):
1115     """Check whether the disk templates that are going to be disabled
1116        are still in use by some instances.
1117
1118     """
1119     if self.op.enabled_disk_templates:
1120       cluster = self.cfg.GetClusterInfo()
1121       instances = self.cfg.GetAllInstancesInfo()
1122
1123       disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1124         - set(self.op.enabled_disk_templates)
1125       for instance in instances.itervalues():
1126         if instance.disk_template in disk_templates_to_remove:
1127           raise errors.OpPrereqError("Cannot disable disk template '%s',"
1128                                      " because instance '%s' is using it." %
1129                                      (instance.disk_template, instance.name))
1130
1131   def _SetVgName(self, feedback_fn):
1132     """Determines and sets the new volume group name.
1133
1134     """
1135     if self.op.vg_name is not None:
1136       new_volume = self.op.vg_name
1137       if not new_volume:
1138         new_volume = None
1139       if new_volume != self.cfg.GetVGName():
1140         self.cfg.SetVGName(new_volume)
1141       else:
1142         feedback_fn("Cluster LVM configuration already in desired"
1143                     " state, not changing")
1144
1145   def _SetFileStorageDir(self, feedback_fn):
1146     """Set the file storage directory.
1147
1148     """
1149     if self.op.file_storage_dir is not None:
1150       if self.cluster.file_storage_dir == self.op.file_storage_dir:
1151         feedback_fn("Global file storage dir already set to value '%s'"
1152                     % self.cluster.file_storage_dir)
1153       else:
1154         self.cluster.file_storage_dir = self.op.file_storage_dir
1155
1156   def _SetDrbdHelper(self, feedback_fn):
1157     """Set the DRBD usermode helper.
1158
1159     """
1160     if self.op.drbd_helper is not None:
1161       if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1162         feedback_fn("Note that you specified a drbd user helper, but did not"
1163                     " enable the drbd disk template.")
1164       new_helper = self.op.drbd_helper
1165       if not new_helper:
1166         new_helper = None
1167       if new_helper != self.cfg.GetDRBDHelper():
1168         self.cfg.SetDRBDHelper(new_helper)
1169       else:
1170         feedback_fn("Cluster DRBD helper already in desired state,"
1171                     " not changing")
1172
1173   def Exec(self, feedback_fn):
1174     """Change the parameters of the cluster.
1175
1176     """
1177     if self.op.enabled_disk_templates:
1178       self.cluster.enabled_disk_templates = \
1179         list(set(self.op.enabled_disk_templates))
1180
1181     self._SetVgName(feedback_fn)
1182     self._SetFileStorageDir(feedback_fn)
1183     self._SetDrbdHelper(feedback_fn)
1184
1185     if self.op.hvparams:
1186       self.cluster.hvparams = self.new_hvparams
1187     if self.op.os_hvp:
1188       self.cluster.os_hvp = self.new_os_hvp
1189     if self.op.enabled_hypervisors is not None:
1190       self.cluster.hvparams = self.new_hvparams
1191       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1192     if self.op.beparams:
1193       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1194     if self.op.nicparams:
1195       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1196     if self.op.ipolicy:
1197       self.cluster.ipolicy = self.new_ipolicy
1198     if self.op.osparams:
1199       self.cluster.osparams = self.new_osp
1200     if self.op.ndparams:
1201       self.cluster.ndparams = self.new_ndparams
1202     if self.op.diskparams:
1203       self.cluster.diskparams = self.new_diskparams
1204     if self.op.hv_state:
1205       self.cluster.hv_state_static = self.new_hv_state
1206     if self.op.disk_state:
1207       self.cluster.disk_state_static = self.new_disk_state
1208
1209     if self.op.candidate_pool_size is not None:
1210       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1211       # we need to update the pool size here, otherwise the save will fail
1212       AdjustCandidatePool(self, [])
1213
1214     if self.op.maintain_node_health is not None:
1215       if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1216         feedback_fn("Note: CONFD was disabled at build time, node health"
1217                     " maintenance is not useful (still enabling it)")
1218       self.cluster.maintain_node_health = self.op.maintain_node_health
1219
1220     if self.op.modify_etc_hosts is not None:
1221       self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1222
1223     if self.op.prealloc_wipe_disks is not None:
1224       self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1225
1226     if self.op.add_uids is not None:
1227       uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1228
1229     if self.op.remove_uids is not None:
1230       uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1231
1232     if self.op.uid_pool is not None:
1233       self.cluster.uid_pool = self.op.uid_pool
1234
1235     if self.op.default_iallocator is not None:
1236       self.cluster.default_iallocator = self.op.default_iallocator
1237
1238     if self.op.reserved_lvs is not None:
1239       self.cluster.reserved_lvs = self.op.reserved_lvs
1240
1241     if self.op.use_external_mip_script is not None:
1242       self.cluster.use_external_mip_script = self.op.use_external_mip_script
1243
1244     def helper_os(aname, mods, desc):
1245       desc += " OS list"
1246       lst = getattr(self.cluster, aname)
1247       for key, val in mods:
1248         if key == constants.DDM_ADD:
1249           if val in lst:
1250             feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1251           else:
1252             lst.append(val)
1253         elif key == constants.DDM_REMOVE:
1254           if val in lst:
1255             lst.remove(val)
1256           else:
1257             feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1258         else:
1259           raise errors.ProgrammerError("Invalid modification '%s'" % key)
1260
1261     if self.op.hidden_os:
1262       helper_os("hidden_os", self.op.hidden_os, "hidden")
1263
1264     if self.op.blacklisted_os:
1265       helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1266
1267     if self.op.master_netdev:
1268       master_params = self.cfg.GetMasterNetworkParameters()
1269       ems = self.cfg.GetUseExternalMipScript()
1270       feedback_fn("Shutting down master ip on the current netdev (%s)" %
1271                   self.cluster.master_netdev)
1272       result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1273                                                        master_params, ems)
1274       if not self.op.force:
1275         result.Raise("Could not disable the master ip")
1276       else:
1277         if result.fail_msg:
1278           msg = ("Could not disable the master ip (continuing anyway): %s" %
1279                  result.fail_msg)
1280           feedback_fn(msg)
1281       feedback_fn("Changing master_netdev from %s to %s" %
1282                   (master_params.netdev, self.op.master_netdev))
1283       self.cluster.master_netdev = self.op.master_netdev
1284
1285     if self.op.master_netmask:
1286       master_params = self.cfg.GetMasterNetworkParameters()
1287       feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1288       result = self.rpc.call_node_change_master_netmask(
1289                  master_params.uuid, master_params.netmask,
1290                  self.op.master_netmask, master_params.ip,
1291                  master_params.netdev)
1292       result.Warn("Could not change the master IP netmask", feedback_fn)
1293       self.cluster.master_netmask = self.op.master_netmask
1294
1295     self.cfg.Update(self.cluster, feedback_fn)
1296
1297     if self.op.master_netdev:
1298       master_params = self.cfg.GetMasterNetworkParameters()
1299       feedback_fn("Starting the master ip on the new master netdev (%s)" %
1300                   self.op.master_netdev)
1301       ems = self.cfg.GetUseExternalMipScript()
1302       result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1303                                                      master_params, ems)
1304       result.Warn("Could not re-enable the master ip on the master,"
1305                   " please restart manually", self.LogWarning)
1306
1307
1308 class LUClusterVerify(NoHooksLU):
1309   """Submits all jobs necessary to verify the cluster.
1310
1311   """
1312   REQ_BGL = False
1313
1314   def ExpandNames(self):
1315     self.needed_locks = {}
1316
1317   def Exec(self, feedback_fn):
1318     jobs = []
1319
1320     if self.op.group_name:
1321       groups = [self.op.group_name]
1322       depends_fn = lambda: None
1323     else:
1324       groups = self.cfg.GetNodeGroupList()
1325
1326       # Verify global configuration
1327       jobs.append([
1328         opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1329         ])
1330
1331       # Always depend on global verification
1332       depends_fn = lambda: [(-len(jobs), [])]
1333
1334     jobs.extend(
1335       [opcodes.OpClusterVerifyGroup(group_name=group,
1336                                     ignore_errors=self.op.ignore_errors,
1337                                     depends=depends_fn())]
1338       for group in groups)
1339
1340     # Fix up all parameters
1341     for op in itertools.chain(*jobs): # pylint: disable=W0142
1342       op.debug_simulate_errors = self.op.debug_simulate_errors
1343       op.verbose = self.op.verbose
1344       op.error_codes = self.op.error_codes
1345       try:
1346         op.skip_checks = self.op.skip_checks
1347       except AttributeError:
1348         assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1349
1350     return ResultWithJobs(jobs)
1351
1352
1353 class _VerifyErrors(object):
1354   """Mix-in for cluster/group verify LUs.
1355
1356   It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1357   self.op and self._feedback_fn to be available.)
1358
1359   """
1360
1361   ETYPE_FIELD = "code"
1362   ETYPE_ERROR = "ERROR"
1363   ETYPE_WARNING = "WARNING"
1364
1365   def _Error(self, ecode, item, msg, *args, **kwargs):
1366     """Format an error message.
1367
1368     Based on the opcode's error_codes parameter, either format a
1369     parseable error code, or a simpler error string.
1370
1371     This must be called only from Exec and functions called from Exec.
1372
1373     """
1374     ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1375     itype, etxt, _ = ecode
1376     # If the error code is in the list of ignored errors, demote the error to a
1377     # warning
1378     if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1379       ltype = self.ETYPE_WARNING
1380     # first complete the msg
1381     if args:
1382       msg = msg % args
1383     # then format the whole message
1384     if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1385       msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1386     else:
1387       if item:
1388         item = " " + item
1389       else:
1390         item = ""
1391       msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1392     # and finally report it via the feedback_fn
1393     self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1394     # do not mark the operation as failed for WARN cases only
1395     if ltype == self.ETYPE_ERROR:
1396       self.bad = True
1397
1398   def _ErrorIf(self, cond, *args, **kwargs):
1399     """Log an error message if the passed condition is True.
1400
1401     """
1402     if (bool(cond)
1403         or self.op.debug_simulate_errors): # pylint: disable=E1101
1404       self._Error(*args, **kwargs)
1405
1406
1407 def _VerifyCertificate(filename):
1408   """Verifies a certificate for L{LUClusterVerifyConfig}.
1409
1410   @type filename: string
1411   @param filename: Path to PEM file
1412
1413   """
1414   try:
1415     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1416                                            utils.ReadFile(filename))
1417   except Exception, err: # pylint: disable=W0703
1418     return (LUClusterVerifyConfig.ETYPE_ERROR,
1419             "Failed to load X509 certificate %s: %s" % (filename, err))
1420
1421   (errcode, msg) = \
1422     utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1423                                 constants.SSL_CERT_EXPIRATION_ERROR)
1424
1425   if msg:
1426     fnamemsg = "While verifying %s: %s" % (filename, msg)
1427   else:
1428     fnamemsg = None
1429
1430   if errcode is None:
1431     return (None, fnamemsg)
1432   elif errcode == utils.CERT_WARNING:
1433     return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1434   elif errcode == utils.CERT_ERROR:
1435     return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1436
1437   raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1438
1439
1440 def _GetAllHypervisorParameters(cluster, instances):
1441   """Compute the set of all hypervisor parameters.
1442
1443   @type cluster: L{objects.Cluster}
1444   @param cluster: the cluster object
1445   @param instances: list of L{objects.Instance}
1446   @param instances: additional instances from which to obtain parameters
1447   @rtype: list of (origin, hypervisor, parameters)
1448   @return: a list with all parameters found, indicating the hypervisor they
1449        apply to, and the origin (can be "cluster", "os X", or "instance Y")
1450
1451   """
1452   hvp_data = []
1453
1454   for hv_name in cluster.enabled_hypervisors:
1455     hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1456
1457   for os_name, os_hvp in cluster.os_hvp.items():
1458     for hv_name, hv_params in os_hvp.items():
1459       if hv_params:
1460         full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1461         hvp_data.append(("os %s" % os_name, hv_name, full_params))
1462
1463   # TODO: collapse identical parameter values in a single one
1464   for instance in instances:
1465     if instance.hvparams:
1466       hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1467                        cluster.FillHV(instance)))
1468
1469   return hvp_data
1470
1471
1472 class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1473   """Verifies the cluster config.
1474
1475   """
1476   REQ_BGL = False
1477
1478   def _VerifyHVP(self, hvp_data):
1479     """Verifies locally the syntax of the hypervisor parameters.
1480
1481     """
1482     for item, hv_name, hv_params in hvp_data:
1483       msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1484              (item, hv_name))
1485       try:
1486         hv_class = hypervisor.GetHypervisorClass(hv_name)
1487         utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1488         hv_class.CheckParameterSyntax(hv_params)
1489       except errors.GenericError, err:
1490         self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1491
1492   def ExpandNames(self):
1493     self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1494     self.share_locks = ShareAll()
1495
1496   def CheckPrereq(self):
1497     """Check prerequisites.
1498
1499     """
1500     # Retrieve all information
1501     self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1502     self.all_node_info = self.cfg.GetAllNodesInfo()
1503     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1504
1505   def Exec(self, feedback_fn):
1506     """Verify integrity of cluster, performing various test on nodes.
1507
1508     """
1509     self.bad = False
1510     self._feedback_fn = feedback_fn
1511
1512     feedback_fn("* Verifying cluster config")
1513
1514     for msg in self.cfg.VerifyConfig():
1515       self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1516
1517     feedback_fn("* Verifying cluster certificate files")
1518
1519     for cert_filename in pathutils.ALL_CERT_FILES:
1520       (errcode, msg) = _VerifyCertificate(cert_filename)
1521       self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1522
1523     self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
1524                                     pathutils.NODED_CERT_FILE),
1525                   constants.CV_ECLUSTERCERT,
1526                   None,
1527                   pathutils.NODED_CERT_FILE + " must be accessible by the " +
1528                     constants.LUXID_USER + " user")
1529
1530     feedback_fn("* Verifying hypervisor parameters")
1531
1532     self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1533                                                 self.all_inst_info.values()))
1534
1535     feedback_fn("* Verifying all nodes belong to an existing group")
1536
1537     # We do this verification here because, should this bogus circumstance
1538     # occur, it would never be caught by VerifyGroup, which only acts on
1539     # nodes/instances reachable from existing node groups.
1540
1541     dangling_nodes = set(node for node in self.all_node_info.values()
1542                          if node.group not in self.all_group_info)
1543
1544     dangling_instances = {}
1545     no_node_instances = []
1546
1547     for inst in self.all_inst_info.values():
1548       if inst.primary_node in [node.uuid for node in dangling_nodes]:
1549         dangling_instances.setdefault(inst.primary_node, []).append(inst)
1550       elif inst.primary_node not in self.all_node_info:
1551         no_node_instances.append(inst)
1552
1553     pretty_dangling = [
1554         "%s (%s)" %
1555         (node.name,
1556          utils.CommaJoin(inst.name for
1557                          inst in dangling_instances.get(node.uuid, [])))
1558         for node in dangling_nodes]
1559
1560     self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1561                   None,
1562                   "the following nodes (and their instances) belong to a non"
1563                   " existing group: %s", utils.CommaJoin(pretty_dangling))
1564
1565     self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1566                   None,
1567                   "the following instances have a non-existing primary-node:"
1568                   " %s", utils.CommaJoin(inst.name for
1569                                          inst in no_node_instances))
1570
1571     return not self.bad
1572
1573
1574 class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1575   """Verifies the status of a node group.
1576
1577   """
1578   HPATH = "cluster-verify"
1579   HTYPE = constants.HTYPE_CLUSTER
1580   REQ_BGL = False
1581
1582   _HOOKS_INDENT_RE = re.compile("^", re.M)
1583
1584   class NodeImage(object):
1585     """A class representing the logical and physical status of a node.
1586
1587     @type uuid: string
1588     @ivar uuid: the node UUID to which this object refers
1589     @ivar volumes: a structure as returned from
1590         L{ganeti.backend.GetVolumeList} (runtime)
1591     @ivar instances: a list of running instances (runtime)
1592     @ivar pinst: list of configured primary instances (config)
1593     @ivar sinst: list of configured secondary instances (config)
1594     @ivar sbp: dictionary of {primary-node: list of instances} for all
1595         instances for which this node is secondary (config)
1596     @ivar mfree: free memory, as reported by hypervisor (runtime)
1597     @ivar dfree: free disk, as reported by the node (runtime)
1598     @ivar offline: the offline status (config)
1599     @type rpc_fail: boolean
1600     @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1601         not whether the individual keys were correct) (runtime)
1602     @type lvm_fail: boolean
1603     @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1604     @type hyp_fail: boolean
1605     @ivar hyp_fail: whether the RPC call didn't return the instance list
1606     @type ghost: boolean
1607     @ivar ghost: whether this is a known node or not (config)
1608     @type os_fail: boolean
1609     @ivar os_fail: whether the RPC call didn't return valid OS data
1610     @type oslist: list
1611     @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1612     @type vm_capable: boolean
1613     @ivar vm_capable: whether the node can host instances
1614     @type pv_min: float
1615     @ivar pv_min: size in MiB of the smallest PVs
1616     @type pv_max: float
1617     @ivar pv_max: size in MiB of the biggest PVs
1618
1619     """
1620     def __init__(self, offline=False, uuid=None, vm_capable=True):
1621       self.uuid = uuid
1622       self.volumes = {}
1623       self.instances = []
1624       self.pinst = []
1625       self.sinst = []
1626       self.sbp = {}
1627       self.mfree = 0
1628       self.dfree = 0
1629       self.offline = offline
1630       self.vm_capable = vm_capable
1631       self.rpc_fail = False
1632       self.lvm_fail = False
1633       self.hyp_fail = False
1634       self.ghost = False
1635       self.os_fail = False
1636       self.oslist = {}
1637       self.pv_min = None
1638       self.pv_max = None
1639
1640   def ExpandNames(self):
1641     # This raises errors.OpPrereqError on its own:
1642     self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1643
1644     # Get instances in node group; this is unsafe and needs verification later
1645     inst_uuids = \
1646       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1647
1648     self.needed_locks = {
1649       locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1650       locking.LEVEL_NODEGROUP: [self.group_uuid],
1651       locking.LEVEL_NODE: [],
1652
1653       # This opcode is run by watcher every five minutes and acquires all nodes
1654       # for a group. It doesn't run for a long time, so it's better to acquire
1655       # the node allocation lock as well.
1656       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1657       }
1658
1659     self.share_locks = ShareAll()
1660
1661   def DeclareLocks(self, level):
1662     if level == locking.LEVEL_NODE:
1663       # Get members of node group; this is unsafe and needs verification later
1664       nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1665
1666       # In Exec(), we warn about mirrored instances that have primary and
1667       # secondary living in separate node groups. To fully verify that
1668       # volumes for these instances are healthy, we will need to do an
1669       # extra call to their secondaries. We ensure here those nodes will
1670       # be locked.
1671       for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1672         # Important: access only the instances whose lock is owned
1673         instance = self.cfg.GetInstanceInfoByName(inst_name)
1674         if instance.disk_template in constants.DTS_INT_MIRROR:
1675           nodes.update(instance.secondary_nodes)
1676
1677       self.needed_locks[locking.LEVEL_NODE] = nodes
1678
1679   def CheckPrereq(self):
1680     assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1681     self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1682
1683     group_node_uuids = set(self.group_info.members)
1684     group_inst_uuids = \
1685       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1686
1687     unlocked_node_uuids = \
1688         group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1689
1690     unlocked_inst_uuids = \
1691         group_inst_uuids.difference(
1692           [self.cfg.GetInstanceInfoByName(name).uuid
1693            for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1694
1695     if unlocked_node_uuids:
1696       raise errors.OpPrereqError(
1697         "Missing lock for nodes: %s" %
1698         utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1699         errors.ECODE_STATE)
1700
1701     if unlocked_inst_uuids:
1702       raise errors.OpPrereqError(
1703         "Missing lock for instances: %s" %
1704         utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1705         errors.ECODE_STATE)
1706
1707     self.all_node_info = self.cfg.GetAllNodesInfo()
1708     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1709
1710     self.my_node_uuids = group_node_uuids
1711     self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1712                              for node_uuid in group_node_uuids)
1713
1714     self.my_inst_uuids = group_inst_uuids
1715     self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1716                              for inst_uuid in group_inst_uuids)
1717
1718     # We detect here the nodes that will need the extra RPC calls for verifying
1719     # split LV volumes; they should be locked.
1720     extra_lv_nodes = set()
1721
1722     for inst in self.my_inst_info.values():
1723       if inst.disk_template in constants.DTS_INT_MIRROR:
1724         for nuuid in inst.all_nodes:
1725           if self.all_node_info[nuuid].group != self.group_uuid:
1726             extra_lv_nodes.add(nuuid)
1727
1728     unlocked_lv_nodes = \
1729         extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1730
1731     if unlocked_lv_nodes:
1732       raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1733                                  utils.CommaJoin(unlocked_lv_nodes),
1734                                  errors.ECODE_STATE)
1735     self.extra_lv_nodes = list(extra_lv_nodes)
1736
1737   def _VerifyNode(self, ninfo, nresult):
1738     """Perform some basic validation on data returned from a node.
1739
1740       - check the result data structure is well formed and has all the
1741         mandatory fields
1742       - check ganeti version
1743
1744     @type ninfo: L{objects.Node}
1745     @param ninfo: the node to check
1746     @param nresult: the results from the node
1747     @rtype: boolean
1748     @return: whether overall this call was successful (and we can expect
1749          reasonable values in the respose)
1750
1751     """
1752     # main result, nresult should be a non-empty dict
1753     test = not nresult or not isinstance(nresult, dict)
1754     self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1755                   "unable to verify node: no data returned")
1756     if test:
1757       return False
1758
1759     # compares ganeti version
1760     local_version = constants.PROTOCOL_VERSION
1761     remote_version = nresult.get("version", None)
1762     test = not (remote_version and
1763                 isinstance(remote_version, (list, tuple)) and
1764                 len(remote_version) == 2)
1765     self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1766                   "connection to node returned invalid data")
1767     if test:
1768       return False
1769
1770     test = local_version != remote_version[0]
1771     self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1772                   "incompatible protocol versions: master %s,"
1773                   " node %s", local_version, remote_version[0])
1774     if test:
1775       return False
1776
1777     # node seems compatible, we can actually try to look into its results
1778
1779     # full package version
1780     self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1781                   constants.CV_ENODEVERSION, ninfo.name,
1782                   "software version mismatch: master %s, node %s",
1783                   constants.RELEASE_VERSION, remote_version[1],
1784                   code=self.ETYPE_WARNING)
1785
1786     hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1787     if ninfo.vm_capable and isinstance(hyp_result, dict):
1788       for hv_name, hv_result in hyp_result.iteritems():
1789         test = hv_result is not None
1790         self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1791                       "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1792
1793     hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1794     if ninfo.vm_capable and isinstance(hvp_result, list):
1795       for item, hv_name, hv_result in hvp_result:
1796         self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1797                       "hypervisor %s parameter verify failure (source %s): %s",
1798                       hv_name, item, hv_result)
1799
1800     test = nresult.get(constants.NV_NODESETUP,
1801                        ["Missing NODESETUP results"])
1802     self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1803                   "node setup error: %s", "; ".join(test))
1804
1805     return True
1806
1807   def _VerifyNodeTime(self, ninfo, nresult,
1808                       nvinfo_starttime, nvinfo_endtime):
1809     """Check the node time.
1810
1811     @type ninfo: L{objects.Node}
1812     @param ninfo: the node to check
1813     @param nresult: the remote results for the node
1814     @param nvinfo_starttime: the start time of the RPC call
1815     @param nvinfo_endtime: the end time of the RPC call
1816
1817     """
1818     ntime = nresult.get(constants.NV_TIME, None)
1819     try:
1820       ntime_merged = utils.MergeTime(ntime)
1821     except (ValueError, TypeError):
1822       self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1823                     "Node returned invalid time")
1824       return
1825
1826     if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1827       ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1828     elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1829       ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1830     else:
1831       ntime_diff = None
1832
1833     self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1834                   "Node time diverges by at least %s from master node time",
1835                   ntime_diff)
1836
1837   def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1838     """Check the node LVM results and update info for cross-node checks.
1839
1840     @type ninfo: L{objects.Node}
1841     @param ninfo: the node to check
1842     @param nresult: the remote results for the node
1843     @param vg_name: the configured VG name
1844     @type nimg: L{NodeImage}
1845     @param nimg: node image
1846
1847     """
1848     if vg_name is None:
1849       return
1850
1851     # checks vg existence and size > 20G
1852     vglist = nresult.get(constants.NV_VGLIST, None)
1853     test = not vglist
1854     self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1855                   "unable to check volume groups")
1856     if not test:
1857       vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1858                                             constants.MIN_VG_SIZE)
1859       self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1860
1861     # Check PVs
1862     (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1863     for em in errmsgs:
1864       self._Error(constants.CV_ENODELVM, ninfo.name, em)
1865     if pvminmax is not None:
1866       (nimg.pv_min, nimg.pv_max) = pvminmax
1867
1868   def _VerifyGroupDRBDVersion(self, node_verify_infos):
1869     """Check cross-node DRBD version consistency.
1870
1871     @type node_verify_infos: dict
1872     @param node_verify_infos: infos about nodes as returned from the
1873       node_verify call.
1874
1875     """
1876     node_versions = {}
1877     for node_uuid, ndata in node_verify_infos.items():
1878       nresult = ndata.payload
1879       version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1880       node_versions[node_uuid] = version
1881
1882     if len(set(node_versions.values())) > 1:
1883       for node_uuid, version in sorted(node_versions.items()):
1884         msg = "DRBD version mismatch: %s" % version
1885         self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1886                     code=self.ETYPE_WARNING)
1887
1888   def _VerifyGroupLVM(self, node_image, vg_name):
1889     """Check cross-node consistency in LVM.
1890
1891     @type node_image: dict
1892     @param node_image: info about nodes, mapping from node to names to
1893       L{NodeImage} objects
1894     @param vg_name: the configured VG name
1895
1896     """
1897     if vg_name is None:
1898       return
1899
1900     # Only exclusive storage needs this kind of checks
1901     if not self._exclusive_storage:
1902       return
1903
1904     # exclusive_storage wants all PVs to have the same size (approximately),
1905     # if the smallest and the biggest ones are okay, everything is fine.
1906     # pv_min is None iff pv_max is None
1907     vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1908     if not vals:
1909       return
1910     (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1911     (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1912     bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1913     self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1914                   "PV sizes differ too much in the group; smallest (%s MB) is"
1915                   " on %s, biggest (%s MB) is on %s",
1916                   pvmin, self.cfg.GetNodeName(minnode_uuid),
1917                   pvmax, self.cfg.GetNodeName(maxnode_uuid))
1918
1919   def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1920     """Check the node bridges.
1921
1922     @type ninfo: L{objects.Node}
1923     @param ninfo: the node to check
1924     @param nresult: the remote results for the node
1925     @param bridges: the expected list of bridges
1926
1927     """
1928     if not bridges:
1929       return
1930
1931     missing = nresult.get(constants.NV_BRIDGES, None)
1932     test = not isinstance(missing, list)
1933     self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1934                   "did not return valid bridge information")
1935     if not test:
1936       self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1937                     "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1938
1939   def _VerifyNodeUserScripts(self, ninfo, nresult):
1940     """Check the results of user scripts presence and executability on the node
1941
1942     @type ninfo: L{objects.Node}
1943     @param ninfo: the node to check
1944     @param nresult: the remote results for the node
1945
1946     """
1947     test = not constants.NV_USERSCRIPTS in nresult
1948     self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1949                   "did not return user scripts information")
1950
1951     broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1952     if not test:
1953       self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1954                     "user scripts not present or not executable: %s" %
1955                     utils.CommaJoin(sorted(broken_scripts)))
1956
1957   def _VerifyNodeNetwork(self, ninfo, nresult):
1958     """Check the node network connectivity results.
1959
1960     @type ninfo: L{objects.Node}
1961     @param ninfo: the node to check
1962     @param nresult: the remote results for the node
1963
1964     """
1965     test = constants.NV_NODELIST not in nresult
1966     self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
1967                   "node hasn't returned node ssh connectivity data")
1968     if not test:
1969       if nresult[constants.NV_NODELIST]:
1970         for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1971           self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
1972                         "ssh communication with node '%s': %s", a_node, a_msg)
1973
1974     test = constants.NV_NODENETTEST not in nresult
1975     self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1976                   "node hasn't returned node tcp connectivity data")
1977     if not test:
1978       if nresult[constants.NV_NODENETTEST]:
1979         nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1980         for anode in nlist:
1981           self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
1982                         "tcp communication with node '%s': %s",
1983                         anode, nresult[constants.NV_NODENETTEST][anode])
1984
1985     test = constants.NV_MASTERIP not in nresult
1986     self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1987                   "node hasn't returned node master IP reachability data")
1988     if not test:
1989       if not nresult[constants.NV_MASTERIP]:
1990         if ninfo.uuid == self.master_node:
1991           msg = "the master node cannot reach the master IP (not configured?)"
1992         else:
1993           msg = "cannot reach the master IP"
1994         self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
1995
1996   def _VerifyInstance(self, instance, node_image, diskstatus):
1997     """Verify an instance.
1998
1999     This function checks to see if the required block devices are
2000     available on the instance's node, and that the nodes are in the correct
2001     state.
2002
2003     """
2004     pnode_uuid = instance.primary_node
2005     pnode_img = node_image[pnode_uuid]
2006     groupinfo = self.cfg.GetAllNodeGroupsInfo()
2007
2008     node_vol_should = {}
2009     instance.MapLVsByNode(node_vol_should)
2010
2011     cluster = self.cfg.GetClusterInfo()
2012     ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2013                                                             self.group_info)
2014     err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
2015     self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
2016                   utils.CommaJoin(err), code=self.ETYPE_WARNING)
2017
2018     for node_uuid in node_vol_should:
2019       n_img = node_image[node_uuid]
2020       if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2021         # ignore missing volumes on offline or broken nodes
2022         continue
2023       for volume in node_vol_should[node_uuid]:
2024         test = volume not in n_img.volumes
2025         self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
2026                       "volume %s missing on node %s", volume,
2027                       self.cfg.GetNodeName(node_uuid))
2028
2029     if instance.admin_state == constants.ADMINST_UP:
2030       test = instance.uuid not in pnode_img.instances and not pnode_img.offline
2031       self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
2032                     "instance not running on its primary node %s",
2033                      self.cfg.GetNodeName(pnode_uuid))
2034       self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
2035                     instance.name, "instance is marked as running and lives on"
2036                     " offline node %s", self.cfg.GetNodeName(pnode_uuid))
2037
2038     diskdata = [(nname, success, status, idx)
2039                 for (nname, disks) in diskstatus.items()
2040                 for idx, (success, status) in enumerate(disks)]
2041
2042     for nname, success, bdev_status, idx in diskdata:
2043       # the 'ghost node' construction in Exec() ensures that we have a
2044       # node here
2045       snode = node_image[nname]
2046       bad_snode = snode.ghost or snode.offline
2047       self._ErrorIf(instance.disks_active and
2048                     not success and not bad_snode,
2049                     constants.CV_EINSTANCEFAULTYDISK, instance.name,
2050                     "couldn't retrieve status for disk/%s on %s: %s",
2051                     idx, self.cfg.GetNodeName(nname), bdev_status)
2052
2053       if instance.disks_active and success and \
2054          (bdev_status.is_degraded or
2055           bdev_status.ldisk_status != constants.LDS_OKAY):
2056         msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2057         if bdev_status.is_degraded:
2058           msg += " is degraded"
2059         if bdev_status.ldisk_status != constants.LDS_OKAY:
2060           msg += "; state is '%s'" % \
2061                  constants.LDS_NAMES[bdev_status.ldisk_status]
2062
2063         self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2064
2065     self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2066                   constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2067                   "instance %s, connection to primary node failed",
2068                   instance.name)
2069
2070     self._ErrorIf(len(instance.secondary_nodes) > 1,
2071                   constants.CV_EINSTANCELAYOUT, instance.name,
2072                   "instance has multiple secondary nodes: %s",
2073                   utils.CommaJoin(instance.secondary_nodes),
2074                   code=self.ETYPE_WARNING)
2075
2076     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2077     if any(es_flags.values()):
2078       if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2079         # Disk template not compatible with exclusive_storage: no instance
2080         # node should have the flag set
2081         es_nodes = [n
2082                     for (n, es) in es_flags.items()
2083                     if es]
2084         self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2085                     "instance has template %s, which is not supported on nodes"
2086                     " that have exclusive storage set: %s",
2087                     instance.disk_template,
2088                     utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2089       for (idx, disk) in enumerate(instance.disks):
2090         self._ErrorIf(disk.spindles is None,
2091                       constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2092                       "number of spindles not configured for disk %s while"
2093                       " exclusive storage is enabled, try running"
2094                       " gnt-cluster repair-disk-sizes", idx)
2095
2096     if instance.disk_template in constants.DTS_INT_MIRROR:
2097       instance_nodes = utils.NiceSort(instance.all_nodes)
2098       instance_groups = {}
2099
2100       for node_uuid in instance_nodes:
2101         instance_groups.setdefault(self.all_node_info[node_uuid].group,
2102                                    []).append(node_uuid)
2103
2104       pretty_list = [
2105         "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2106                            groupinfo[group].name)
2107         # Sort so that we always list the primary node first.
2108         for group, nodes in sorted(instance_groups.items(),
2109                                    key=lambda (_, nodes): pnode_uuid in nodes,
2110                                    reverse=True)]
2111
2112       self._ErrorIf(len(instance_groups) > 1,
2113                     constants.CV_EINSTANCESPLITGROUPS,
2114                     instance.name, "instance has primary and secondary nodes in"
2115                     " different groups: %s", utils.CommaJoin(pretty_list),
2116                     code=self.ETYPE_WARNING)
2117
2118     inst_nodes_offline = []
2119     for snode in instance.secondary_nodes:
2120       s_img = node_image[snode]
2121       self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2122                     self.cfg.GetNodeName(snode),
2123                     "instance %s, connection to secondary node failed",
2124                     instance.name)
2125
2126       if s_img.offline:
2127         inst_nodes_offline.append(snode)
2128
2129     # warn that the instance lives on offline nodes
2130     self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2131                   instance.name, "instance has offline secondary node(s) %s",
2132                   utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2133     # ... or ghost/non-vm_capable nodes
2134     for node_uuid in instance.all_nodes:
2135       self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2136                     instance.name, "instance lives on ghost node %s",
2137                     self.cfg.GetNodeName(node_uuid))
2138       self._ErrorIf(not node_image[node_uuid].vm_capable,
2139                     constants.CV_EINSTANCEBADNODE, instance.name,
2140                     "instance lives on non-vm_capable node %s",
2141                     self.cfg.GetNodeName(node_uuid))
2142
2143   def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2144     """Verify if there are any unknown volumes in the cluster.
2145
2146     The .os, .swap and backup volumes are ignored. All other volumes are
2147     reported as unknown.
2148
2149     @type reserved: L{ganeti.utils.FieldSet}
2150     @param reserved: a FieldSet of reserved volume names
2151
2152     """
2153     for node_uuid, n_img in node_image.items():
2154       if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2155           self.all_node_info[node_uuid].group != self.group_uuid):
2156         # skip non-healthy nodes
2157         continue
2158       for volume in n_img.volumes:
2159         test = ((node_uuid not in node_vol_should or
2160                 volume not in node_vol_should[node_uuid]) and
2161                 not reserved.Matches(volume))
2162         self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2163                       self.cfg.GetNodeName(node_uuid),
2164                       "volume %s is unknown", volume)
2165
2166   def _VerifyNPlusOneMemory(self, node_image, all_insts):
2167     """Verify N+1 Memory Resilience.
2168
2169     Check that if one single node dies we can still start all the
2170     instances it was primary for.
2171
2172     """
2173     cluster_info = self.cfg.GetClusterInfo()
2174     for node_uuid, n_img in node_image.items():
2175       # This code checks that every node which is now listed as
2176       # secondary has enough memory to host all instances it is
2177       # supposed to should a single other node in the cluster fail.
2178       # FIXME: not ready for failover to an arbitrary node
2179       # FIXME: does not support file-backed instances
2180       # WARNING: we currently take into account down instances as well
2181       # as up ones, considering that even if they're down someone
2182       # might want to start them even in the event of a node failure.
2183       if n_img.offline or \
2184          self.all_node_info[node_uuid].group != self.group_uuid:
2185         # we're skipping nodes marked offline and nodes in other groups from
2186         # the N+1 warning, since most likely we don't have good memory
2187         # information from them; we already list instances living on such
2188         # nodes, and that's enough warning
2189         continue
2190       #TODO(dynmem): also consider ballooning out other instances
2191       for prinode, inst_uuids in n_img.sbp.items():
2192         needed_mem = 0
2193         for inst_uuid in inst_uuids:
2194           bep = cluster_info.FillBE(all_insts[inst_uuid])
2195           if bep[constants.BE_AUTO_BALANCE]:
2196             needed_mem += bep[constants.BE_MINMEM]
2197         test = n_img.mfree < needed_mem
2198         self._ErrorIf(test, constants.CV_ENODEN1,
2199                       self.cfg.GetNodeName(node_uuid),
2200                       "not enough memory to accomodate instance failovers"
2201                       " should node %s fail (%dMiB needed, %dMiB available)",
2202                       self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2203
2204   def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2205                    (files_all, files_opt, files_mc, files_vm)):
2206     """Verifies file checksums collected from all nodes.
2207
2208     @param nodes: List of L{objects.Node} objects
2209     @param master_node_uuid: UUID of master node
2210     @param all_nvinfo: RPC results
2211
2212     """
2213     # Define functions determining which nodes to consider for a file
2214     files2nodefn = [
2215       (files_all, None),
2216       (files_mc, lambda node: (node.master_candidate or
2217                                node.uuid == master_node_uuid)),
2218       (files_vm, lambda node: node.vm_capable),
2219       ]
2220
2221     # Build mapping from filename to list of nodes which should have the file
2222     nodefiles = {}
2223     for (files, fn) in files2nodefn:
2224       if fn is None:
2225         filenodes = nodes
2226       else:
2227         filenodes = filter(fn, nodes)
2228       nodefiles.update((filename,
2229                         frozenset(map(operator.attrgetter("uuid"), filenodes)))
2230                        for filename in files)
2231
2232     assert set(nodefiles) == (files_all | files_mc | files_vm)
2233
2234     fileinfo = dict((filename, {}) for filename in nodefiles)
2235     ignore_nodes = set()
2236
2237     for node in nodes:
2238       if node.offline:
2239         ignore_nodes.add(node.uuid)
2240         continue
2241
2242       nresult = all_nvinfo[node.uuid]
2243
2244       if nresult.fail_msg or not nresult.payload:
2245         node_files = None
2246       else:
2247         fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2248         node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2249                           for (key, value) in fingerprints.items())
2250         del fingerprints
2251
2252       test = not (node_files and isinstance(node_files, dict))
2253       self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2254                     "Node did not return file checksum data")
2255       if test:
2256         ignore_nodes.add(node.uuid)
2257         continue
2258
2259       # Build per-checksum mapping from filename to nodes having it
2260       for (filename, checksum) in node_files.items():
2261         assert filename in nodefiles
2262         fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2263
2264     for (filename, checksums) in fileinfo.items():
2265       assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2266
2267       # Nodes having the file
2268       with_file = frozenset(node_uuid
2269                             for node_uuids in fileinfo[filename].values()
2270                             for node_uuid in node_uuids) - ignore_nodes
2271
2272       expected_nodes = nodefiles[filename] - ignore_nodes
2273
2274       # Nodes missing file
2275       missing_file = expected_nodes - with_file
2276
2277       if filename in files_opt:
2278         # All or no nodes
2279         self._ErrorIf(missing_file and missing_file != expected_nodes,
2280                       constants.CV_ECLUSTERFILECHECK, None,
2281                       "File %s is optional, but it must exist on all or no"
2282                       " nodes (not found on %s)",
2283                       filename,
2284                       utils.CommaJoin(
2285                         utils.NiceSort(
2286                           map(self.cfg.GetNodeName, missing_file))))
2287       else:
2288         self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2289                       "File %s is missing from node(s) %s", filename,
2290                       utils.CommaJoin(
2291                         utils.NiceSort(
2292                           map(self.cfg.GetNodeName, missing_file))))
2293
2294         # Warn if a node has a file it shouldn't
2295         unexpected = with_file - expected_nodes
2296         self._ErrorIf(unexpected,
2297                       constants.CV_ECLUSTERFILECHECK, None,
2298                       "File %s should not exist on node(s) %s",
2299                       filename, utils.CommaJoin(
2300                         utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2301
2302       # See if there are multiple versions of the file
2303       test = len(checksums) > 1
2304       if test:
2305         variants = ["variant %s on %s" %
2306                     (idx + 1,
2307                      utils.CommaJoin(utils.NiceSort(
2308                        map(self.cfg.GetNodeName, node_uuids))))
2309                     for (idx, (checksum, node_uuids)) in
2310                       enumerate(sorted(checksums.items()))]
2311       else:
2312         variants = []
2313
2314       self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2315                     "File %s found with %s different checksums (%s)",
2316                     filename, len(checksums), "; ".join(variants))
2317
2318   def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
2319     """Verify the drbd helper.
2320
2321     """
2322     if drbd_helper:
2323       helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2324       test = (helper_result is None)
2325       self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2326                     "no drbd usermode helper returned")
2327       if helper_result:
2328         status, payload = helper_result
2329         test = not status
2330         self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2331                       "drbd usermode helper check unsuccessful: %s", payload)
2332         test = status and (payload != drbd_helper)
2333         self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2334                       "wrong drbd usermode helper: %s", payload)
2335
2336   def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2337                       drbd_map):
2338     """Verifies and the node DRBD status.
2339
2340     @type ninfo: L{objects.Node}
2341     @param ninfo: the node to check
2342     @param nresult: the remote results for the node
2343     @param instanceinfo: the dict of instances
2344     @param drbd_helper: the configured DRBD usermode helper
2345     @param drbd_map: the DRBD map as returned by
2346         L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2347
2348     """
2349     self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)
2350
2351     # compute the DRBD minors
2352     node_drbd = {}
2353     for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2354       test = inst_uuid not in instanceinfo
2355       self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2356                     "ghost instance '%s' in temporary DRBD map", inst_uuid)
2357         # ghost instance should not be running, but otherwise we
2358         # don't give double warnings (both ghost instance and
2359         # unallocated minor in use)
2360       if test:
2361         node_drbd[minor] = (inst_uuid, False)
2362       else:
2363         instance = instanceinfo[inst_uuid]
2364         node_drbd[minor] = (inst_uuid, instance.disks_active)
2365
2366     # and now check them
2367     used_minors = nresult.get(constants.NV_DRBDLIST, [])
2368     test = not isinstance(used_minors, (tuple, list))
2369     self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2370                   "cannot parse drbd status file: %s", str(used_minors))
2371     if test:
2372       # we cannot check drbd status
2373       return
2374
2375     for minor, (inst_uuid, must_exist) in node_drbd.items():
2376       test = minor not in used_minors and must_exist
2377       self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2378                     "drbd minor %d of instance %s is not active", minor,
2379                     self.cfg.GetInstanceName(inst_uuid))
2380     for minor in used_minors:
2381       test = minor not in node_drbd
2382       self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2383                     "unallocated drbd minor %d is in use", minor)
2384
2385   def _UpdateNodeOS(self, ninfo, nresult, nimg):
2386     """Builds the node OS structures.
2387
2388     @type ninfo: L{objects.Node}
2389     @param ninfo: the node to check
2390     @param nresult: the remote results for the node
2391     @param nimg: the node image object
2392
2393     """
2394     remote_os = nresult.get(constants.NV_OSLIST, None)
2395     test = (not isinstance(remote_os, list) or
2396             not compat.all(isinstance(v, list) and len(v) == 7
2397                            for v in remote_os))
2398
2399     self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2400                   "node hasn't returned valid OS data")
2401
2402     nimg.os_fail = test
2403
2404     if test:
2405       return
2406
2407     os_dict = {}
2408
2409     for (name, os_path, status, diagnose,
2410          variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2411
2412       if name not in os_dict:
2413         os_dict[name] = []
2414
2415       # parameters is a list of lists instead of list of tuples due to
2416       # JSON lacking a real tuple type, fix it:
2417       parameters = [tuple(v) for v in parameters]
2418       os_dict[name].append((os_path, status, diagnose,
2419                             set(variants), set(parameters), set(api_ver)))
2420
2421     nimg.oslist = os_dict
2422
2423   def _VerifyNodeOS(self, ninfo, nimg, base):
2424     """Verifies the node OS list.
2425
2426     @type ninfo: L{objects.Node}
2427     @param ninfo: the node to check
2428     @param nimg: the node image object
2429     @param base: the 'template' node we match against (e.g. from the master)
2430
2431     """
2432     assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2433
2434     beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2435     for os_name, os_data in nimg.oslist.items():
2436       assert os_data, "Empty OS status for OS %s?!" % os_name
2437       f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2438       self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2439                     "Invalid OS %s (located at %s): %s",
2440                     os_name, f_path, f_diag)
2441       self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2442                     "OS '%s' has multiple entries"
2443                     " (first one shadows the rest): %s",
2444                     os_name, utils.CommaJoin([v[0] for v in os_data]))
2445       # comparisons with the 'base' image
2446       test = os_name not in base.oslist
2447       self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2448                     "Extra OS %s not present on reference node (%s)",
2449                     os_name, self.cfg.GetNodeName(base.uuid))
2450       if test:
2451         continue
2452       assert base.oslist[os_name], "Base node has empty OS status?"
2453       _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2454       if not b_status:
2455         # base OS is invalid, skipping
2456         continue
2457       for kind, a, b in [("API version", f_api, b_api),
2458                          ("variants list", f_var, b_var),
2459                          ("parameters", beautify_params(f_param),
2460                           beautify_params(b_param))]:
2461         self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2462                       "OS %s for %s differs from reference node %s:"
2463                       " [%s] vs. [%s]", kind, os_name,
2464                       self.cfg.GetNodeName(base.uuid),
2465                       utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2466
2467     # check any missing OSes
2468     missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2469     self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2470                   "OSes present on reference node %s"
2471                   " but missing on this node: %s",
2472                   self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2473
2474   def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2475     """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2476
2477     @type ninfo: L{objects.Node}
2478     @param ninfo: the node to check
2479     @param nresult: the remote results for the node
2480     @type is_master: bool
2481     @param is_master: Whether node is the master node
2482
2483     """
2484     cluster = self.cfg.GetClusterInfo()
2485     if (is_master and
2486         (cluster.IsFileStorageEnabled() or
2487          cluster.IsSharedFileStorageEnabled())):
2488       try:
2489         fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2490       except KeyError:
2491         # This should never happen
2492         self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2493                       "Node did not return forbidden file storage paths")
2494       else:
2495         self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2496                       "Found forbidden file storage paths: %s",
2497                       utils.CommaJoin(fspaths))
2498     else:
2499       self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2500                     constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2501                     "Node should not have returned forbidden file storage"
2502                     " paths")
2503
2504   def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2505                           verify_key, error_key):
2506     """Verifies (file) storage paths.
2507
2508     @type ninfo: L{objects.Node}
2509     @param ninfo: the node to check
2510     @param nresult: the remote results for the node
2511     @type file_disk_template: string
2512     @param file_disk_template: file-based disk template, whose directory
2513         is supposed to be verified
2514     @type verify_key: string
2515     @param verify_key: key for the verification map of this file
2516         verification step
2517     @param error_key: error key to be added to the verification results
2518         in case something goes wrong in this verification step
2519
2520     """
2521     assert (file_disk_template in
2522             utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2523     cluster = self.cfg.GetClusterInfo()
2524     if cluster.IsDiskTemplateEnabled(file_disk_template):
2525       self._ErrorIf(
2526           verify_key in nresult,
2527           error_key, ninfo.name,
2528           "The configured %s storage path is unusable: %s" %
2529           (file_disk_template, nresult.get(verify_key)))
2530
2531   def _VerifyFileStoragePaths(self, ninfo, nresult):
2532     """Verifies (file) storage paths.
2533
2534     @see: C{_VerifyStoragePaths}
2535
2536     """
2537     self._VerifyStoragePaths(
2538         ninfo, nresult, constants.DT_FILE,
2539         constants.NV_FILE_STORAGE_PATH,
2540         constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2541
2542   def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2543     """Verifies (file) storage paths.
2544
2545     @see: C{_VerifyStoragePaths}
2546
2547     """
2548     self._VerifyStoragePaths(
2549         ninfo, nresult, constants.DT_SHARED_FILE,
2550         constants.NV_SHARED_FILE_STORAGE_PATH,
2551         constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2552
2553   def _VerifyOob(self, ninfo, nresult):
2554     """Verifies out of band functionality of a node.
2555
2556     @type ninfo: L{objects.Node}
2557     @param ninfo: the node to check
2558     @param nresult: the remote results for the node
2559
2560     """
2561     # We just have to verify the paths on master and/or master candidates
2562     # as the oob helper is invoked on the master
2563     if ((ninfo.master_candidate or ninfo.master_capable) and
2564         constants.NV_OOB_PATHS in nresult):
2565       for path_result in nresult[constants.NV_OOB_PATHS]:
2566         self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2567                       ninfo.name, path_result)
2568
2569   def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2570     """Verifies and updates the node volume data.
2571
2572     This function will update a L{NodeImage}'s internal structures
2573     with data from the remote call.
2574
2575     @type ninfo: L{objects.Node}
2576     @param ninfo: the node to check
2577     @param nresult: the remote results for the node
2578     @param nimg: the node image object
2579     @param vg_name: the configured VG name
2580
2581     """
2582     nimg.lvm_fail = True
2583     lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2584     if vg_name is None:
2585       pass
2586     elif isinstance(lvdata, basestring):
2587       self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2588                     "LVM problem on node: %s", utils.SafeEncode(lvdata))
2589     elif not isinstance(lvdata, dict):
2590       self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2591                     "rpc call to node failed (lvlist)")
2592     else:
2593       nimg.volumes = lvdata
2594       nimg.lvm_fail = False
2595
2596   def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2597     """Verifies and updates the node instance list.
2598
2599     If the listing was successful, then updates this node's instance
2600     list. Otherwise, it marks the RPC call as failed for the instance
2601     list key.
2602
2603     @type ninfo: L{objects.Node}
2604     @param ninfo: the node to check
2605     @param nresult: the remote results for the node
2606     @param nimg: the node image object
2607
2608     """
2609     idata = nresult.get(constants.NV_INSTANCELIST, None)
2610     test = not isinstance(idata, list)
2611     self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2612                   "rpc call to node failed (instancelist): %s",
2613                   utils.SafeEncode(str(idata)))
2614     if test:
2615       nimg.hyp_fail = True
2616     else:
2617       nimg.instances = [inst.uuid for (_, inst) in
2618                         self.cfg.GetMultiInstanceInfoByName(idata)]
2619
2620   def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2621     """Verifies and computes a node information map
2622
2623     @type ninfo: L{objects.Node}
2624     @param ninfo: the node to check
2625     @param nresult: the remote results for the node
2626     @param nimg: the node image object
2627     @param vg_name: the configured VG name
2628
2629     """
2630     # try to read free memory (from the hypervisor)
2631     hv_info = nresult.get(constants.NV_HVINFO, None)
2632     test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2633     self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2634                   "rpc call to node failed (hvinfo)")
2635     if not test:
2636       try:
2637         nimg.mfree = int(hv_info["memory_free"])
2638       except (ValueError, TypeError):
2639         self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2640                       "node returned invalid nodeinfo, check hypervisor")
2641
2642     # FIXME: devise a free space model for file based instances as well
2643     if vg_name is not None:
2644       test = (constants.NV_VGLIST not in nresult or
2645               vg_name not in nresult[constants.NV_VGLIST])
2646       self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2647                     "node didn't return data for the volume group '%s'"
2648                     " - it is either missing or broken", vg_name)
2649       if not test:
2650         try:
2651           nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2652         except (ValueError, TypeError):
2653           self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2654                         "node returned invalid LVM info, check LVM status")
2655
2656   def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2657     """Gets per-disk status information for all instances.
2658
2659     @type node_uuids: list of strings
2660     @param node_uuids: Node UUIDs
2661     @type node_image: dict of (UUID, L{objects.Node})
2662     @param node_image: Node objects
2663     @type instanceinfo: dict of (UUID, L{objects.Instance})
2664     @param instanceinfo: Instance objects
2665     @rtype: {instance: {node: [(succes, payload)]}}
2666     @return: a dictionary of per-instance dictionaries with nodes as
2667         keys and disk information as values; the disk information is a
2668         list of tuples (success, payload)
2669
2670     """
2671     node_disks = {}
2672     node_disks_dev_inst_only = {}
2673     diskless_instances = set()
2674     diskless = constants.DT_DISKLESS
2675
2676     for nuuid in node_uuids:
2677       node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
2678                                              node_image[nuuid].sinst))
2679       diskless_instances.update(uuid for uuid in node_inst_uuids
2680                                 if instanceinfo[uuid].disk_template == diskless)
2681       disks = [(inst_uuid, disk)
2682                for inst_uuid in node_inst_uuids
2683                for disk in instanceinfo[inst_uuid].disks]
2684
2685       if not disks:
2686         # No need to collect data
2687         continue
2688
2689       node_disks[nuuid] = disks
2690
2691       # _AnnotateDiskParams makes already copies of the disks
2692       dev_inst_only = []
2693       for (inst_uuid, dev) in disks:
2694         (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
2695                                           self.cfg)
2696         dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))
2697
2698       node_disks_dev_inst_only[nuuid] = dev_inst_only
2699
2700     assert len(node_disks) == len(node_disks_dev_inst_only)
2701
2702     # Collect data from all nodes with disks
2703     result = self.rpc.call_blockdev_getmirrorstatus_multi(
2704                node_disks.keys(), node_disks_dev_inst_only)
2705
2706     assert len(result) == len(node_disks)
2707
2708     instdisk = {}
2709
2710     for (nuuid, nres) in result.items():
2711       node = self.cfg.GetNodeInfo(nuuid)
2712       disks = node_disks[node.uuid]
2713
2714       if nres.offline:
2715         # No data from this node
2716         data = len(disks) * [(False, "node offline")]
2717       else:
2718         msg = nres.fail_msg
2719         self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
2720                       "while getting disk information: %s", msg)
2721         if msg:
2722           # No data from this node
2723           data = len(disks) * [(False, msg)]
2724         else:
2725           data = []
2726           for idx, i in enumerate(nres.payload):
2727             if isinstance(i, (tuple, list)) and len(i) == 2:
2728               data.append(i)
2729             else:
2730               logging.warning("Invalid result from node %s, entry %d: %s",
2731                               node.name, idx, i)
2732               data.append((False, "Invalid result from the remote node"))
2733
2734       for ((inst_uuid, _), status) in zip(disks, data):
2735         instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
2736           .append(status)
2737
2738     # Add empty entries for diskless instances.
2739     for inst_uuid in diskless_instances:
2740       assert inst_uuid not in instdisk
2741       instdisk[inst_uuid] = {}
2742
2743     assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2744                       len(nuuids) <= len(instanceinfo[inst].all_nodes) and
2745                       compat.all(isinstance(s, (tuple, list)) and
2746                                  len(s) == 2 for s in statuses)
2747                       for inst, nuuids in instdisk.items()
2748                       for nuuid, statuses in nuuids.items())
2749     if __debug__:
2750       instdisk_keys = set(instdisk)
2751       instanceinfo_keys = set(instanceinfo)
2752       assert instdisk_keys == instanceinfo_keys, \
2753         ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2754          (instdisk_keys, instanceinfo_keys))
2755
2756     return instdisk
2757
2758   @staticmethod
2759   def _SshNodeSelector(group_uuid, all_nodes):
2760     """Create endless iterators for all potential SSH check hosts.
2761
2762     """
2763     nodes = [node for node in all_nodes
2764              if (node.group != group_uuid and
2765                  not node.offline)]
2766     keyfunc = operator.attrgetter("group")
2767
2768     return map(itertools.cycle,
2769                [sorted(map(operator.attrgetter("name"), names))
2770                 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2771                                                   keyfunc)])
2772
2773   @classmethod
2774   def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2775     """Choose which nodes should talk to which other nodes.
2776
2777     We will make nodes contact all nodes in their group, and one node from
2778     every other group.
2779
2780     @warning: This algorithm has a known issue if one node group is much
2781       smaller than others (e.g. just one node). In such a case all other
2782       nodes will talk to the single node.
2783
2784     """
2785     online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2786     sel = cls._SshNodeSelector(group_uuid, all_nodes)
2787
2788     return (online_nodes,
2789             dict((name, sorted([i.next() for i in sel]))
2790                  for name in online_nodes))
2791
2792   def BuildHooksEnv(self):
2793     """Build hooks env.
2794
2795     Cluster-Verify hooks just ran in the post phase and their failure makes
2796     the output be logged in the verify output and the verification to fail.
2797
2798     """
2799     env = {
2800       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2801       }
2802
2803     env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2804                for node in self.my_node_info.values())
2805
2806     return env
2807
2808   def BuildHooksNodes(self):
2809     """Build hooks nodes.
2810
2811     """
2812     return ([], list(self.my_node_info.keys()))
2813
2814   def Exec(self, feedback_fn):
2815     """Verify integrity of the node group, performing various test on nodes.
2816
2817     """
2818     # This method has too many local variables. pylint: disable=R0914
2819     feedback_fn("* Verifying group '%s'" % self.group_info.name)
2820
2821     if not self.my_node_uuids:
2822       # empty node group
2823       feedback_fn("* Empty node group, skipping verification")
2824       return True
2825
2826     self.bad = False
2827     verbose = self.op.verbose
2828     self._feedback_fn = feedback_fn
2829
2830     vg_name = self.cfg.GetVGName()
2831     drbd_helper = self.cfg.GetDRBDHelper()
2832     cluster = self.cfg.GetClusterInfo()
2833     hypervisors = cluster.enabled_hypervisors
2834     node_data_list = self.my_node_info.values()
2835
2836     i_non_redundant = [] # Non redundant instances
2837     i_non_a_balanced = [] # Non auto-balanced instances
2838     i_offline = 0 # Count of offline instances
2839     n_offline = 0 # Count of offline nodes
2840     n_drained = 0 # Count of nodes being drained
2841     node_vol_should = {}
2842
2843     # FIXME: verify OS list
2844
2845     # File verification
2846     filemap = ComputeAncillaryFiles(cluster, False)
2847
2848     # do local checksums
2849     master_node_uuid = self.master_node = self.cfg.GetMasterNode()
2850     master_ip = self.cfg.GetMasterIP()
2851
2852     feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))
2853
2854     user_scripts = []
2855     if self.cfg.GetUseExternalMipScript():
2856       user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2857
2858     node_verify_param = {
2859       constants.NV_FILELIST:
2860         map(vcluster.MakeVirtualPath,
2861             utils.UniqueSequence(filename
2862                                  for files in filemap
2863                                  for filename in files)),
2864       constants.NV_NODELIST:
2865         self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2866                                   self.all_node_info.values()),
2867       constants.NV_HYPERVISOR: hypervisors,
2868       constants.NV_HVPARAMS:
2869         _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2870       constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2871                                  for node in node_data_list
2872                                  if not node.offline],
2873       constants.NV_INSTANCELIST: hypervisors,
2874       constants.NV_VERSION: None,
2875       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2876       constants.NV_NODESETUP: None,
2877       constants.NV_TIME: None,
2878       constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
2879       constants.NV_OSLIST: None,
2880       constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2881       constants.NV_USERSCRIPTS: user_scripts,
2882       }
2883
2884     if vg_name is not None:
2885       node_verify_param[constants.NV_VGLIST] = None
2886       node_verify_param[constants.NV_LVLIST] = vg_name
2887       node_verify_param[constants.NV_PVLIST] = [vg_name]
2888
2889     if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
2890       if drbd_helper:
2891         node_verify_param[constants.NV_DRBDVERSION] = None
2892         node_verify_param[constants.NV_DRBDLIST] = None
2893         node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2894
2895     if cluster.IsFileStorageEnabled() or \
2896         cluster.IsSharedFileStorageEnabled():
2897       # Load file storage paths only from master node
2898       node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
2899         self.cfg.GetMasterNodeName()
2900       if cluster.IsFileStorageEnabled():
2901         node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
2902           cluster.file_storage_dir
2903
2904     # bridge checks
2905     # FIXME: this needs to be changed per node-group, not cluster-wide
2906     bridges = set()
2907     default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2908     if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2909       bridges.add(default_nicpp[constants.NIC_LINK])
2910     for inst_uuid in self.my_inst_info.values():
2911       for nic in inst_uuid.nics:
2912         full_nic = cluster.SimpleFillNIC(nic.nicparams)
2913         if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2914           bridges.add(full_nic[constants.NIC_LINK])
2915
2916     if bridges:
2917       node_verify_param[constants.NV_BRIDGES] = list(bridges)
2918
2919     # Build our expected cluster state
2920     node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
2921                                                  uuid=node.uuid,
2922                                                  vm_capable=node.vm_capable))
2923                       for node in node_data_list)
2924
2925     # Gather OOB paths
2926     oob_paths = []
2927     for node in self.all_node_info.values():
2928       path = SupportsOob(self.cfg, node)
2929       if path and path not in oob_paths:
2930         oob_paths.append(path)
2931
2932     if oob_paths:
2933       node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2934
2935     for inst_uuid in self.my_inst_uuids:
2936       instance = self.my_inst_info[inst_uuid]
2937       if instance.admin_state == constants.ADMINST_OFFLINE:
2938         i_offline += 1
2939
2940       for nuuid in instance.all_nodes:
2941         if nuuid not in node_image:
2942           gnode = self.NodeImage(uuid=nuuid)
2943           gnode.ghost = (nuuid not in self.all_node_info)
2944           node_image[nuuid] = gnode
2945
2946       instance.MapLVsByNode(node_vol_should)
2947
2948       pnode = instance.primary_node
2949       node_image[pnode].pinst.append(instance.uuid)
2950
2951       for snode in instance.secondary_nodes:
2952         nimg = node_image[snode]
2953         nimg.sinst.append(instance.uuid)
2954         if pnode not in nimg.sbp:
2955           nimg.sbp[pnode] = []
2956         nimg.sbp[pnode].append(instance.uuid)
2957
2958     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
2959                                                self.my_node_info.keys())
2960     # The value of exclusive_storage should be the same across the group, so if
2961     # it's True for at least a node, we act as if it were set for all the nodes
2962     self._exclusive_storage = compat.any(es_flags.values())
2963     if self._exclusive_storage:
2964       node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2965
2966     # At this point, we have the in-memory data structures complete,
2967     # except for the runtime information, which we'll gather next
2968
2969     # Due to the way our RPC system works, exact response times cannot be
2970     # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2971     # time before and after executing the request, we can at least have a time
2972     # window.
2973     nvinfo_starttime = time.time()
2974     all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
2975                                            node_verify_param,
2976                                            self.cfg.GetClusterName(),
2977                                            self.cfg.GetClusterInfo().hvparams)
2978     nvinfo_endtime = time.time()
2979
2980     if self.extra_lv_nodes and vg_name is not None:
2981       extra_lv_nvinfo = \
2982           self.rpc.call_node_verify(self.extra_lv_nodes,
2983                                     {constants.NV_LVLIST: vg_name},
2984                                     self.cfg.GetClusterName(),
2985                                     self.cfg.GetClusterInfo().hvparams)
2986     else:
2987       extra_lv_nvinfo = {}
2988
2989     all_drbd_map = self.cfg.ComputeDRBDMap()
2990
2991     feedback_fn("* Gathering disk information (%s nodes)" %
2992                 len(self.my_node_uuids))
2993     instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
2994                                      self.my_inst_info)
2995
2996     feedback_fn("* Verifying configuration file consistency")
2997
2998     # If not all nodes are being checked, we need to make sure the master node
2999     # and a non-checked vm_capable node are in the list.
3000     absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
3001     if absent_node_uuids:
3002       vf_nvinfo = all_nvinfo.copy()
3003       vf_node_info = list(self.my_node_info.values())
3004       additional_node_uuids = []
3005       if master_node_uuid not in self.my_node_info:
3006         additional_node_uuids.append(master_node_uuid)
3007         vf_node_info.append(self.all_node_info[master_node_uuid])
3008       # Add the first vm_capable node we find which is not included,
3009       # excluding the master node (which we already have)
3010       for node_uuid in absent_node_uuids:
3011         nodeinfo = self.all_node_info[node_uuid]
3012         if (nodeinfo.vm_capable and not nodeinfo.offline and
3013             node_uuid != master_node_uuid):
3014           additional_node_uuids.append(node_uuid)
3015           vf_node_info.append(self.all_node_info[node_uuid])
3016           break
3017       key = constants.NV_FILELIST
3018       vf_nvinfo.update(self.rpc.call_node_verify(
3019          additional_node_uuids, {key: node_verify_param[key]},
3020          self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
3021     else:
3022       vf_nvinfo = all_nvinfo
3023       vf_node_info = self.my_node_info.values()
3024
3025     self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)
3026
3027     feedback_fn("* Verifying node status")
3028
3029     refos_img = None
3030
3031     for node_i in node_data_list:
3032       nimg = node_image[node_i.uuid]
3033
3034       if node_i.offline:
3035         if verbose:
3036           feedback_fn("* Skipping offline node %s" % (node_i.name,))
3037         n_offline += 1
3038         continue
3039
3040       if node_i.uuid == master_node_uuid:
3041         ntype = "master"
3042       elif node_i.master_candidate:
3043         ntype = "master candidate"
3044       elif node_i.drained:
3045         ntype = "drained"
3046         n_drained += 1
3047       else:
3048         ntype = "regular"
3049       if verbose:
3050         feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))
3051
3052       msg = all_nvinfo[node_i.uuid].fail_msg
3053       self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
3054                     "while contacting node: %s", msg)
3055       if msg:
3056         nimg.rpc_fail = True
3057         continue
3058
3059       nresult = all_nvinfo[node_i.uuid].payload
3060
3061       nimg.call_ok = self._VerifyNode(node_i, nresult)
3062       self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
3063       self._VerifyNodeNetwork(node_i, nresult)
3064       self._VerifyNodeUserScripts(node_i, nresult)
3065       self._VerifyOob(node_i, nresult)
3066       self._VerifyAcceptedFileStoragePaths(node_i, nresult,
3067                                            node_i.uuid == master_node_uuid)
3068       self._VerifyFileStoragePaths(node_i, nresult)
3069       self._VerifySharedFileStoragePaths(node_i, nresult)
3070
3071       if nimg.vm_capable:
3072         self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
3073         self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
3074                              all_drbd_map)
3075
3076         self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
3077         self._UpdateNodeInstances(node_i, nresult, nimg)
3078         self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
3079         self._UpdateNodeOS(node_i, nresult, nimg)
3080
3081         if not nimg.os_fail:
3082           if refos_img is None:
3083             refos_img = nimg
3084           self._VerifyNodeOS(node_i, nimg, refos_img)
3085         self._VerifyNodeBridges(node_i, nresult, bridges)
3086
3087         # Check whether all running instances are primary for the node. (This
3088         # can no longer be done from _VerifyInstance below, since some of the
3089         # wrong instances could be from other node groups.)
3090         non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)
3091
3092         for inst_uuid in non_primary_inst_uuids:
3093           test = inst_uuid in self.all_inst_info
3094           self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
3095                         self.cfg.GetInstanceName(inst_uuid),
3096                         "instance should not run on node %s", node_i.name)
3097           self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
3098                         "node is running unknown instance %s", inst_uuid)
3099
3100     self._VerifyGroupDRBDVersion(all_nvinfo)
3101     self._VerifyGroupLVM(node_image, vg_name)
3102
3103     for node_uuid, result in extra_lv_nvinfo.items():
3104       self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
3105                               node_image[node_uuid], vg_name)
3106
3107     feedback_fn("* Verifying instance status")
3108     for inst_uuid in self.my_inst_uuids:
3109       instance = self.my_inst_info[inst_uuid]
3110       if verbose:
3111         feedback_fn("* Verifying instance %s" % instance.name)
3112       self._VerifyInstance(instance, node_image, instdisk[inst_uuid])
3113
3114       # If the instance is non-redundant we cannot survive losing its primary
3115       # node, so we are not N+1 compliant.
3116       if instance.disk_template not in constants.DTS_MIRRORED:
3117         i_non_redundant.append(instance)
3118
3119       if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
3120         i_non_a_balanced.append(instance)
3121
3122     feedback_fn("* Verifying orphan volumes")
3123     reserved = utils.FieldSet(*cluster.reserved_lvs)
3124
3125     # We will get spurious "unknown volume" warnings if any node of this group
3126     # is secondary for an instance whose primary is in another group. To avoid
3127     # them, we find these instances and add their volumes to node_vol_should.
3128     for instance in self.all_inst_info.values():
3129       for secondary in instance.secondary_nodes:
3130         if (secondary in self.my_node_info
3131             and instance.name not in self.my_inst_info):
3132           instance.MapLVsByNode(node_vol_should)
3133           break
3134
3135     self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
3136
3137     if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
3138       feedback_fn("* Verifying N+1 Memory redundancy")
3139       self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
3140
3141     feedback_fn("* Other Notes")
3142     if i_non_redundant:
3143       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
3144                   % len(i_non_redundant))
3145
3146     if i_non_a_balanced:
3147       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
3148                   % len(i_non_a_balanced))
3149
3150     if i_offline:
3151       feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
3152
3153     if n_offline:
3154       feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
3155
3156     if n_drained:
3157       feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
3158
3159     return not self.bad
3160
3161   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
3162     """Analyze the post-hooks' result
3163
3164     This method analyses the hook result, handles it, and sends some
3165     nicely-formatted feedback back to the user.
3166
3167     @param phase: one of L{constants.HOOKS_PHASE_POST} or
3168         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
3169     @param hooks_results: the results of the multi-node hooks rpc call
3170     @param feedback_fn: function used send feedback back to the caller
3171     @param lu_result: previous Exec result
3172     @return: the new Exec result, based on the previous result
3173         and hook results
3174
3175     """
3176     # We only really run POST phase hooks, only for non-empty groups,
3177     # and are only interested in their results
3178     if not self.my_node_uuids:
3179       # empty node group
3180       pass
3181     elif phase == constants.HOOKS_PHASE_POST:
3182       # Used to change hooks' output to proper indentation
3183       feedback_fn("* Hooks Results")
3184       assert hooks_results, "invalid result from hooks"
3185
3186       for node_name in hooks_results:
3187         res = hooks_results[node_name]
3188         msg = res.fail_msg
3189         test = msg and not res.offline
3190         self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3191                       "Communication failure in hooks execution: %s", msg)
3192         if res.offline or msg:
3193           # No need to investigate payload if node is offline or gave
3194           # an error.
3195           continue
3196         for script, hkr, output in res.payload:
3197           test = hkr == constants.HKR_FAIL
3198           self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3199                         "Script %s failed, output:", script)
3200           if test:
3201             output = self._HOOKS_INDENT_RE.sub("      ", output)
3202             feedback_fn("%s" % output)
3203             lu_result = False
3204
3205     return lu_result
3206
3207
3208 class LUClusterVerifyDisks(NoHooksLU):
3209   """Verifies the cluster disks status.
3210
3211   """
3212   REQ_BGL = False
3213
3214   def ExpandNames(self):
3215     self.share_locks = ShareAll()
3216     self.needed_locks = {
3217       locking.LEVEL_NODEGROUP: locking.ALL_SET,
3218       }
3219
3220   def Exec(self, feedback_fn):
3221     group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
3222
3223     # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
3224     return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
3225                            for group in group_names])