Allow modify_etc_hosts to be changed
[ganeti-local] / lib / cmdlib / cluster.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with the cluster."""
23
24 import OpenSSL
25
26 import copy
27 import itertools
28 import logging
29 import operator
30 import os
31 import re
32 import time
33
34 from ganeti import compat
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import masterd
40 from ganeti import netutils
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 from ganeti import query
45 from ganeti import rpc
46 from ganeti import runtime
47 from ganeti import ssh
48 from ganeti import uidpool
49 from ganeti import utils
50 from ganeti import vcluster
51
52 from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53   ResultWithJobs
54 from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55   ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56   GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57   GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58   CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59   ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob
60
61 import ganeti.masterd.instance
62
63
64 class LUClusterActivateMasterIp(NoHooksLU):
65   """Activate the master IP on the master node.
66
67   """
68   def Exec(self, feedback_fn):
69     """Activate the master IP.
70
71     """
72     master_params = self.cfg.GetMasterNetworkParameters()
73     ems = self.cfg.GetUseExternalMipScript()
74     result = self.rpc.call_node_activate_master_ip(master_params.name,
75                                                    master_params, ems)
76     result.Raise("Could not activate the master IP")
77
78
79 class LUClusterDeactivateMasterIp(NoHooksLU):
80   """Deactivate the master IP on the master node.
81
82   """
83   def Exec(self, feedback_fn):
84     """Deactivate the master IP.
85
86     """
87     master_params = self.cfg.GetMasterNetworkParameters()
88     ems = self.cfg.GetUseExternalMipScript()
89     result = self.rpc.call_node_deactivate_master_ip(master_params.name,
90                                                      master_params, ems)
91     result.Raise("Could not deactivate the master IP")
92
93
94 class LUClusterConfigQuery(NoHooksLU):
95   """Return configuration values.
96
97   """
98   REQ_BGL = False
99
100   def CheckArguments(self):
101     self.cq = ClusterQuery(None, self.op.output_fields, False)
102
103   def ExpandNames(self):
104     self.cq.ExpandNames(self)
105
106   def DeclareLocks(self, level):
107     self.cq.DeclareLocks(self, level)
108
109   def Exec(self, feedback_fn):
110     result = self.cq.OldStyleQuery(self)
111
112     assert len(result) == 1
113
114     return result[0]
115
116
117 class LUClusterDestroy(LogicalUnit):
118   """Logical unit for destroying the cluster.
119
120   """
121   HPATH = "cluster-destroy"
122   HTYPE = constants.HTYPE_CLUSTER
123
124   def BuildHooksEnv(self):
125     """Build hooks env.
126
127     """
128     return {
129       "OP_TARGET": self.cfg.GetClusterName(),
130       }
131
132   def BuildHooksNodes(self):
133     """Build hooks nodes.
134
135     """
136     return ([], [])
137
138   def CheckPrereq(self):
139     """Check prerequisites.
140
141     This checks whether the cluster is empty.
142
143     Any errors are signaled by raising errors.OpPrereqError.
144
145     """
146     master = self.cfg.GetMasterNode()
147
148     nodelist = self.cfg.GetNodeList()
149     if len(nodelist) != 1 or nodelist[0] != master:
150       raise errors.OpPrereqError("There are still %d node(s) in"
151                                  " this cluster." % (len(nodelist) - 1),
152                                  errors.ECODE_INVAL)
153     instancelist = self.cfg.GetInstanceList()
154     if instancelist:
155       raise errors.OpPrereqError("There are still %d instance(s) in"
156                                  " this cluster." % len(instancelist),
157                                  errors.ECODE_INVAL)
158
159   def Exec(self, feedback_fn):
160     """Destroys the cluster.
161
162     """
163     master_params = self.cfg.GetMasterNetworkParameters()
164
165     # Run post hooks on master node before it's removed
166     RunPostHook(self, master_params.name)
167
168     ems = self.cfg.GetUseExternalMipScript()
169     result = self.rpc.call_node_deactivate_master_ip(master_params.name,
170                                                      master_params, ems)
171     if result.fail_msg:
172       self.LogWarning("Error disabling the master IP address: %s",
173                       result.fail_msg)
174
175     return master_params.name
176
177
178 class LUClusterPostInit(LogicalUnit):
179   """Logical unit for running hooks after cluster initialization.
180
181   """
182   HPATH = "cluster-init"
183   HTYPE = constants.HTYPE_CLUSTER
184
185   def BuildHooksEnv(self):
186     """Build hooks env.
187
188     """
189     return {
190       "OP_TARGET": self.cfg.GetClusterName(),
191       }
192
193   def BuildHooksNodes(self):
194     """Build hooks nodes.
195
196     """
197     return ([], [self.cfg.GetMasterNode()])
198
199   def Exec(self, feedback_fn):
200     """Nothing to do.
201
202     """
203     return True
204
205
206 class ClusterQuery(QueryBase):
207   FIELDS = query.CLUSTER_FIELDS
208
209   #: Do not sort (there is only one item)
210   SORT_FIELD = None
211
212   def ExpandNames(self, lu):
213     lu.needed_locks = {}
214
215     # The following variables interact with _QueryBase._GetNames
216     self.wanted = locking.ALL_SET
217     self.do_locking = self.use_locking
218
219     if self.do_locking:
220       raise errors.OpPrereqError("Can not use locking for cluster queries",
221                                  errors.ECODE_INVAL)
222
223   def DeclareLocks(self, lu, level):
224     pass
225
226   def _GetQueryData(self, lu):
227     """Computes the list of nodes and their attributes.
228
229     """
230     # Locking is not used
231     assert not (compat.any(lu.glm.is_owned(level)
232                            for level in locking.LEVELS
233                            if level != locking.LEVEL_CLUSTER) or
234                 self.do_locking or self.use_locking)
235
236     if query.CQ_CONFIG in self.requested_data:
237       cluster = lu.cfg.GetClusterInfo()
238     else:
239       cluster = NotImplemented
240
241     if query.CQ_QUEUE_DRAINED in self.requested_data:
242       drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
243     else:
244       drain_flag = NotImplemented
245
246     if query.CQ_WATCHER_PAUSE in self.requested_data:
247       master_name = lu.cfg.GetMasterNode()
248
249       result = lu.rpc.call_get_watcher_pause(master_name)
250       result.Raise("Can't retrieve watcher pause from master node '%s'" %
251                    master_name)
252
253       watcher_pause = result.payload
254     else:
255       watcher_pause = NotImplemented
256
257     return query.ClusterQueryData(cluster, drain_flag, watcher_pause)
258
259
260 class LUClusterQuery(NoHooksLU):
261   """Query cluster configuration.
262
263   """
264   REQ_BGL = False
265
266   def ExpandNames(self):
267     self.needed_locks = {}
268
269   def Exec(self, feedback_fn):
270     """Return cluster config.
271
272     """
273     cluster = self.cfg.GetClusterInfo()
274     os_hvp = {}
275
276     # Filter just for enabled hypervisors
277     for os_name, hv_dict in cluster.os_hvp.items():
278       os_hvp[os_name] = {}
279       for hv_name, hv_params in hv_dict.items():
280         if hv_name in cluster.enabled_hypervisors:
281           os_hvp[os_name][hv_name] = hv_params
282
283     # Convert ip_family to ip_version
284     primary_ip_version = constants.IP4_VERSION
285     if cluster.primary_ip_family == netutils.IP6Address.family:
286       primary_ip_version = constants.IP6_VERSION
287
288     result = {
289       "software_version": constants.RELEASE_VERSION,
290       "protocol_version": constants.PROTOCOL_VERSION,
291       "config_version": constants.CONFIG_VERSION,
292       "os_api_version": max(constants.OS_API_VERSIONS),
293       "export_version": constants.EXPORT_VERSION,
294       "architecture": runtime.GetArchInfo(),
295       "name": cluster.cluster_name,
296       "master": cluster.master_node,
297       "default_hypervisor": cluster.primary_hypervisor,
298       "enabled_hypervisors": cluster.enabled_hypervisors,
299       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
300                         for hypervisor_name in cluster.enabled_hypervisors]),
301       "os_hvp": os_hvp,
302       "beparams": cluster.beparams,
303       "osparams": cluster.osparams,
304       "ipolicy": cluster.ipolicy,
305       "nicparams": cluster.nicparams,
306       "ndparams": cluster.ndparams,
307       "diskparams": cluster.diskparams,
308       "candidate_pool_size": cluster.candidate_pool_size,
309       "master_netdev": cluster.master_netdev,
310       "master_netmask": cluster.master_netmask,
311       "use_external_mip_script": cluster.use_external_mip_script,
312       "volume_group_name": cluster.volume_group_name,
313       "drbd_usermode_helper": cluster.drbd_usermode_helper,
314       "file_storage_dir": cluster.file_storage_dir,
315       "shared_file_storage_dir": cluster.shared_file_storage_dir,
316       "maintain_node_health": cluster.maintain_node_health,
317       "ctime": cluster.ctime,
318       "mtime": cluster.mtime,
319       "uuid": cluster.uuid,
320       "tags": list(cluster.GetTags()),
321       "uid_pool": cluster.uid_pool,
322       "default_iallocator": cluster.default_iallocator,
323       "reserved_lvs": cluster.reserved_lvs,
324       "primary_ip_version": primary_ip_version,
325       "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
326       "hidden_os": cluster.hidden_os,
327       "blacklisted_os": cluster.blacklisted_os,
328       "enabled_disk_templates": cluster.enabled_disk_templates,
329       }
330
331     return result
332
333
334 class LUClusterRedistConf(NoHooksLU):
335   """Force the redistribution of cluster configuration.
336
337   This is a very simple LU.
338
339   """
340   REQ_BGL = False
341
342   def ExpandNames(self):
343     self.needed_locks = {
344       locking.LEVEL_NODE: locking.ALL_SET,
345       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
346     }
347     self.share_locks = ShareAll()
348
349   def Exec(self, feedback_fn):
350     """Redistribute the configuration.
351
352     """
353     self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
354     RedistributeAncillaryFiles(self)
355
356
357 class LUClusterRename(LogicalUnit):
358   """Rename the cluster.
359
360   """
361   HPATH = "cluster-rename"
362   HTYPE = constants.HTYPE_CLUSTER
363
364   def BuildHooksEnv(self):
365     """Build hooks env.
366
367     """
368     return {
369       "OP_TARGET": self.cfg.GetClusterName(),
370       "NEW_NAME": self.op.name,
371       }
372
373   def BuildHooksNodes(self):
374     """Build hooks nodes.
375
376     """
377     return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
378
379   def CheckPrereq(self):
380     """Verify that the passed name is a valid one.
381
382     """
383     hostname = netutils.GetHostname(name=self.op.name,
384                                     family=self.cfg.GetPrimaryIPFamily())
385
386     new_name = hostname.name
387     self.ip = new_ip = hostname.ip
388     old_name = self.cfg.GetClusterName()
389     old_ip = self.cfg.GetMasterIP()
390     if new_name == old_name and new_ip == old_ip:
391       raise errors.OpPrereqError("Neither the name nor the IP address of the"
392                                  " cluster has changed",
393                                  errors.ECODE_INVAL)
394     if new_ip != old_ip:
395       if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
396         raise errors.OpPrereqError("The given cluster IP address (%s) is"
397                                    " reachable on the network" %
398                                    new_ip, errors.ECODE_NOTUNIQUE)
399
400     self.op.name = new_name
401
402   def Exec(self, feedback_fn):
403     """Rename the cluster.
404
405     """
406     clustername = self.op.name
407     new_ip = self.ip
408
409     # shutdown the master IP
410     master_params = self.cfg.GetMasterNetworkParameters()
411     ems = self.cfg.GetUseExternalMipScript()
412     result = self.rpc.call_node_deactivate_master_ip(master_params.name,
413                                                      master_params, ems)
414     result.Raise("Could not disable the master role")
415
416     try:
417       cluster = self.cfg.GetClusterInfo()
418       cluster.cluster_name = clustername
419       cluster.master_ip = new_ip
420       self.cfg.Update(cluster, feedback_fn)
421
422       # update the known hosts file
423       ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
424       node_list = self.cfg.GetOnlineNodeList()
425       try:
426         node_list.remove(master_params.name)
427       except ValueError:
428         pass
429       UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
430     finally:
431       master_params.ip = new_ip
432       result = self.rpc.call_node_activate_master_ip(master_params.name,
433                                                      master_params, ems)
434       msg = result.fail_msg
435       if msg:
436         self.LogWarning("Could not re-enable the master role on"
437                         " the master, please restart manually: %s", msg)
438
439     return clustername
440
441
442 class LUClusterRepairDiskSizes(NoHooksLU):
443   """Verifies the cluster disks sizes.
444
445   """
446   REQ_BGL = False
447
448   def ExpandNames(self):
449     if self.op.instances:
450       self.wanted_names = GetWantedInstances(self, self.op.instances)
451       # Not getting the node allocation lock as only a specific set of
452       # instances (and their nodes) is going to be acquired
453       self.needed_locks = {
454         locking.LEVEL_NODE_RES: [],
455         locking.LEVEL_INSTANCE: self.wanted_names,
456         }
457       self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
458     else:
459       self.wanted_names = None
460       self.needed_locks = {
461         locking.LEVEL_NODE_RES: locking.ALL_SET,
462         locking.LEVEL_INSTANCE: locking.ALL_SET,
463
464         # This opcode is acquires the node locks for all instances
465         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
466         }
467
468     self.share_locks = {
469       locking.LEVEL_NODE_RES: 1,
470       locking.LEVEL_INSTANCE: 0,
471       locking.LEVEL_NODE_ALLOC: 1,
472       }
473
474   def DeclareLocks(self, level):
475     if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
476       self._LockInstancesNodes(primary_only=True, level=level)
477
478   def CheckPrereq(self):
479     """Check prerequisites.
480
481     This only checks the optional instance list against the existing names.
482
483     """
484     if self.wanted_names is None:
485       self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
486
487     self.wanted_instances = \
488         map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
489
490   def _EnsureChildSizes(self, disk):
491     """Ensure children of the disk have the needed disk size.
492
493     This is valid mainly for DRBD8 and fixes an issue where the
494     children have smaller disk size.
495
496     @param disk: an L{ganeti.objects.Disk} object
497
498     """
499     if disk.dev_type == constants.LD_DRBD8:
500       assert disk.children, "Empty children for DRBD8?"
501       fchild = disk.children[0]
502       mismatch = fchild.size < disk.size
503       if mismatch:
504         self.LogInfo("Child disk has size %d, parent %d, fixing",
505                      fchild.size, disk.size)
506         fchild.size = disk.size
507
508       # and we recurse on this child only, not on the metadev
509       return self._EnsureChildSizes(fchild) or mismatch
510     else:
511       return False
512
513   def Exec(self, feedback_fn):
514     """Verify the size of cluster disks.
515
516     """
517     # TODO: check child disks too
518     # TODO: check differences in size between primary/secondary nodes
519     per_node_disks = {}
520     for instance in self.wanted_instances:
521       pnode = instance.primary_node
522       if pnode not in per_node_disks:
523         per_node_disks[pnode] = []
524       for idx, disk in enumerate(instance.disks):
525         per_node_disks[pnode].append((instance, idx, disk))
526
527     assert not (frozenset(per_node_disks.keys()) -
528                 self.owned_locks(locking.LEVEL_NODE_RES)), \
529       "Not owning correct locks"
530     assert not self.owned_locks(locking.LEVEL_NODE)
531
532     changed = []
533     for node, dskl in per_node_disks.items():
534       newl = [v[2].Copy() for v in dskl]
535       for dsk in newl:
536         self.cfg.SetDiskID(dsk, node)
537       result = self.rpc.call_blockdev_getsize(node, newl)
538       if result.fail_msg:
539         self.LogWarning("Failure in blockdev_getsize call to node"
540                         " %s, ignoring", node)
541         continue
542       if len(result.payload) != len(dskl):
543         logging.warning("Invalid result from node %s: len(dksl)=%d,"
544                         " result.payload=%s", node, len(dskl), result.payload)
545         self.LogWarning("Invalid result from node %s, ignoring node results",
546                         node)
547         continue
548       for ((instance, idx, disk), size) in zip(dskl, result.payload):
549         if size is None:
550           self.LogWarning("Disk %d of instance %s did not return size"
551                           " information, ignoring", idx, instance.name)
552           continue
553         if not isinstance(size, (int, long)):
554           self.LogWarning("Disk %d of instance %s did not return valid"
555                           " size information, ignoring", idx, instance.name)
556           continue
557         size = size >> 20
558         if size != disk.size:
559           self.LogInfo("Disk %d of instance %s has mismatched size,"
560                        " correcting: recorded %d, actual %d", idx,
561                        instance.name, disk.size, size)
562           disk.size = size
563           self.cfg.Update(instance, feedback_fn)
564           changed.append((instance.name, idx, size))
565         if self._EnsureChildSizes(disk):
566           self.cfg.Update(instance, feedback_fn)
567           changed.append((instance.name, idx, disk.size))
568     return changed
569
570
571 def _ValidateNetmask(cfg, netmask):
572   """Checks if a netmask is valid.
573
574   @type cfg: L{config.ConfigWriter}
575   @param cfg: The cluster configuration
576   @type netmask: int
577   @param netmask: the netmask to be verified
578   @raise errors.OpPrereqError: if the validation fails
579
580   """
581   ip_family = cfg.GetPrimaryIPFamily()
582   try:
583     ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
584   except errors.ProgrammerError:
585     raise errors.OpPrereqError("Invalid primary ip family: %s." %
586                                ip_family, errors.ECODE_INVAL)
587   if not ipcls.ValidateNetmask(netmask):
588     raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
589                                (netmask), errors.ECODE_INVAL)
590
591
592 class LUClusterSetParams(LogicalUnit):
593   """Change the parameters of the cluster.
594
595   """
596   HPATH = "cluster-modify"
597   HTYPE = constants.HTYPE_CLUSTER
598   REQ_BGL = False
599
600   def CheckArguments(self):
601     """Check parameters
602
603     """
604     if self.op.uid_pool:
605       uidpool.CheckUidPool(self.op.uid_pool)
606
607     if self.op.add_uids:
608       uidpool.CheckUidPool(self.op.add_uids)
609
610     if self.op.remove_uids:
611       uidpool.CheckUidPool(self.op.remove_uids)
612
613     if self.op.master_netmask is not None:
614       _ValidateNetmask(self.cfg, self.op.master_netmask)
615
616     if self.op.diskparams:
617       for dt_params in self.op.diskparams.values():
618         utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
619       try:
620         utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
621       except errors.OpPrereqError, err:
622         raise errors.OpPrereqError("While verify diskparams options: %s" % err,
623                                    errors.ECODE_INVAL)
624
625   def ExpandNames(self):
626     # FIXME: in the future maybe other cluster params won't require checking on
627     # all nodes to be modified.
628     # FIXME: This opcode changes cluster-wide settings. Is acquiring all
629     # resource locks the right thing, shouldn't it be the BGL instead?
630     self.needed_locks = {
631       locking.LEVEL_NODE: locking.ALL_SET,
632       locking.LEVEL_INSTANCE: locking.ALL_SET,
633       locking.LEVEL_NODEGROUP: locking.ALL_SET,
634       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
635     }
636     self.share_locks = ShareAll()
637
638   def BuildHooksEnv(self):
639     """Build hooks env.
640
641     """
642     return {
643       "OP_TARGET": self.cfg.GetClusterName(),
644       "NEW_VG_NAME": self.op.vg_name,
645       }
646
647   def BuildHooksNodes(self):
648     """Build hooks nodes.
649
650     """
651     mn = self.cfg.GetMasterNode()
652     return ([mn], [mn])
653
654   def CheckPrereq(self):
655     """Check prerequisites.
656
657     This checks whether the given params don't conflict and
658     if the given volume group is valid.
659
660     """
661     if self.op.vg_name is not None and not self.op.vg_name:
662       if self.cfg.HasAnyDiskOfType(constants.LD_LV):
663         raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
664                                    " instances exist", errors.ECODE_INVAL)
665
666     if self.op.drbd_helper is not None and not self.op.drbd_helper:
667       if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
668         raise errors.OpPrereqError("Cannot disable drbd helper while"
669                                    " drbd-based instances exist",
670                                    errors.ECODE_INVAL)
671
672     node_list = self.owned_locks(locking.LEVEL_NODE)
673
674     vm_capable_nodes = [node.name
675                         for node in self.cfg.GetAllNodesInfo().values()
676                         if node.name in node_list and node.vm_capable]
677
678     # if vg_name not None, checks given volume group on all nodes
679     if self.op.vg_name:
680       vglist = self.rpc.call_vg_list(vm_capable_nodes)
681       for node in vm_capable_nodes:
682         msg = vglist[node].fail_msg
683         if msg:
684           # ignoring down node
685           self.LogWarning("Error while gathering data on node %s"
686                           " (ignoring node): %s", node, msg)
687           continue
688         vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
689                                               self.op.vg_name,
690                                               constants.MIN_VG_SIZE)
691         if vgstatus:
692           raise errors.OpPrereqError("Error on node '%s': %s" %
693                                      (node, vgstatus), errors.ECODE_ENVIRON)
694
695     if self.op.drbd_helper:
696       # checks given drbd helper on all nodes
697       helpers = self.rpc.call_drbd_helper(node_list)
698       for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
699         if ninfo.offline:
700           self.LogInfo("Not checking drbd helper on offline node %s", node)
701           continue
702         msg = helpers[node].fail_msg
703         if msg:
704           raise errors.OpPrereqError("Error checking drbd helper on node"
705                                      " '%s': %s" % (node, msg),
706                                      errors.ECODE_ENVIRON)
707         node_helper = helpers[node].payload
708         if node_helper != self.op.drbd_helper:
709           raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
710                                      (node, node_helper), errors.ECODE_ENVIRON)
711
712     self.cluster = cluster = self.cfg.GetClusterInfo()
713     # validate params changes
714     if self.op.beparams:
715       objects.UpgradeBeParams(self.op.beparams)
716       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
717       self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
718
719     if self.op.ndparams:
720       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
721       self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
722
723       # TODO: we need a more general way to handle resetting
724       # cluster-level parameters to default values
725       if self.new_ndparams["oob_program"] == "":
726         self.new_ndparams["oob_program"] = \
727             constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
728
729     if self.op.hv_state:
730       new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
731                                            self.cluster.hv_state_static)
732       self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
733                                for hv, values in new_hv_state.items())
734
735     if self.op.disk_state:
736       new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
737                                                self.cluster.disk_state_static)
738       self.new_disk_state = \
739         dict((storage, dict((name, cluster.SimpleFillDiskState(values))
740                             for name, values in svalues.items()))
741              for storage, svalues in new_disk_state.items())
742
743     if self.op.ipolicy:
744       self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
745                                            group_policy=False)
746
747       all_instances = self.cfg.GetAllInstancesInfo().values()
748       violations = set()
749       for group in self.cfg.GetAllNodeGroupsInfo().values():
750         instances = frozenset([inst for inst in all_instances
751                                if compat.any(node in group.members
752                                              for node in inst.all_nodes)])
753         new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
754         ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
755         new = ComputeNewInstanceViolations(ipol,
756                                            new_ipolicy, instances, self.cfg)
757         if new:
758           violations.update(new)
759
760       if violations:
761         self.LogWarning("After the ipolicy change the following instances"
762                         " violate them: %s",
763                         utils.CommaJoin(utils.NiceSort(violations)))
764
765     if self.op.nicparams:
766       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
767       self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
768       objects.NIC.CheckParameterSyntax(self.new_nicparams)
769       nic_errors = []
770
771       # check all instances for consistency
772       for instance in self.cfg.GetAllInstancesInfo().values():
773         for nic_idx, nic in enumerate(instance.nics):
774           params_copy = copy.deepcopy(nic.nicparams)
775           params_filled = objects.FillDict(self.new_nicparams, params_copy)
776
777           # check parameter syntax
778           try:
779             objects.NIC.CheckParameterSyntax(params_filled)
780           except errors.ConfigurationError, err:
781             nic_errors.append("Instance %s, nic/%d: %s" %
782                               (instance.name, nic_idx, err))
783
784           # if we're moving instances to routed, check that they have an ip
785           target_mode = params_filled[constants.NIC_MODE]
786           if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
787             nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
788                               " address" % (instance.name, nic_idx))
789       if nic_errors:
790         raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
791                                    "\n".join(nic_errors), errors.ECODE_INVAL)
792
793     # hypervisor list/parameters
794     self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
795     if self.op.hvparams:
796       for hv_name, hv_dict in self.op.hvparams.items():
797         if hv_name not in self.new_hvparams:
798           self.new_hvparams[hv_name] = hv_dict
799         else:
800           self.new_hvparams[hv_name].update(hv_dict)
801
802     # disk template parameters
803     self.new_diskparams = objects.FillDict(cluster.diskparams, {})
804     if self.op.diskparams:
805       for dt_name, dt_params in self.op.diskparams.items():
806         if dt_name not in self.op.diskparams:
807           self.new_diskparams[dt_name] = dt_params
808         else:
809           self.new_diskparams[dt_name].update(dt_params)
810
811     # os hypervisor parameters
812     self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
813     if self.op.os_hvp:
814       for os_name, hvs in self.op.os_hvp.items():
815         if os_name not in self.new_os_hvp:
816           self.new_os_hvp[os_name] = hvs
817         else:
818           for hv_name, hv_dict in hvs.items():
819             if hv_dict is None:
820               # Delete if it exists
821               self.new_os_hvp[os_name].pop(hv_name, None)
822             elif hv_name not in self.new_os_hvp[os_name]:
823               self.new_os_hvp[os_name][hv_name] = hv_dict
824             else:
825               self.new_os_hvp[os_name][hv_name].update(hv_dict)
826
827     # os parameters
828     self.new_osp = objects.FillDict(cluster.osparams, {})
829     if self.op.osparams:
830       for os_name, osp in self.op.osparams.items():
831         if os_name not in self.new_osp:
832           self.new_osp[os_name] = {}
833
834         self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
835                                                  use_none=True)
836
837         if not self.new_osp[os_name]:
838           # we removed all parameters
839           del self.new_osp[os_name]
840         else:
841           # check the parameter validity (remote check)
842           CheckOSParams(self, False, [self.cfg.GetMasterNode()],
843                         os_name, self.new_osp[os_name])
844
845     # changes to the hypervisor list
846     if self.op.enabled_hypervisors is not None:
847       self.hv_list = self.op.enabled_hypervisors
848       for hv in self.hv_list:
849         # if the hypervisor doesn't already exist in the cluster
850         # hvparams, we initialize it to empty, and then (in both
851         # cases) we make sure to fill the defaults, as we might not
852         # have a complete defaults list if the hypervisor wasn't
853         # enabled before
854         if hv not in new_hvp:
855           new_hvp[hv] = {}
856         new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
857         utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
858     else:
859       self.hv_list = cluster.enabled_hypervisors
860
861     if self.op.hvparams or self.op.enabled_hypervisors is not None:
862       # either the enabled list has changed, or the parameters have, validate
863       for hv_name, hv_params in self.new_hvparams.items():
864         if ((self.op.hvparams and hv_name in self.op.hvparams) or
865             (self.op.enabled_hypervisors and
866              hv_name in self.op.enabled_hypervisors)):
867           # either this is a new hypervisor, or its parameters have changed
868           hv_class = hypervisor.GetHypervisorClass(hv_name)
869           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
870           hv_class.CheckParameterSyntax(hv_params)
871           CheckHVParams(self, node_list, hv_name, hv_params)
872
873     self._CheckDiskTemplateConsistency()
874
875     if self.op.os_hvp:
876       # no need to check any newly-enabled hypervisors, since the
877       # defaults have already been checked in the above code-block
878       for os_name, os_hvp in self.new_os_hvp.items():
879         for hv_name, hv_params in os_hvp.items():
880           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
881           # we need to fill in the new os_hvp on top of the actual hv_p
882           cluster_defaults = self.new_hvparams.get(hv_name, {})
883           new_osp = objects.FillDict(cluster_defaults, hv_params)
884           hv_class = hypervisor.GetHypervisorClass(hv_name)
885           hv_class.CheckParameterSyntax(new_osp)
886           CheckHVParams(self, node_list, hv_name, new_osp)
887
888     if self.op.default_iallocator:
889       alloc_script = utils.FindFile(self.op.default_iallocator,
890                                     constants.IALLOCATOR_SEARCH_PATH,
891                                     os.path.isfile)
892       if alloc_script is None:
893         raise errors.OpPrereqError("Invalid default iallocator script '%s'"
894                                    " specified" % self.op.default_iallocator,
895                                    errors.ECODE_INVAL)
896
897   def _CheckDiskTemplateConsistency(self):
898     """Check whether the disk templates that are going to be disabled
899        are still in use by some instances.
900
901     """
902     if self.op.enabled_disk_templates:
903       cluster = self.cfg.GetClusterInfo()
904       instances = self.cfg.GetAllInstancesInfo()
905
906       disk_templates_to_remove = set(cluster.enabled_disk_templates) \
907         - set(self.op.enabled_disk_templates)
908       for instance in instances.itervalues():
909         if instance.disk_template in disk_templates_to_remove:
910           raise errors.OpPrereqError("Cannot disable disk template '%s',"
911                                      " because instance '%s' is using it." %
912                                      (instance.disk_template, instance.name))
913
914   def Exec(self, feedback_fn):
915     """Change the parameters of the cluster.
916
917     """
918     if self.op.vg_name is not None:
919       new_volume = self.op.vg_name
920       if not new_volume:
921         new_volume = None
922       if new_volume != self.cfg.GetVGName():
923         self.cfg.SetVGName(new_volume)
924       else:
925         feedback_fn("Cluster LVM configuration already in desired"
926                     " state, not changing")
927     if self.op.drbd_helper is not None:
928       new_helper = self.op.drbd_helper
929       if not new_helper:
930         new_helper = None
931       if new_helper != self.cfg.GetDRBDHelper():
932         self.cfg.SetDRBDHelper(new_helper)
933       else:
934         feedback_fn("Cluster DRBD helper already in desired state,"
935                     " not changing")
936     if self.op.hvparams:
937       self.cluster.hvparams = self.new_hvparams
938     if self.op.os_hvp:
939       self.cluster.os_hvp = self.new_os_hvp
940     if self.op.enabled_hypervisors is not None:
941       self.cluster.hvparams = self.new_hvparams
942       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
943     if self.op.enabled_disk_templates:
944       self.cluster.enabled_disk_templates = \
945         list(set(self.op.enabled_disk_templates))
946     if self.op.beparams:
947       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
948     if self.op.nicparams:
949       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
950     if self.op.ipolicy:
951       self.cluster.ipolicy = self.new_ipolicy
952     if self.op.osparams:
953       self.cluster.osparams = self.new_osp
954     if self.op.ndparams:
955       self.cluster.ndparams = self.new_ndparams
956     if self.op.diskparams:
957       self.cluster.diskparams = self.new_diskparams
958     if self.op.hv_state:
959       self.cluster.hv_state_static = self.new_hv_state
960     if self.op.disk_state:
961       self.cluster.disk_state_static = self.new_disk_state
962
963     if self.op.candidate_pool_size is not None:
964       self.cluster.candidate_pool_size = self.op.candidate_pool_size
965       # we need to update the pool size here, otherwise the save will fail
966       AdjustCandidatePool(self, [])
967
968     if self.op.maintain_node_health is not None:
969       if self.op.maintain_node_health and not constants.ENABLE_CONFD:
970         feedback_fn("Note: CONFD was disabled at build time, node health"
971                     " maintenance is not useful (still enabling it)")
972       self.cluster.maintain_node_health = self.op.maintain_node_health
973
974     if self.op.modify_etc_hosts is not None:
975       self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
976
977     if self.op.prealloc_wipe_disks is not None:
978       self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
979
980     if self.op.add_uids is not None:
981       uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
982
983     if self.op.remove_uids is not None:
984       uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
985
986     if self.op.uid_pool is not None:
987       self.cluster.uid_pool = self.op.uid_pool
988
989     if self.op.default_iallocator is not None:
990       self.cluster.default_iallocator = self.op.default_iallocator
991
992     if self.op.reserved_lvs is not None:
993       self.cluster.reserved_lvs = self.op.reserved_lvs
994
995     if self.op.use_external_mip_script is not None:
996       self.cluster.use_external_mip_script = self.op.use_external_mip_script
997
998     def helper_os(aname, mods, desc):
999       desc += " OS list"
1000       lst = getattr(self.cluster, aname)
1001       for key, val in mods:
1002         if key == constants.DDM_ADD:
1003           if val in lst:
1004             feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1005           else:
1006             lst.append(val)
1007         elif key == constants.DDM_REMOVE:
1008           if val in lst:
1009             lst.remove(val)
1010           else:
1011             feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1012         else:
1013           raise errors.ProgrammerError("Invalid modification '%s'" % key)
1014
1015     if self.op.hidden_os:
1016       helper_os("hidden_os", self.op.hidden_os, "hidden")
1017
1018     if self.op.blacklisted_os:
1019       helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1020
1021     if self.op.master_netdev:
1022       master_params = self.cfg.GetMasterNetworkParameters()
1023       ems = self.cfg.GetUseExternalMipScript()
1024       feedback_fn("Shutting down master ip on the current netdev (%s)" %
1025                   self.cluster.master_netdev)
1026       result = self.rpc.call_node_deactivate_master_ip(master_params.name,
1027                                                        master_params, ems)
1028       if not self.op.force:
1029         result.Raise("Could not disable the master ip")
1030       else:
1031         if result.fail_msg:
1032           msg = ("Could not disable the master ip (continuing anyway): %s" %
1033                  result.fail_msg)
1034           feedback_fn(msg)
1035       feedback_fn("Changing master_netdev from %s to %s" %
1036                   (master_params.netdev, self.op.master_netdev))
1037       self.cluster.master_netdev = self.op.master_netdev
1038
1039     if self.op.master_netmask:
1040       master_params = self.cfg.GetMasterNetworkParameters()
1041       feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1042       result = self.rpc.call_node_change_master_netmask(master_params.name,
1043                                                         master_params.netmask,
1044                                                         self.op.master_netmask,
1045                                                         master_params.ip,
1046                                                         master_params.netdev)
1047       if result.fail_msg:
1048         msg = "Could not change the master IP netmask: %s" % result.fail_msg
1049         feedback_fn(msg)
1050
1051       self.cluster.master_netmask = self.op.master_netmask
1052
1053     self.cfg.Update(self.cluster, feedback_fn)
1054
1055     if self.op.master_netdev:
1056       master_params = self.cfg.GetMasterNetworkParameters()
1057       feedback_fn("Starting the master ip on the new master netdev (%s)" %
1058                   self.op.master_netdev)
1059       ems = self.cfg.GetUseExternalMipScript()
1060       result = self.rpc.call_node_activate_master_ip(master_params.name,
1061                                                      master_params, ems)
1062       if result.fail_msg:
1063         self.LogWarning("Could not re-enable the master ip on"
1064                         " the master, please restart manually: %s",
1065                         result.fail_msg)
1066
1067
1068 class LUClusterVerify(NoHooksLU):
1069   """Submits all jobs necessary to verify the cluster.
1070
1071   """
1072   REQ_BGL = False
1073
1074   def ExpandNames(self):
1075     self.needed_locks = {}
1076
1077   def Exec(self, feedback_fn):
1078     jobs = []
1079
1080     if self.op.group_name:
1081       groups = [self.op.group_name]
1082       depends_fn = lambda: None
1083     else:
1084       groups = self.cfg.GetNodeGroupList()
1085
1086       # Verify global configuration
1087       jobs.append([
1088         opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1089         ])
1090
1091       # Always depend on global verification
1092       depends_fn = lambda: [(-len(jobs), [])]
1093
1094     jobs.extend(
1095       [opcodes.OpClusterVerifyGroup(group_name=group,
1096                                     ignore_errors=self.op.ignore_errors,
1097                                     depends=depends_fn())]
1098       for group in groups)
1099
1100     # Fix up all parameters
1101     for op in itertools.chain(*jobs): # pylint: disable=W0142
1102       op.debug_simulate_errors = self.op.debug_simulate_errors
1103       op.verbose = self.op.verbose
1104       op.error_codes = self.op.error_codes
1105       try:
1106         op.skip_checks = self.op.skip_checks
1107       except AttributeError:
1108         assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1109
1110     return ResultWithJobs(jobs)
1111
1112
1113 class _VerifyErrors(object):
1114   """Mix-in for cluster/group verify LUs.
1115
1116   It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1117   self.op and self._feedback_fn to be available.)
1118
1119   """
1120
1121   ETYPE_FIELD = "code"
1122   ETYPE_ERROR = "ERROR"
1123   ETYPE_WARNING = "WARNING"
1124
1125   def _Error(self, ecode, item, msg, *args, **kwargs):
1126     """Format an error message.
1127
1128     Based on the opcode's error_codes parameter, either format a
1129     parseable error code, or a simpler error string.
1130
1131     This must be called only from Exec and functions called from Exec.
1132
1133     """
1134     ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1135     itype, etxt, _ = ecode
1136     # If the error code is in the list of ignored errors, demote the error to a
1137     # warning
1138     if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1139       ltype = self.ETYPE_WARNING
1140     # first complete the msg
1141     if args:
1142       msg = msg % args
1143     # then format the whole message
1144     if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1145       msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1146     else:
1147       if item:
1148         item = " " + item
1149       else:
1150         item = ""
1151       msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1152     # and finally report it via the feedback_fn
1153     self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1154     # do not mark the operation as failed for WARN cases only
1155     if ltype == self.ETYPE_ERROR:
1156       self.bad = True
1157
1158   def _ErrorIf(self, cond, *args, **kwargs):
1159     """Log an error message if the passed condition is True.
1160
1161     """
1162     if (bool(cond)
1163         or self.op.debug_simulate_errors): # pylint: disable=E1101
1164       self._Error(*args, **kwargs)
1165
1166
1167 def _VerifyCertificate(filename):
1168   """Verifies a certificate for L{LUClusterVerifyConfig}.
1169
1170   @type filename: string
1171   @param filename: Path to PEM file
1172
1173   """
1174   try:
1175     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1176                                            utils.ReadFile(filename))
1177   except Exception, err: # pylint: disable=W0703
1178     return (LUClusterVerifyConfig.ETYPE_ERROR,
1179             "Failed to load X509 certificate %s: %s" % (filename, err))
1180
1181   (errcode, msg) = \
1182     utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1183                                 constants.SSL_CERT_EXPIRATION_ERROR)
1184
1185   if msg:
1186     fnamemsg = "While verifying %s: %s" % (filename, msg)
1187   else:
1188     fnamemsg = None
1189
1190   if errcode is None:
1191     return (None, fnamemsg)
1192   elif errcode == utils.CERT_WARNING:
1193     return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1194   elif errcode == utils.CERT_ERROR:
1195     return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1196
1197   raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1198
1199
1200 def _GetAllHypervisorParameters(cluster, instances):
1201   """Compute the set of all hypervisor parameters.
1202
1203   @type cluster: L{objects.Cluster}
1204   @param cluster: the cluster object
1205   @param instances: list of L{objects.Instance}
1206   @param instances: additional instances from which to obtain parameters
1207   @rtype: list of (origin, hypervisor, parameters)
1208   @return: a list with all parameters found, indicating the hypervisor they
1209        apply to, and the origin (can be "cluster", "os X", or "instance Y")
1210
1211   """
1212   hvp_data = []
1213
1214   for hv_name in cluster.enabled_hypervisors:
1215     hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1216
1217   for os_name, os_hvp in cluster.os_hvp.items():
1218     for hv_name, hv_params in os_hvp.items():
1219       if hv_params:
1220         full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1221         hvp_data.append(("os %s" % os_name, hv_name, full_params))
1222
1223   # TODO: collapse identical parameter values in a single one
1224   for instance in instances:
1225     if instance.hvparams:
1226       hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1227                        cluster.FillHV(instance)))
1228
1229   return hvp_data
1230
1231
1232 class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1233   """Verifies the cluster config.
1234
1235   """
1236   REQ_BGL = False
1237
1238   def _VerifyHVP(self, hvp_data):
1239     """Verifies locally the syntax of the hypervisor parameters.
1240
1241     """
1242     for item, hv_name, hv_params in hvp_data:
1243       msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1244              (item, hv_name))
1245       try:
1246         hv_class = hypervisor.GetHypervisorClass(hv_name)
1247         utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1248         hv_class.CheckParameterSyntax(hv_params)
1249       except errors.GenericError, err:
1250         self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1251
1252   def ExpandNames(self):
1253     self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1254     self.share_locks = ShareAll()
1255
1256   def CheckPrereq(self):
1257     """Check prerequisites.
1258
1259     """
1260     # Retrieve all information
1261     self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1262     self.all_node_info = self.cfg.GetAllNodesInfo()
1263     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1264
1265   def Exec(self, feedback_fn):
1266     """Verify integrity of cluster, performing various test on nodes.
1267
1268     """
1269     self.bad = False
1270     self._feedback_fn = feedback_fn
1271
1272     feedback_fn("* Verifying cluster config")
1273
1274     for msg in self.cfg.VerifyConfig():
1275       self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1276
1277     feedback_fn("* Verifying cluster certificate files")
1278
1279     for cert_filename in pathutils.ALL_CERT_FILES:
1280       (errcode, msg) = _VerifyCertificate(cert_filename)
1281       self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1282
1283     self._ErrorIf(not utils.CanRead(constants.CONFD_USER,
1284                                     pathutils.NODED_CERT_FILE),
1285                   constants.CV_ECLUSTERCERT,
1286                   None,
1287                   pathutils.NODED_CERT_FILE + " must be accessible by the " +
1288                     constants.CONFD_USER + " user")
1289
1290     feedback_fn("* Verifying hypervisor parameters")
1291
1292     self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1293                                                 self.all_inst_info.values()))
1294
1295     feedback_fn("* Verifying all nodes belong to an existing group")
1296
1297     # We do this verification here because, should this bogus circumstance
1298     # occur, it would never be caught by VerifyGroup, which only acts on
1299     # nodes/instances reachable from existing node groups.
1300
1301     dangling_nodes = set(node.name for node in self.all_node_info.values()
1302                          if node.group not in self.all_group_info)
1303
1304     dangling_instances = {}
1305     no_node_instances = []
1306
1307     for inst in self.all_inst_info.values():
1308       if inst.primary_node in dangling_nodes:
1309         dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1310       elif inst.primary_node not in self.all_node_info:
1311         no_node_instances.append(inst.name)
1312
1313     pretty_dangling = [
1314         "%s (%s)" %
1315         (node.name,
1316          utils.CommaJoin(dangling_instances.get(node.name,
1317                                                 ["no instances"])))
1318         for node in dangling_nodes]
1319
1320     self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1321                   None,
1322                   "the following nodes (and their instances) belong to a non"
1323                   " existing group: %s", utils.CommaJoin(pretty_dangling))
1324
1325     self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1326                   None,
1327                   "the following instances have a non-existing primary-node:"
1328                   " %s", utils.CommaJoin(no_node_instances))
1329
1330     return not self.bad
1331
1332
1333 class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1334   """Verifies the status of a node group.
1335
1336   """
1337   HPATH = "cluster-verify"
1338   HTYPE = constants.HTYPE_CLUSTER
1339   REQ_BGL = False
1340
1341   _HOOKS_INDENT_RE = re.compile("^", re.M)
1342
1343   class NodeImage(object):
1344     """A class representing the logical and physical status of a node.
1345
1346     @type name: string
1347     @ivar name: the node name to which this object refers
1348     @ivar volumes: a structure as returned from
1349         L{ganeti.backend.GetVolumeList} (runtime)
1350     @ivar instances: a list of running instances (runtime)
1351     @ivar pinst: list of configured primary instances (config)
1352     @ivar sinst: list of configured secondary instances (config)
1353     @ivar sbp: dictionary of {primary-node: list of instances} for all
1354         instances for which this node is secondary (config)
1355     @ivar mfree: free memory, as reported by hypervisor (runtime)
1356     @ivar dfree: free disk, as reported by the node (runtime)
1357     @ivar offline: the offline status (config)
1358     @type rpc_fail: boolean
1359     @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1360         not whether the individual keys were correct) (runtime)
1361     @type lvm_fail: boolean
1362     @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1363     @type hyp_fail: boolean
1364     @ivar hyp_fail: whether the RPC call didn't return the instance list
1365     @type ghost: boolean
1366     @ivar ghost: whether this is a known node or not (config)
1367     @type os_fail: boolean
1368     @ivar os_fail: whether the RPC call didn't return valid OS data
1369     @type oslist: list
1370     @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1371     @type vm_capable: boolean
1372     @ivar vm_capable: whether the node can host instances
1373     @type pv_min: float
1374     @ivar pv_min: size in MiB of the smallest PVs
1375     @type pv_max: float
1376     @ivar pv_max: size in MiB of the biggest PVs
1377
1378     """
1379     def __init__(self, offline=False, name=None, vm_capable=True):
1380       self.name = name
1381       self.volumes = {}
1382       self.instances = []
1383       self.pinst = []
1384       self.sinst = []
1385       self.sbp = {}
1386       self.mfree = 0
1387       self.dfree = 0
1388       self.offline = offline
1389       self.vm_capable = vm_capable
1390       self.rpc_fail = False
1391       self.lvm_fail = False
1392       self.hyp_fail = False
1393       self.ghost = False
1394       self.os_fail = False
1395       self.oslist = {}
1396       self.pv_min = None
1397       self.pv_max = None
1398
1399   def ExpandNames(self):
1400     # This raises errors.OpPrereqError on its own:
1401     self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1402
1403     # Get instances in node group; this is unsafe and needs verification later
1404     inst_names = \
1405       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1406
1407     self.needed_locks = {
1408       locking.LEVEL_INSTANCE: inst_names,
1409       locking.LEVEL_NODEGROUP: [self.group_uuid],
1410       locking.LEVEL_NODE: [],
1411
1412       # This opcode is run by watcher every five minutes and acquires all nodes
1413       # for a group. It doesn't run for a long time, so it's better to acquire
1414       # the node allocation lock as well.
1415       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1416       }
1417
1418     self.share_locks = ShareAll()
1419
1420   def DeclareLocks(self, level):
1421     if level == locking.LEVEL_NODE:
1422       # Get members of node group; this is unsafe and needs verification later
1423       nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1424
1425       all_inst_info = self.cfg.GetAllInstancesInfo()
1426
1427       # In Exec(), we warn about mirrored instances that have primary and
1428       # secondary living in separate node groups. To fully verify that
1429       # volumes for these instances are healthy, we will need to do an
1430       # extra call to their secondaries. We ensure here those nodes will
1431       # be locked.
1432       for inst in self.owned_locks(locking.LEVEL_INSTANCE):
1433         # Important: access only the instances whose lock is owned
1434         if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
1435           nodes.update(all_inst_info[inst].secondary_nodes)
1436
1437       self.needed_locks[locking.LEVEL_NODE] = nodes
1438
1439   def CheckPrereq(self):
1440     assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1441     self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1442
1443     group_nodes = set(self.group_info.members)
1444     group_instances = \
1445       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1446
1447     unlocked_nodes = \
1448         group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1449
1450     unlocked_instances = \
1451         group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))
1452
1453     if unlocked_nodes:
1454       raise errors.OpPrereqError("Missing lock for nodes: %s" %
1455                                  utils.CommaJoin(unlocked_nodes),
1456                                  errors.ECODE_STATE)
1457
1458     if unlocked_instances:
1459       raise errors.OpPrereqError("Missing lock for instances: %s" %
1460                                  utils.CommaJoin(unlocked_instances),
1461                                  errors.ECODE_STATE)
1462
1463     self.all_node_info = self.cfg.GetAllNodesInfo()
1464     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1465
1466     self.my_node_names = utils.NiceSort(group_nodes)
1467     self.my_inst_names = utils.NiceSort(group_instances)
1468
1469     self.my_node_info = dict((name, self.all_node_info[name])
1470                              for name in self.my_node_names)
1471
1472     self.my_inst_info = dict((name, self.all_inst_info[name])
1473                              for name in self.my_inst_names)
1474
1475     # We detect here the nodes that will need the extra RPC calls for verifying
1476     # split LV volumes; they should be locked.
1477     extra_lv_nodes = set()
1478
1479     for inst in self.my_inst_info.values():
1480       if inst.disk_template in constants.DTS_INT_MIRROR:
1481         for nname in inst.all_nodes:
1482           if self.all_node_info[nname].group != self.group_uuid:
1483             extra_lv_nodes.add(nname)
1484
1485     unlocked_lv_nodes = \
1486         extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1487
1488     if unlocked_lv_nodes:
1489       raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1490                                  utils.CommaJoin(unlocked_lv_nodes),
1491                                  errors.ECODE_STATE)
1492     self.extra_lv_nodes = list(extra_lv_nodes)
1493
1494   def _VerifyNode(self, ninfo, nresult):
1495     """Perform some basic validation on data returned from a node.
1496
1497       - check the result data structure is well formed and has all the
1498         mandatory fields
1499       - check ganeti version
1500
1501     @type ninfo: L{objects.Node}
1502     @param ninfo: the node to check
1503     @param nresult: the results from the node
1504     @rtype: boolean
1505     @return: whether overall this call was successful (and we can expect
1506          reasonable values in the respose)
1507
1508     """
1509     node = ninfo.name
1510     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1511
1512     # main result, nresult should be a non-empty dict
1513     test = not nresult or not isinstance(nresult, dict)
1514     _ErrorIf(test, constants.CV_ENODERPC, node,
1515                   "unable to verify node: no data returned")
1516     if test:
1517       return False
1518
1519     # compares ganeti version
1520     local_version = constants.PROTOCOL_VERSION
1521     remote_version = nresult.get("version", None)
1522     test = not (remote_version and
1523                 isinstance(remote_version, (list, tuple)) and
1524                 len(remote_version) == 2)
1525     _ErrorIf(test, constants.CV_ENODERPC, node,
1526              "connection to node returned invalid data")
1527     if test:
1528       return False
1529
1530     test = local_version != remote_version[0]
1531     _ErrorIf(test, constants.CV_ENODEVERSION, node,
1532              "incompatible protocol versions: master %s,"
1533              " node %s", local_version, remote_version[0])
1534     if test:
1535       return False
1536
1537     # node seems compatible, we can actually try to look into its results
1538
1539     # full package version
1540     self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1541                   constants.CV_ENODEVERSION, node,
1542                   "software version mismatch: master %s, node %s",
1543                   constants.RELEASE_VERSION, remote_version[1],
1544                   code=self.ETYPE_WARNING)
1545
1546     hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1547     if ninfo.vm_capable and isinstance(hyp_result, dict):
1548       for hv_name, hv_result in hyp_result.iteritems():
1549         test = hv_result is not None
1550         _ErrorIf(test, constants.CV_ENODEHV, node,
1551                  "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1552
1553     hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1554     if ninfo.vm_capable and isinstance(hvp_result, list):
1555       for item, hv_name, hv_result in hvp_result:
1556         _ErrorIf(True, constants.CV_ENODEHV, node,
1557                  "hypervisor %s parameter verify failure (source %s): %s",
1558                  hv_name, item, hv_result)
1559
1560     test = nresult.get(constants.NV_NODESETUP,
1561                        ["Missing NODESETUP results"])
1562     _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
1563              "; ".join(test))
1564
1565     return True
1566
1567   def _VerifyNodeTime(self, ninfo, nresult,
1568                       nvinfo_starttime, nvinfo_endtime):
1569     """Check the node time.
1570
1571     @type ninfo: L{objects.Node}
1572     @param ninfo: the node to check
1573     @param nresult: the remote results for the node
1574     @param nvinfo_starttime: the start time of the RPC call
1575     @param nvinfo_endtime: the end time of the RPC call
1576
1577     """
1578     node = ninfo.name
1579     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1580
1581     ntime = nresult.get(constants.NV_TIME, None)
1582     try:
1583       ntime_merged = utils.MergeTime(ntime)
1584     except (ValueError, TypeError):
1585       _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
1586       return
1587
1588     if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1589       ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1590     elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1591       ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1592     else:
1593       ntime_diff = None
1594
1595     _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
1596              "Node time diverges by at least %s from master node time",
1597              ntime_diff)
1598
1599   def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1600     """Check the node LVM results and update info for cross-node checks.
1601
1602     @type ninfo: L{objects.Node}
1603     @param ninfo: the node to check
1604     @param nresult: the remote results for the node
1605     @param vg_name: the configured VG name
1606     @type nimg: L{NodeImage}
1607     @param nimg: node image
1608
1609     """
1610     if vg_name is None:
1611       return
1612
1613     node = ninfo.name
1614     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1615
1616     # checks vg existence and size > 20G
1617     vglist = nresult.get(constants.NV_VGLIST, None)
1618     test = not vglist
1619     _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
1620     if not test:
1621       vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1622                                             constants.MIN_VG_SIZE)
1623       _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)
1624
1625     # Check PVs
1626     (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1627     for em in errmsgs:
1628       self._Error(constants.CV_ENODELVM, node, em)
1629     if pvminmax is not None:
1630       (nimg.pv_min, nimg.pv_max) = pvminmax
1631
1632   def _VerifyGroupLVM(self, node_image, vg_name):
1633     """Check cross-node consistency in LVM.
1634
1635     @type node_image: dict
1636     @param node_image: info about nodes, mapping from node to names to
1637       L{NodeImage} objects
1638     @param vg_name: the configured VG name
1639
1640     """
1641     if vg_name is None:
1642       return
1643
1644     # Only exlcusive storage needs this kind of checks
1645     if not self._exclusive_storage:
1646       return
1647
1648     # exclusive_storage wants all PVs to have the same size (approximately),
1649     # if the smallest and the biggest ones are okay, everything is fine.
1650     # pv_min is None iff pv_max is None
1651     vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1652     if not vals:
1653       return
1654     (pvmin, minnode) = min((ni.pv_min, ni.name) for ni in vals)
1655     (pvmax, maxnode) = max((ni.pv_max, ni.name) for ni in vals)
1656     bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1657     self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1658                   "PV sizes differ too much in the group; smallest (%s MB) is"
1659                   " on %s, biggest (%s MB) is on %s",
1660                   pvmin, minnode, pvmax, maxnode)
1661
1662   def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1663     """Check the node bridges.
1664
1665     @type ninfo: L{objects.Node}
1666     @param ninfo: the node to check
1667     @param nresult: the remote results for the node
1668     @param bridges: the expected list of bridges
1669
1670     """
1671     if not bridges:
1672       return
1673
1674     node = ninfo.name
1675     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1676
1677     missing = nresult.get(constants.NV_BRIDGES, None)
1678     test = not isinstance(missing, list)
1679     _ErrorIf(test, constants.CV_ENODENET, node,
1680              "did not return valid bridge information")
1681     if not test:
1682       _ErrorIf(bool(missing), constants.CV_ENODENET, node,
1683                "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1684
1685   def _VerifyNodeUserScripts(self, ninfo, nresult):
1686     """Check the results of user scripts presence and executability on the node
1687
1688     @type ninfo: L{objects.Node}
1689     @param ninfo: the node to check
1690     @param nresult: the remote results for the node
1691
1692     """
1693     node = ninfo.name
1694
1695     test = not constants.NV_USERSCRIPTS in nresult
1696     self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
1697                   "did not return user scripts information")
1698
1699     broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1700     if not test:
1701       self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
1702                     "user scripts not present or not executable: %s" %
1703                     utils.CommaJoin(sorted(broken_scripts)))
1704
1705   def _VerifyNodeNetwork(self, ninfo, nresult):
1706     """Check the node network connectivity results.
1707
1708     @type ninfo: L{objects.Node}
1709     @param ninfo: the node to check
1710     @param nresult: the remote results for the node
1711
1712     """
1713     node = ninfo.name
1714     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1715
1716     test = constants.NV_NODELIST not in nresult
1717     _ErrorIf(test, constants.CV_ENODESSH, node,
1718              "node hasn't returned node ssh connectivity data")
1719     if not test:
1720       if nresult[constants.NV_NODELIST]:
1721         for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1722           _ErrorIf(True, constants.CV_ENODESSH, node,
1723                    "ssh communication with node '%s': %s", a_node, a_msg)
1724
1725     test = constants.NV_NODENETTEST not in nresult
1726     _ErrorIf(test, constants.CV_ENODENET, node,
1727              "node hasn't returned node tcp connectivity data")
1728     if not test:
1729       if nresult[constants.NV_NODENETTEST]:
1730         nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1731         for anode in nlist:
1732           _ErrorIf(True, constants.CV_ENODENET, node,
1733                    "tcp communication with node '%s': %s",
1734                    anode, nresult[constants.NV_NODENETTEST][anode])
1735
1736     test = constants.NV_MASTERIP not in nresult
1737     _ErrorIf(test, constants.CV_ENODENET, node,
1738              "node hasn't returned node master IP reachability data")
1739     if not test:
1740       if not nresult[constants.NV_MASTERIP]:
1741         if node == self.master_node:
1742           msg = "the master node cannot reach the master IP (not configured?)"
1743         else:
1744           msg = "cannot reach the master IP"
1745         _ErrorIf(True, constants.CV_ENODENET, node, msg)
1746
1747   def _VerifyInstance(self, instance, inst_config, node_image,
1748                       diskstatus):
1749     """Verify an instance.
1750
1751     This function checks to see if the required block devices are
1752     available on the instance's node, and that the nodes are in the correct
1753     state.
1754
1755     """
1756     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1757     pnode = inst_config.primary_node
1758     pnode_img = node_image[pnode]
1759     groupinfo = self.cfg.GetAllNodeGroupsInfo()
1760
1761     node_vol_should = {}
1762     inst_config.MapLVsByNode(node_vol_should)
1763
1764     cluster = self.cfg.GetClusterInfo()
1765     ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1766                                                             self.group_info)
1767     err = ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
1768     _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err),
1769              code=self.ETYPE_WARNING)
1770
1771     for node in node_vol_should:
1772       n_img = node_image[node]
1773       if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1774         # ignore missing volumes on offline or broken nodes
1775         continue
1776       for volume in node_vol_should[node]:
1777         test = volume not in n_img.volumes
1778         _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
1779                  "volume %s missing on node %s", volume, node)
1780
1781     if inst_config.admin_state == constants.ADMINST_UP:
1782       test = instance not in pnode_img.instances and not pnode_img.offline
1783       _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
1784                "instance not running on its primary node %s",
1785                pnode)
1786       _ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
1787                "instance is marked as running and lives on offline node %s",
1788                pnode)
1789
1790     diskdata = [(nname, success, status, idx)
1791                 for (nname, disks) in diskstatus.items()
1792                 for idx, (success, status) in enumerate(disks)]
1793
1794     for nname, success, bdev_status, idx in diskdata:
1795       # the 'ghost node' construction in Exec() ensures that we have a
1796       # node here
1797       snode = node_image[nname]
1798       bad_snode = snode.ghost or snode.offline
1799       _ErrorIf(inst_config.disks_active and
1800                not success and not bad_snode,
1801                constants.CV_EINSTANCEFAULTYDISK, instance,
1802                "couldn't retrieve status for disk/%s on %s: %s",
1803                idx, nname, bdev_status)
1804       _ErrorIf((inst_config.disks_active and
1805                 success and bdev_status.ldisk_status == constants.LDS_FAULTY),
1806                constants.CV_EINSTANCEFAULTYDISK, instance,
1807                "disk/%s on %s is faulty", idx, nname)
1808
1809     _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1810              constants.CV_ENODERPC, pnode, "instance %s, connection to"
1811              " primary node failed", instance)
1812
1813     _ErrorIf(len(inst_config.secondary_nodes) > 1,
1814              constants.CV_EINSTANCELAYOUT,
1815              instance, "instance has multiple secondary nodes: %s",
1816              utils.CommaJoin(inst_config.secondary_nodes),
1817              code=self.ETYPE_WARNING)
1818
1819     if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
1820       # Disk template not compatible with exclusive_storage: no instance
1821       # node should have the flag set
1822       es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
1823                                                      inst_config.all_nodes)
1824       es_nodes = [n for (n, es) in es_flags.items()
1825                   if es]
1826       _ErrorIf(es_nodes, constants.CV_EINSTANCEUNSUITABLENODE, instance,
1827                "instance has template %s, which is not supported on nodes"
1828                " that have exclusive storage set: %s",
1829                inst_config.disk_template, utils.CommaJoin(es_nodes))
1830
1831     if inst_config.disk_template in constants.DTS_INT_MIRROR:
1832       instance_nodes = utils.NiceSort(inst_config.all_nodes)
1833       instance_groups = {}
1834
1835       for node in instance_nodes:
1836         instance_groups.setdefault(self.all_node_info[node].group,
1837                                    []).append(node)
1838
1839       pretty_list = [
1840         "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
1841         # Sort so that we always list the primary node first.
1842         for group, nodes in sorted(instance_groups.items(),
1843                                    key=lambda (_, nodes): pnode in nodes,
1844                                    reverse=True)]
1845
1846       self._ErrorIf(len(instance_groups) > 1,
1847                     constants.CV_EINSTANCESPLITGROUPS,
1848                     instance, "instance has primary and secondary nodes in"
1849                     " different groups: %s", utils.CommaJoin(pretty_list),
1850                     code=self.ETYPE_WARNING)
1851
1852     inst_nodes_offline = []
1853     for snode in inst_config.secondary_nodes:
1854       s_img = node_image[snode]
1855       _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
1856                snode, "instance %s, connection to secondary node failed",
1857                instance)
1858
1859       if s_img.offline:
1860         inst_nodes_offline.append(snode)
1861
1862     # warn that the instance lives on offline nodes
1863     _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
1864              "instance has offline secondary node(s) %s",
1865              utils.CommaJoin(inst_nodes_offline))
1866     # ... or ghost/non-vm_capable nodes
1867     for node in inst_config.all_nodes:
1868       _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
1869                instance, "instance lives on ghost node %s", node)
1870       _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
1871                instance, "instance lives on non-vm_capable node %s", node)
1872
1873   def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1874     """Verify if there are any unknown volumes in the cluster.
1875
1876     The .os, .swap and backup volumes are ignored. All other volumes are
1877     reported as unknown.
1878
1879     @type reserved: L{ganeti.utils.FieldSet}
1880     @param reserved: a FieldSet of reserved volume names
1881
1882     """
1883     for node, n_img in node_image.items():
1884       if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
1885           self.all_node_info[node].group != self.group_uuid):
1886         # skip non-healthy nodes
1887         continue
1888       for volume in n_img.volumes:
1889         test = ((node not in node_vol_should or
1890                 volume not in node_vol_should[node]) and
1891                 not reserved.Matches(volume))
1892         self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
1893                       "volume %s is unknown", volume)
1894
1895   def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1896     """Verify N+1 Memory Resilience.
1897
1898     Check that if one single node dies we can still start all the
1899     instances it was primary for.
1900
1901     """
1902     cluster_info = self.cfg.GetClusterInfo()
1903     for node, n_img in node_image.items():
1904       # This code checks that every node which is now listed as
1905       # secondary has enough memory to host all instances it is
1906       # supposed to should a single other node in the cluster fail.
1907       # FIXME: not ready for failover to an arbitrary node
1908       # FIXME: does not support file-backed instances
1909       # WARNING: we currently take into account down instances as well
1910       # as up ones, considering that even if they're down someone
1911       # might want to start them even in the event of a node failure.
1912       if n_img.offline or self.all_node_info[node].group != self.group_uuid:
1913         # we're skipping nodes marked offline and nodes in other groups from
1914         # the N+1 warning, since most likely we don't have good memory
1915         # infromation from them; we already list instances living on such
1916         # nodes, and that's enough warning
1917         continue
1918       #TODO(dynmem): also consider ballooning out other instances
1919       for prinode, instances in n_img.sbp.items():
1920         needed_mem = 0
1921         for instance in instances:
1922           bep = cluster_info.FillBE(instance_cfg[instance])
1923           if bep[constants.BE_AUTO_BALANCE]:
1924             needed_mem += bep[constants.BE_MINMEM]
1925         test = n_img.mfree < needed_mem
1926         self._ErrorIf(test, constants.CV_ENODEN1, node,
1927                       "not enough memory to accomodate instance failovers"
1928                       " should node %s fail (%dMiB needed, %dMiB available)",
1929                       prinode, needed_mem, n_img.mfree)
1930
1931   @classmethod
1932   def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
1933                    (files_all, files_opt, files_mc, files_vm)):
1934     """Verifies file checksums collected from all nodes.
1935
1936     @param errorif: Callback for reporting errors
1937     @param nodeinfo: List of L{objects.Node} objects
1938     @param master_node: Name of master node
1939     @param all_nvinfo: RPC results
1940
1941     """
1942     # Define functions determining which nodes to consider for a file
1943     files2nodefn = [
1944       (files_all, None),
1945       (files_mc, lambda node: (node.master_candidate or
1946                                node.name == master_node)),
1947       (files_vm, lambda node: node.vm_capable),
1948       ]
1949
1950     # Build mapping from filename to list of nodes which should have the file
1951     nodefiles = {}
1952     for (files, fn) in files2nodefn:
1953       if fn is None:
1954         filenodes = nodeinfo
1955       else:
1956         filenodes = filter(fn, nodeinfo)
1957       nodefiles.update((filename,
1958                         frozenset(map(operator.attrgetter("name"), filenodes)))
1959                        for filename in files)
1960
1961     assert set(nodefiles) == (files_all | files_mc | files_vm)
1962
1963     fileinfo = dict((filename, {}) for filename in nodefiles)
1964     ignore_nodes = set()
1965
1966     for node in nodeinfo:
1967       if node.offline:
1968         ignore_nodes.add(node.name)
1969         continue
1970
1971       nresult = all_nvinfo[node.name]
1972
1973       if nresult.fail_msg or not nresult.payload:
1974         node_files = None
1975       else:
1976         fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
1977         node_files = dict((vcluster.LocalizeVirtualPath(key), value)
1978                           for (key, value) in fingerprints.items())
1979         del fingerprints
1980
1981       test = not (node_files and isinstance(node_files, dict))
1982       errorif(test, constants.CV_ENODEFILECHECK, node.name,
1983               "Node did not return file checksum data")
1984       if test:
1985         ignore_nodes.add(node.name)
1986         continue
1987
1988       # Build per-checksum mapping from filename to nodes having it
1989       for (filename, checksum) in node_files.items():
1990         assert filename in nodefiles
1991         fileinfo[filename].setdefault(checksum, set()).add(node.name)
1992
1993     for (filename, checksums) in fileinfo.items():
1994       assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
1995
1996       # Nodes having the file
1997       with_file = frozenset(node_name
1998                             for nodes in fileinfo[filename].values()
1999                             for node_name in nodes) - ignore_nodes
2000
2001       expected_nodes = nodefiles[filename] - ignore_nodes
2002
2003       # Nodes missing file
2004       missing_file = expected_nodes - with_file
2005
2006       if filename in files_opt:
2007         # All or no nodes
2008         errorif(missing_file and missing_file != expected_nodes,
2009                 constants.CV_ECLUSTERFILECHECK, None,
2010                 "File %s is optional, but it must exist on all or no"
2011                 " nodes (not found on %s)",
2012                 filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2013       else:
2014         errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2015                 "File %s is missing from node(s) %s", filename,
2016                 utils.CommaJoin(utils.NiceSort(missing_file)))
2017
2018         # Warn if a node has a file it shouldn't
2019         unexpected = with_file - expected_nodes
2020         errorif(unexpected,
2021                 constants.CV_ECLUSTERFILECHECK, None,
2022                 "File %s should not exist on node(s) %s",
2023                 filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2024
2025       # See if there are multiple versions of the file
2026       test = len(checksums) > 1
2027       if test:
2028         variants = ["variant %s on %s" %
2029                     (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2030                     for (idx, (checksum, nodes)) in
2031                       enumerate(sorted(checksums.items()))]
2032       else:
2033         variants = []
2034
2035       errorif(test, constants.CV_ECLUSTERFILECHECK, None,
2036               "File %s found with %s different checksums (%s)",
2037               filename, len(checksums), "; ".join(variants))
2038
2039   def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2040                       drbd_map):
2041     """Verifies and the node DRBD status.
2042
2043     @type ninfo: L{objects.Node}
2044     @param ninfo: the node to check
2045     @param nresult: the remote results for the node
2046     @param instanceinfo: the dict of instances
2047     @param drbd_helper: the configured DRBD usermode helper
2048     @param drbd_map: the DRBD map as returned by
2049         L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2050
2051     """
2052     node = ninfo.name
2053     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2054
2055     if drbd_helper:
2056       helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2057       test = (helper_result is None)
2058       _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2059                "no drbd usermode helper returned")
2060       if helper_result:
2061         status, payload = helper_result
2062         test = not status
2063         _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2064                  "drbd usermode helper check unsuccessful: %s", payload)
2065         test = status and (payload != drbd_helper)
2066         _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2067                  "wrong drbd usermode helper: %s", payload)
2068
2069     # compute the DRBD minors
2070     node_drbd = {}
2071     for minor, instance in drbd_map[node].items():
2072       test = instance not in instanceinfo
2073       _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2074                "ghost instance '%s' in temporary DRBD map", instance)
2075         # ghost instance should not be running, but otherwise we
2076         # don't give double warnings (both ghost instance and
2077         # unallocated minor in use)
2078       if test:
2079         node_drbd[minor] = (instance, False)
2080       else:
2081         instance = instanceinfo[instance]
2082         node_drbd[minor] = (instance.name, instance.disks_active)
2083
2084     # and now check them
2085     used_minors = nresult.get(constants.NV_DRBDLIST, [])
2086     test = not isinstance(used_minors, (tuple, list))
2087     _ErrorIf(test, constants.CV_ENODEDRBD, node,
2088              "cannot parse drbd status file: %s", str(used_minors))
2089     if test:
2090       # we cannot check drbd status
2091       return
2092
2093     for minor, (iname, must_exist) in node_drbd.items():
2094       test = minor not in used_minors and must_exist
2095       _ErrorIf(test, constants.CV_ENODEDRBD, node,
2096                "drbd minor %d of instance %s is not active", minor, iname)
2097     for minor in used_minors:
2098       test = minor not in node_drbd
2099       _ErrorIf(test, constants.CV_ENODEDRBD, node,
2100                "unallocated drbd minor %d is in use", minor)
2101
2102   def _UpdateNodeOS(self, ninfo, nresult, nimg):
2103     """Builds the node OS structures.
2104
2105     @type ninfo: L{objects.Node}
2106     @param ninfo: the node to check
2107     @param nresult: the remote results for the node
2108     @param nimg: the node image object
2109
2110     """
2111     node = ninfo.name
2112     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2113
2114     remote_os = nresult.get(constants.NV_OSLIST, None)
2115     test = (not isinstance(remote_os, list) or
2116             not compat.all(isinstance(v, list) and len(v) == 7
2117                            for v in remote_os))
2118
2119     _ErrorIf(test, constants.CV_ENODEOS, node,
2120              "node hasn't returned valid OS data")
2121
2122     nimg.os_fail = test
2123
2124     if test:
2125       return
2126
2127     os_dict = {}
2128
2129     for (name, os_path, status, diagnose,
2130          variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2131
2132       if name not in os_dict:
2133         os_dict[name] = []
2134
2135       # parameters is a list of lists instead of list of tuples due to
2136       # JSON lacking a real tuple type, fix it:
2137       parameters = [tuple(v) for v in parameters]
2138       os_dict[name].append((os_path, status, diagnose,
2139                             set(variants), set(parameters), set(api_ver)))
2140
2141     nimg.oslist = os_dict
2142
2143   def _VerifyNodeOS(self, ninfo, nimg, base):
2144     """Verifies the node OS list.
2145
2146     @type ninfo: L{objects.Node}
2147     @param ninfo: the node to check
2148     @param nimg: the node image object
2149     @param base: the 'template' node we match against (e.g. from the master)
2150
2151     """
2152     node = ninfo.name
2153     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2154
2155     assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2156
2157     beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2158     for os_name, os_data in nimg.oslist.items():
2159       assert os_data, "Empty OS status for OS %s?!" % os_name
2160       f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2161       _ErrorIf(not f_status, constants.CV_ENODEOS, node,
2162                "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2163       _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
2164                "OS '%s' has multiple entries (first one shadows the rest): %s",
2165                os_name, utils.CommaJoin([v[0] for v in os_data]))
2166       # comparisons with the 'base' image
2167       test = os_name not in base.oslist
2168       _ErrorIf(test, constants.CV_ENODEOS, node,
2169                "Extra OS %s not present on reference node (%s)",
2170                os_name, base.name)
2171       if test:
2172         continue
2173       assert base.oslist[os_name], "Base node has empty OS status?"
2174       _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2175       if not b_status:
2176         # base OS is invalid, skipping
2177         continue
2178       for kind, a, b in [("API version", f_api, b_api),
2179                          ("variants list", f_var, b_var),
2180                          ("parameters", beautify_params(f_param),
2181                           beautify_params(b_param))]:
2182         _ErrorIf(a != b, constants.CV_ENODEOS, node,
2183                  "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2184                  kind, os_name, base.name,
2185                  utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2186
2187     # check any missing OSes
2188     missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2189     _ErrorIf(missing, constants.CV_ENODEOS, node,
2190              "OSes present on reference node %s but missing on this node: %s",
2191              base.name, utils.CommaJoin(missing))
2192
2193   def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
2194     """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2195
2196     @type ninfo: L{objects.Node}
2197     @param ninfo: the node to check
2198     @param nresult: the remote results for the node
2199     @type is_master: bool
2200     @param is_master: Whether node is the master node
2201
2202     """
2203     node = ninfo.name
2204
2205     if (is_master and
2206         (constants.ENABLE_FILE_STORAGE or
2207          constants.ENABLE_SHARED_FILE_STORAGE)):
2208       try:
2209         fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
2210       except KeyError:
2211         # This should never happen
2212         self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
2213                       "Node did not return forbidden file storage paths")
2214       else:
2215         self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
2216                       "Found forbidden file storage paths: %s",
2217                       utils.CommaJoin(fspaths))
2218     else:
2219       self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
2220                     constants.CV_ENODEFILESTORAGEPATHS, node,
2221                     "Node should not have returned forbidden file storage"
2222                     " paths")
2223
2224   def _VerifyOob(self, ninfo, nresult):
2225     """Verifies out of band functionality of a node.
2226
2227     @type ninfo: L{objects.Node}
2228     @param ninfo: the node to check
2229     @param nresult: the remote results for the node
2230
2231     """
2232     node = ninfo.name
2233     # We just have to verify the paths on master and/or master candidates
2234     # as the oob helper is invoked on the master
2235     if ((ninfo.master_candidate or ninfo.master_capable) and
2236         constants.NV_OOB_PATHS in nresult):
2237       for path_result in nresult[constants.NV_OOB_PATHS]:
2238         self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2239
2240   def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2241     """Verifies and updates the node volume data.
2242
2243     This function will update a L{NodeImage}'s internal structures
2244     with data from the remote call.
2245
2246     @type ninfo: L{objects.Node}
2247     @param ninfo: the node to check
2248     @param nresult: the remote results for the node
2249     @param nimg: the node image object
2250     @param vg_name: the configured VG name
2251
2252     """
2253     node = ninfo.name
2254     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2255
2256     nimg.lvm_fail = True
2257     lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2258     if vg_name is None:
2259       pass
2260     elif isinstance(lvdata, basestring):
2261       _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
2262                utils.SafeEncode(lvdata))
2263     elif not isinstance(lvdata, dict):
2264       _ErrorIf(True, constants.CV_ENODELVM, node,
2265                "rpc call to node failed (lvlist)")
2266     else:
2267       nimg.volumes = lvdata
2268       nimg.lvm_fail = False
2269
2270   def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2271     """Verifies and updates the node instance list.
2272
2273     If the listing was successful, then updates this node's instance
2274     list. Otherwise, it marks the RPC call as failed for the instance
2275     list key.
2276
2277     @type ninfo: L{objects.Node}
2278     @param ninfo: the node to check
2279     @param nresult: the remote results for the node
2280     @param nimg: the node image object
2281
2282     """
2283     idata = nresult.get(constants.NV_INSTANCELIST, None)
2284     test = not isinstance(idata, list)
2285     self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2286                   "rpc call to node failed (instancelist): %s",
2287                   utils.SafeEncode(str(idata)))
2288     if test:
2289       nimg.hyp_fail = True
2290     else:
2291       nimg.instances = idata
2292
2293   def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2294     """Verifies and computes a node information map
2295
2296     @type ninfo: L{objects.Node}
2297     @param ninfo: the node to check
2298     @param nresult: the remote results for the node
2299     @param nimg: the node image object
2300     @param vg_name: the configured VG name
2301
2302     """
2303     node = ninfo.name
2304     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2305
2306     # try to read free memory (from the hypervisor)
2307     hv_info = nresult.get(constants.NV_HVINFO, None)
2308     test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2309     _ErrorIf(test, constants.CV_ENODEHV, node,
2310              "rpc call to node failed (hvinfo)")
2311     if not test:
2312       try:
2313         nimg.mfree = int(hv_info["memory_free"])
2314       except (ValueError, TypeError):
2315         _ErrorIf(True, constants.CV_ENODERPC, node,
2316                  "node returned invalid nodeinfo, check hypervisor")
2317
2318     # FIXME: devise a free space model for file based instances as well
2319     if vg_name is not None:
2320       test = (constants.NV_VGLIST not in nresult or
2321               vg_name not in nresult[constants.NV_VGLIST])
2322       _ErrorIf(test, constants.CV_ENODELVM, node,
2323                "node didn't return data for the volume group '%s'"
2324                " - it is either missing or broken", vg_name)
2325       if not test:
2326         try:
2327           nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2328         except (ValueError, TypeError):
2329           _ErrorIf(True, constants.CV_ENODERPC, node,
2330                    "node returned invalid LVM info, check LVM status")
2331
2332   def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2333     """Gets per-disk status information for all instances.
2334
2335     @type nodelist: list of strings
2336     @param nodelist: Node names
2337     @type node_image: dict of (name, L{objects.Node})
2338     @param node_image: Node objects
2339     @type instanceinfo: dict of (name, L{objects.Instance})
2340     @param instanceinfo: Instance objects
2341     @rtype: {instance: {node: [(succes, payload)]}}
2342     @return: a dictionary of per-instance dictionaries with nodes as
2343         keys and disk information as values; the disk information is a
2344         list of tuples (success, payload)
2345
2346     """
2347     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2348
2349     node_disks = {}
2350     node_disks_devonly = {}
2351     diskless_instances = set()
2352     diskless = constants.DT_DISKLESS
2353
2354     for nname in nodelist:
2355       node_instances = list(itertools.chain(node_image[nname].pinst,
2356                                             node_image[nname].sinst))
2357       diskless_instances.update(inst for inst in node_instances
2358                                 if instanceinfo[inst].disk_template == diskless)
2359       disks = [(inst, disk)
2360                for inst in node_instances
2361                for disk in instanceinfo[inst].disks]
2362
2363       if not disks:
2364         # No need to collect data
2365         continue
2366
2367       node_disks[nname] = disks
2368
2369       # _AnnotateDiskParams makes already copies of the disks
2370       devonly = []
2371       for (inst, dev) in disks:
2372         (anno_disk,) = AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
2373         self.cfg.SetDiskID(anno_disk, nname)
2374         devonly.append(anno_disk)
2375
2376       node_disks_devonly[nname] = devonly
2377
2378     assert len(node_disks) == len(node_disks_devonly)
2379
2380     # Collect data from all nodes with disks
2381     result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2382                                                           node_disks_devonly)
2383
2384     assert len(result) == len(node_disks)
2385
2386     instdisk = {}
2387
2388     for (nname, nres) in result.items():
2389       disks = node_disks[nname]
2390
2391       if nres.offline:
2392         # No data from this node
2393         data = len(disks) * [(False, "node offline")]
2394       else:
2395         msg = nres.fail_msg
2396         _ErrorIf(msg, constants.CV_ENODERPC, nname,
2397                  "while getting disk information: %s", msg)
2398         if msg:
2399           # No data from this node
2400           data = len(disks) * [(False, msg)]
2401         else:
2402           data = []
2403           for idx, i in enumerate(nres.payload):
2404             if isinstance(i, (tuple, list)) and len(i) == 2:
2405               data.append(i)
2406             else:
2407               logging.warning("Invalid result from node %s, entry %d: %s",
2408                               nname, idx, i)
2409               data.append((False, "Invalid result from the remote node"))
2410
2411       for ((inst, _), status) in zip(disks, data):
2412         instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2413
2414     # Add empty entries for diskless instances.
2415     for inst in diskless_instances:
2416       assert inst not in instdisk
2417       instdisk[inst] = {}
2418
2419     assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2420                       len(nnames) <= len(instanceinfo[inst].all_nodes) and
2421                       compat.all(isinstance(s, (tuple, list)) and
2422                                  len(s) == 2 for s in statuses)
2423                       for inst, nnames in instdisk.items()
2424                       for nname, statuses in nnames.items())
2425     if __debug__:
2426       instdisk_keys = set(instdisk)
2427       instanceinfo_keys = set(instanceinfo)
2428       assert instdisk_keys == instanceinfo_keys, \
2429         ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2430          (instdisk_keys, instanceinfo_keys))
2431
2432     return instdisk
2433
2434   @staticmethod
2435   def _SshNodeSelector(group_uuid, all_nodes):
2436     """Create endless iterators for all potential SSH check hosts.
2437
2438     """
2439     nodes = [node for node in all_nodes
2440              if (node.group != group_uuid and
2441                  not node.offline)]
2442     keyfunc = operator.attrgetter("group")
2443
2444     return map(itertools.cycle,
2445                [sorted(map(operator.attrgetter("name"), names))
2446                 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2447                                                   keyfunc)])
2448
2449   @classmethod
2450   def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2451     """Choose which nodes should talk to which other nodes.
2452
2453     We will make nodes contact all nodes in their group, and one node from
2454     every other group.
2455
2456     @warning: This algorithm has a known issue if one node group is much
2457       smaller than others (e.g. just one node). In such a case all other
2458       nodes will talk to the single node.
2459
2460     """
2461     online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2462     sel = cls._SshNodeSelector(group_uuid, all_nodes)
2463
2464     return (online_nodes,
2465             dict((name, sorted([i.next() for i in sel]))
2466                  for name in online_nodes))
2467
2468   def BuildHooksEnv(self):
2469     """Build hooks env.
2470
2471     Cluster-Verify hooks just ran in the post phase and their failure makes
2472     the output be logged in the verify output and the verification to fail.
2473
2474     """
2475     env = {
2476       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2477       }
2478
2479     env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2480                for node in self.my_node_info.values())
2481
2482     return env
2483
2484   def BuildHooksNodes(self):
2485     """Build hooks nodes.
2486
2487     """
2488     return ([], self.my_node_names)
2489
2490   def Exec(self, feedback_fn):
2491     """Verify integrity of the node group, performing various test on nodes.
2492
2493     """
2494     # This method has too many local variables. pylint: disable=R0914
2495     feedback_fn("* Verifying group '%s'" % self.group_info.name)
2496
2497     if not self.my_node_names:
2498       # empty node group
2499       feedback_fn("* Empty node group, skipping verification")
2500       return True
2501
2502     self.bad = False
2503     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2504     verbose = self.op.verbose
2505     self._feedback_fn = feedback_fn
2506
2507     vg_name = self.cfg.GetVGName()
2508     drbd_helper = self.cfg.GetDRBDHelper()
2509     cluster = self.cfg.GetClusterInfo()
2510     hypervisors = cluster.enabled_hypervisors
2511     node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2512
2513     i_non_redundant = [] # Non redundant instances
2514     i_non_a_balanced = [] # Non auto-balanced instances
2515     i_offline = 0 # Count of offline instances
2516     n_offline = 0 # Count of offline nodes
2517     n_drained = 0 # Count of nodes being drained
2518     node_vol_should = {}
2519
2520     # FIXME: verify OS list
2521
2522     # File verification
2523     filemap = ComputeAncillaryFiles(cluster, False)
2524
2525     # do local checksums
2526     master_node = self.master_node = self.cfg.GetMasterNode()
2527     master_ip = self.cfg.GetMasterIP()
2528
2529     feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2530
2531     user_scripts = []
2532     if self.cfg.GetUseExternalMipScript():
2533       user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2534
2535     node_verify_param = {
2536       constants.NV_FILELIST:
2537         map(vcluster.MakeVirtualPath,
2538             utils.UniqueSequence(filename
2539                                  for files in filemap
2540                                  for filename in files)),
2541       constants.NV_NODELIST:
2542         self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2543                                   self.all_node_info.values()),
2544       constants.NV_HYPERVISOR: hypervisors,
2545       constants.NV_HVPARAMS:
2546         _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2547       constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2548                                  for node in node_data_list
2549                                  if not node.offline],
2550       constants.NV_INSTANCELIST: hypervisors,
2551       constants.NV_VERSION: None,
2552       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2553       constants.NV_NODESETUP: None,
2554       constants.NV_TIME: None,
2555       constants.NV_MASTERIP: (master_node, master_ip),
2556       constants.NV_OSLIST: None,
2557       constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2558       constants.NV_USERSCRIPTS: user_scripts,
2559       }
2560
2561     if vg_name is not None:
2562       node_verify_param[constants.NV_VGLIST] = None
2563       node_verify_param[constants.NV_LVLIST] = vg_name
2564       node_verify_param[constants.NV_PVLIST] = [vg_name]
2565
2566     if drbd_helper:
2567       node_verify_param[constants.NV_DRBDLIST] = None
2568       node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2569
2570     if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
2571       # Load file storage paths only from master node
2572       node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node
2573
2574     # bridge checks
2575     # FIXME: this needs to be changed per node-group, not cluster-wide
2576     bridges = set()
2577     default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2578     if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2579       bridges.add(default_nicpp[constants.NIC_LINK])
2580     for instance in self.my_inst_info.values():
2581       for nic in instance.nics:
2582         full_nic = cluster.SimpleFillNIC(nic.nicparams)
2583         if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2584           bridges.add(full_nic[constants.NIC_LINK])
2585
2586     if bridges:
2587       node_verify_param[constants.NV_BRIDGES] = list(bridges)
2588
2589     # Build our expected cluster state
2590     node_image = dict((node.name, self.NodeImage(offline=node.offline,
2591                                                  name=node.name,
2592                                                  vm_capable=node.vm_capable))
2593                       for node in node_data_list)
2594
2595     # Gather OOB paths
2596     oob_paths = []
2597     for node in self.all_node_info.values():
2598       path = SupportsOob(self.cfg, node)
2599       if path and path not in oob_paths:
2600         oob_paths.append(path)
2601
2602     if oob_paths:
2603       node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2604
2605     for instance in self.my_inst_names:
2606       inst_config = self.my_inst_info[instance]
2607       if inst_config.admin_state == constants.ADMINST_OFFLINE:
2608         i_offline += 1
2609
2610       for nname in inst_config.all_nodes:
2611         if nname not in node_image:
2612           gnode = self.NodeImage(name=nname)
2613           gnode.ghost = (nname not in self.all_node_info)
2614           node_image[nname] = gnode
2615
2616       inst_config.MapLVsByNode(node_vol_should)
2617
2618       pnode = inst_config.primary_node
2619       node_image[pnode].pinst.append(instance)
2620
2621       for snode in inst_config.secondary_nodes:
2622         nimg = node_image[snode]
2623         nimg.sinst.append(instance)
2624         if pnode not in nimg.sbp:
2625           nimg.sbp[pnode] = []
2626         nimg.sbp[pnode].append(instance)
2627
2628     es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg, self.my_node_names)
2629     # The value of exclusive_storage should be the same across the group, so if
2630     # it's True for at least a node, we act as if it were set for all the nodes
2631     self._exclusive_storage = compat.any(es_flags.values())
2632     if self._exclusive_storage:
2633       node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2634
2635     # At this point, we have the in-memory data structures complete,
2636     # except for the runtime information, which we'll gather next
2637
2638     # Due to the way our RPC system works, exact response times cannot be
2639     # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2640     # time before and after executing the request, we can at least have a time
2641     # window.
2642     nvinfo_starttime = time.time()
2643     all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2644                                            node_verify_param,
2645                                            self.cfg.GetClusterName())
2646     nvinfo_endtime = time.time()
2647
2648     if self.extra_lv_nodes and vg_name is not None:
2649       extra_lv_nvinfo = \
2650           self.rpc.call_node_verify(self.extra_lv_nodes,
2651                                     {constants.NV_LVLIST: vg_name},
2652                                     self.cfg.GetClusterName())
2653     else:
2654       extra_lv_nvinfo = {}
2655
2656     all_drbd_map = self.cfg.ComputeDRBDMap()
2657
2658     feedback_fn("* Gathering disk information (%s nodes)" %
2659                 len(self.my_node_names))
2660     instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2661                                      self.my_inst_info)
2662
2663     feedback_fn("* Verifying configuration file consistency")
2664
2665     # If not all nodes are being checked, we need to make sure the master node
2666     # and a non-checked vm_capable node are in the list.
2667     absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2668     if absent_nodes:
2669       vf_nvinfo = all_nvinfo.copy()
2670       vf_node_info = list(self.my_node_info.values())
2671       additional_nodes = []
2672       if master_node not in self.my_node_info:
2673         additional_nodes.append(master_node)
2674         vf_node_info.append(self.all_node_info[master_node])
2675       # Add the first vm_capable node we find which is not included,
2676       # excluding the master node (which we already have)
2677       for node in absent_nodes:
2678         nodeinfo = self.all_node_info[node]
2679         if (nodeinfo.vm_capable and not nodeinfo.offline and
2680             node != master_node):
2681           additional_nodes.append(node)
2682           vf_node_info.append(self.all_node_info[node])
2683           break
2684       key = constants.NV_FILELIST
2685       vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2686                                                  {key: node_verify_param[key]},
2687                                                  self.cfg.GetClusterName()))
2688     else:
2689       vf_nvinfo = all_nvinfo
2690       vf_node_info = self.my_node_info.values()
2691
2692     self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2693
2694     feedback_fn("* Verifying node status")
2695
2696     refos_img = None
2697
2698     for node_i in node_data_list:
2699       node = node_i.name
2700       nimg = node_image[node]
2701
2702       if node_i.offline:
2703         if verbose:
2704           feedback_fn("* Skipping offline node %s" % (node,))
2705         n_offline += 1
2706         continue
2707
2708       if node == master_node:
2709         ntype = "master"
2710       elif node_i.master_candidate:
2711         ntype = "master candidate"
2712       elif node_i.drained:
2713         ntype = "drained"
2714         n_drained += 1
2715       else:
2716         ntype = "regular"
2717       if verbose:
2718         feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2719
2720       msg = all_nvinfo[node].fail_msg
2721       _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
2722                msg)
2723       if msg:
2724         nimg.rpc_fail = True
2725         continue
2726
2727       nresult = all_nvinfo[node].payload
2728
2729       nimg.call_ok = self._VerifyNode(node_i, nresult)
2730       self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2731       self._VerifyNodeNetwork(node_i, nresult)
2732       self._VerifyNodeUserScripts(node_i, nresult)
2733       self._VerifyOob(node_i, nresult)
2734       self._VerifyFileStoragePaths(node_i, nresult,
2735                                    node == master_node)
2736
2737       if nimg.vm_capable:
2738         self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
2739         self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2740                              all_drbd_map)
2741
2742         self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2743         self._UpdateNodeInstances(node_i, nresult, nimg)
2744         self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2745         self._UpdateNodeOS(node_i, nresult, nimg)
2746
2747         if not nimg.os_fail:
2748           if refos_img is None:
2749             refos_img = nimg
2750           self._VerifyNodeOS(node_i, nimg, refos_img)
2751         self._VerifyNodeBridges(node_i, nresult, bridges)
2752
2753         # Check whether all running instancies are primary for the node. (This
2754         # can no longer be done from _VerifyInstance below, since some of the
2755         # wrong instances could be from other node groups.)
2756         non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2757
2758         for inst in non_primary_inst:
2759           test = inst in self.all_inst_info
2760           _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
2761                    "instance should not run on node %s", node_i.name)
2762           _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
2763                    "node is running unknown instance %s", inst)
2764
2765     self._VerifyGroupLVM(node_image, vg_name)
2766
2767     for node, result in extra_lv_nvinfo.items():
2768       self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
2769                               node_image[node], vg_name)
2770
2771     feedback_fn("* Verifying instance status")
2772     for instance in self.my_inst_names:
2773       if verbose:
2774         feedback_fn("* Verifying instance %s" % instance)
2775       inst_config = self.my_inst_info[instance]
2776       self._VerifyInstance(instance, inst_config, node_image,
2777                            instdisk[instance])
2778
2779       # If the instance is non-redundant we cannot survive losing its primary
2780       # node, so we are not N+1 compliant.
2781       if inst_config.disk_template not in constants.DTS_MIRRORED:
2782         i_non_redundant.append(instance)
2783
2784       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2785         i_non_a_balanced.append(instance)
2786
2787     feedback_fn("* Verifying orphan volumes")
2788     reserved = utils.FieldSet(*cluster.reserved_lvs)
2789
2790     # We will get spurious "unknown volume" warnings if any node of this group
2791     # is secondary for an instance whose primary is in another group. To avoid
2792     # them, we find these instances and add their volumes to node_vol_should.
2793     for inst in self.all_inst_info.values():
2794       for secondary in inst.secondary_nodes:
2795         if (secondary in self.my_node_info
2796             and inst.name not in self.my_inst_info):
2797           inst.MapLVsByNode(node_vol_should)
2798           break
2799
2800     self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2801
2802     if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2803       feedback_fn("* Verifying N+1 Memory redundancy")
2804       self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2805
2806     feedback_fn("* Other Notes")
2807     if i_non_redundant:
2808       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2809                   % len(i_non_redundant))
2810
2811     if i_non_a_balanced:
2812       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2813                   % len(i_non_a_balanced))
2814
2815     if i_offline:
2816       feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
2817
2818     if n_offline:
2819       feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2820
2821     if n_drained:
2822       feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2823
2824     return not self.bad
2825
2826   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2827     """Analyze the post-hooks' result
2828
2829     This method analyses the hook result, handles it, and sends some
2830     nicely-formatted feedback back to the user.
2831
2832     @param phase: one of L{constants.HOOKS_PHASE_POST} or
2833         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2834     @param hooks_results: the results of the multi-node hooks rpc call
2835     @param feedback_fn: function used send feedback back to the caller
2836     @param lu_result: previous Exec result
2837     @return: the new Exec result, based on the previous result
2838         and hook results
2839
2840     """
2841     # We only really run POST phase hooks, only for non-empty groups,
2842     # and are only interested in their results
2843     if not self.my_node_names:
2844       # empty node group
2845       pass
2846     elif phase == constants.HOOKS_PHASE_POST:
2847       # Used to change hooks' output to proper indentation
2848       feedback_fn("* Hooks Results")
2849       assert hooks_results, "invalid result from hooks"
2850
2851       for node_name in hooks_results:
2852         res = hooks_results[node_name]
2853         msg = res.fail_msg
2854         test = msg and not res.offline
2855         self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2856                       "Communication failure in hooks execution: %s", msg)
2857         if res.offline or msg:
2858           # No need to investigate payload if node is offline or gave
2859           # an error.
2860           continue
2861         for script, hkr, output in res.payload:
2862           test = hkr == constants.HKR_FAIL
2863           self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2864                         "Script %s failed, output:", script)
2865           if test:
2866             output = self._HOOKS_INDENT_RE.sub("      ", output)
2867             feedback_fn("%s" % output)
2868             lu_result = False
2869
2870     return lu_result
2871
2872
2873 class LUClusterVerifyDisks(NoHooksLU):
2874   """Verifies the cluster disks status.
2875
2876   """
2877   REQ_BGL = False
2878
2879   def ExpandNames(self):
2880     self.share_locks = ShareAll()
2881     self.needed_locks = {
2882       locking.LEVEL_NODEGROUP: locking.ALL_SET,
2883       }
2884
2885   def Exec(self, feedback_fn):
2886     group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
2887
2888     # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
2889     return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
2890                            for group in group_names])