Cluster verify checks server.pem permissions
[ganeti-local] / lib / cmdlib / cluster.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with the cluster."""
23
24 import OpenSSL
25
26 import copy
27 import itertools
28 import logging
29 import operator
30 import os
31 import re
32 import time
33
34 from ganeti import compat
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import masterd
40 from ganeti import netutils
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 from ganeti import query
45 from ganeti import rpc
46 from ganeti import runtime
47 from ganeti import ssh
48 from ganeti import uidpool
49 from ganeti import utils
50 from ganeti import vcluster
51
52 from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53   ResultWithJobs
54 from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55   ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56   GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57   GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58   CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59   ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob
60
61 import ganeti.masterd.instance
62
63
64 class LUClusterActivateMasterIp(NoHooksLU):
65   """Activate the master IP on the master node.
66
67   """
68   def Exec(self, feedback_fn):
69     """Activate the master IP.
70
71     """
72     master_params = self.cfg.GetMasterNetworkParameters()
73     ems = self.cfg.GetUseExternalMipScript()
74     result = self.rpc.call_node_activate_master_ip(master_params.name,
75                                                    master_params, ems)
76     result.Raise("Could not activate the master IP")
77
78
79 class LUClusterDeactivateMasterIp(NoHooksLU):
80   """Deactivate the master IP on the master node.
81
82   """
83   def Exec(self, feedback_fn):
84     """Deactivate the master IP.
85
86     """
87     master_params = self.cfg.GetMasterNetworkParameters()
88     ems = self.cfg.GetUseExternalMipScript()
89     result = self.rpc.call_node_deactivate_master_ip(master_params.name,
90                                                      master_params, ems)
91     result.Raise("Could not deactivate the master IP")
92
93
94 class LUClusterConfigQuery(NoHooksLU):
95   """Return configuration values.
96
97   """
98   REQ_BGL = False
99
100   def CheckArguments(self):
101     self.cq = ClusterQuery(None, self.op.output_fields, False)
102
103   def ExpandNames(self):
104     self.cq.ExpandNames(self)
105
106   def DeclareLocks(self, level):
107     self.cq.DeclareLocks(self, level)
108
109   def Exec(self, feedback_fn):
110     result = self.cq.OldStyleQuery(self)
111
112     assert len(result) == 1
113
114     return result[0]
115
116
117 class LUClusterDestroy(LogicalUnit):
118   """Logical unit for destroying the cluster.
119
120   """
121   HPATH = "cluster-destroy"
122   HTYPE = constants.HTYPE_CLUSTER
123
124   def BuildHooksEnv(self):
125     """Build hooks env.
126
127     """
128     return {
129       "OP_TARGET": self.cfg.GetClusterName(),
130       }
131
132   def BuildHooksNodes(self):
133     """Build hooks nodes.
134
135     """
136     return ([], [])
137
138   def CheckPrereq(self):
139     """Check prerequisites.
140
141     This checks whether the cluster is empty.
142
143     Any errors are signaled by raising errors.OpPrereqError.
144
145     """
146     master = self.cfg.GetMasterNode()
147
148     nodelist = self.cfg.GetNodeList()
149     if len(nodelist) != 1 or nodelist[0] != master:
150       raise errors.OpPrereqError("There are still %d node(s) in"
151                                  " this cluster." % (len(nodelist) - 1),
152                                  errors.ECODE_INVAL)
153     instancelist = self.cfg.GetInstanceList()
154     if instancelist:
155       raise errors.OpPrereqError("There are still %d instance(s) in"
156                                  " this cluster." % len(instancelist),
157                                  errors.ECODE_INVAL)
158
159   def Exec(self, feedback_fn):
160     """Destroys the cluster.
161
162     """
163     master_params = self.cfg.GetMasterNetworkParameters()
164
165     # Run post hooks on master node before it's removed
166     RunPostHook(self, master_params.name)
167
168     ems = self.cfg.GetUseExternalMipScript()
169     result = self.rpc.call_node_deactivate_master_ip(master_params.name,
170                                                      master_params, ems)
171     if result.fail_msg:
172       self.LogWarning("Error disabling the master IP address: %s",
173                       result.fail_msg)
174
175     return master_params.name
176
177
178 class LUClusterPostInit(LogicalUnit):
179   """Logical unit for running hooks after cluster initialization.
180
181   """
182   HPATH = "cluster-init"
183   HTYPE = constants.HTYPE_CLUSTER
184
185   def BuildHooksEnv(self):
186     """Build hooks env.
187
188     """
189     return {
190       "OP_TARGET": self.cfg.GetClusterName(),
191       }
192
193   def BuildHooksNodes(self):
194     """Build hooks nodes.
195
196     """
197     return ([], [self.cfg.GetMasterNode()])
198
199   def Exec(self, feedback_fn):
200     """Nothing to do.
201
202     """
203     return True
204
205
206 class ClusterQuery(QueryBase):
207   FIELDS = query.CLUSTER_FIELDS
208
209   #: Do not sort (there is only one item)
210   SORT_FIELD = None
211
212   def ExpandNames(self, lu):
213     lu.needed_locks = {}
214
215     # The following variables interact with _QueryBase._GetNames
216     self.wanted = locking.ALL_SET
217     self.do_locking = self.use_locking
218
219     if self.do_locking:
220       raise errors.OpPrereqError("Can not use locking for cluster queries",
221                                  errors.ECODE_INVAL)
222
223   def DeclareLocks(self, lu, level):
224     pass
225
226   def _GetQueryData(self, lu):
227     """Computes the list of nodes and their attributes.
228
229     """
230     # Locking is not used
231     assert not (compat.any(lu.glm.is_owned(level)
232                            for level in locking.LEVELS
233                            if level != locking.LEVEL_CLUSTER) or
234                 self.do_locking or self.use_locking)
235
236     if query.CQ_CONFIG in self.requested_data:
237       cluster = lu.cfg.GetClusterInfo()
238     else:
239       cluster = NotImplemented
240
241     if query.CQ_QUEUE_DRAINED in self.requested_data:
242       drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
243     else:
244       drain_flag = NotImplemented
245
246     if query.CQ_WATCHER_PAUSE in self.requested_data:
247       master_name = lu.cfg.GetMasterNode()
248
249       result = lu.rpc.call_get_watcher_pause(master_name)
250       result.Raise("Can't retrieve watcher pause from master node '%s'" %
251                    master_name)
252
253       watcher_pause = result.payload
254     else:
255       watcher_pause = NotImplemented
256
257     return query.ClusterQueryData(cluster, drain_flag, watcher_pause)
258
259
260 class LUClusterQuery(NoHooksLU):
261   """Query cluster configuration.
262
263   """
264   REQ_BGL = False
265
266   def ExpandNames(self):
267     self.needed_locks = {}
268
269   def Exec(self, feedback_fn):
270     """Return cluster config.
271
272     """
273     cluster = self.cfg.GetClusterInfo()
274     os_hvp = {}
275
276     # Filter just for enabled hypervisors
277     for os_name, hv_dict in cluster.os_hvp.items():
278       os_hvp[os_name] = {}
279       for hv_name, hv_params in hv_dict.items():
280         if hv_name in cluster.enabled_hypervisors:
281           os_hvp[os_name][hv_name] = hv_params
282
283     # Convert ip_family to ip_version
284     primary_ip_version = constants.IP4_VERSION
285     if cluster.primary_ip_family == netutils.IP6Address.family:
286       primary_ip_version = constants.IP6_VERSION
287
288     result = {
289       "software_version": constants.RELEASE_VERSION,
290       "protocol_version": constants.PROTOCOL_VERSION,
291       "config_version": constants.CONFIG_VERSION,
292       "os_api_version": max(constants.OS_API_VERSIONS),
293       "export_version": constants.EXPORT_VERSION,
294       "architecture": runtime.GetArchInfo(),
295       "name": cluster.cluster_name,
296       "master": cluster.master_node,
297       "default_hypervisor": cluster.primary_hypervisor,
298       "enabled_hypervisors": cluster.enabled_hypervisors,
299       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
300                         for hypervisor_name in cluster.enabled_hypervisors]),
301       "os_hvp": os_hvp,
302       "beparams": cluster.beparams,
303       "osparams": cluster.osparams,
304       "ipolicy": cluster.ipolicy,
305       "nicparams": cluster.nicparams,
306       "ndparams": cluster.ndparams,
307       "diskparams": cluster.diskparams,
308       "candidate_pool_size": cluster.candidate_pool_size,
309       "master_netdev": cluster.master_netdev,
310       "master_netmask": cluster.master_netmask,
311       "use_external_mip_script": cluster.use_external_mip_script,
312       "volume_group_name": cluster.volume_group_name,
313       "drbd_usermode_helper": cluster.drbd_usermode_helper,
314       "file_storage_dir": cluster.file_storage_dir,
315       "shared_file_storage_dir": cluster.shared_file_storage_dir,
316       "maintain_node_health": cluster.maintain_node_health,
317       "ctime": cluster.ctime,
318       "mtime": cluster.mtime,
319       "uuid": cluster.uuid,
320       "tags": list(cluster.GetTags()),
321       "uid_pool": cluster.uid_pool,
322       "default_iallocator": cluster.default_iallocator,
323       "reserved_lvs": cluster.reserved_lvs,
324       "primary_ip_version": primary_ip_version,
325       "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
326       "hidden_os": cluster.hidden_os,
327       "blacklisted_os": cluster.blacklisted_os,
328       "enabled_disk_templates": cluster.enabled_disk_templates,
329       }
330
331     return result
332
333
334 class LUClusterRedistConf(NoHooksLU):
335   """Force the redistribution of cluster configuration.
336
337   This is a very simple LU.
338
339   """
340   REQ_BGL = False
341
342   def ExpandNames(self):
343     self.needed_locks = {
344       locking.LEVEL_NODE: locking.ALL_SET,
345       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
346     }
347     self.share_locks = ShareAll()
348
349   def Exec(self, feedback_fn):
350     """Redistribute the configuration.
351
352     """
353     self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
354     RedistributeAncillaryFiles(self)
355
356
357 class LUClusterRename(LogicalUnit):
358   """Rename the cluster.
359
360   """
361   HPATH = "cluster-rename"
362   HTYPE = constants.HTYPE_CLUSTER
363
364   def BuildHooksEnv(self):
365     """Build hooks env.
366
367     """
368     return {
369       "OP_TARGET": self.cfg.GetClusterName(),
370       "NEW_NAME": self.op.name,
371       }
372
373   def BuildHooksNodes(self):
374     """Build hooks nodes.
375
376     """
377     return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
378
379   def CheckPrereq(self):
380     """Verify that the passed name is a valid one.
381
382     """
383     hostname = netutils.GetHostname(name=self.op.name,
384                                     family=self.cfg.GetPrimaryIPFamily())
385
386     new_name = hostname.name
387     self.ip = new_ip = hostname.ip
388     old_name = self.cfg.GetClusterName()
389     old_ip = self.cfg.GetMasterIP()
390     if new_name == old_name and new_ip == old_ip:
391       raise errors.OpPrereqError("Neither the name nor the IP address of the"
392                                  " cluster has changed",
393                                  errors.ECODE_INVAL)
394     if new_ip != old_ip:
395       if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
396         raise errors.OpPrereqError("The given cluster IP address (%s) is"
397                                    " reachable on the network" %
398                                    new_ip, errors.ECODE_NOTUNIQUE)
399
400     self.op.name = new_name
401
402   def Exec(self, feedback_fn):
403     """Rename the cluster.
404
405     """
406     clustername = self.op.name
407     new_ip = self.ip
408
409     # shutdown the master IP
410     master_params = self.cfg.GetMasterNetworkParameters()
411     ems = self.cfg.GetUseExternalMipScript()
412     result = self.rpc.call_node_deactivate_master_ip(master_params.name,
413                                                      master_params, ems)
414     result.Raise("Could not disable the master role")
415
416     try:
417       cluster = self.cfg.GetClusterInfo()
418       cluster.cluster_name = clustername
419       cluster.master_ip = new_ip
420       self.cfg.Update(cluster, feedback_fn)
421
422       # update the known hosts file
423       ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
424       node_list = self.cfg.GetOnlineNodeList()
425       try:
426         node_list.remove(master_params.name)
427       except ValueError:
428         pass
429       UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
430     finally:
431       master_params.ip = new_ip
432       result = self.rpc.call_node_activate_master_ip(master_params.name,
433                                                      master_params, ems)
434       msg = result.fail_msg
435       if msg:
436         self.LogWarning("Could not re-enable the master role on"
437                         " the master, please restart manually: %s", msg)
438
439     return clustername
440
441
442 class LUClusterRepairDiskSizes(NoHooksLU):
443   """Verifies the cluster disks sizes.
444
445   """
446   REQ_BGL = False
447
448   def ExpandNames(self):
449     if self.op.instances:
450       self.wanted_names = GetWantedInstances(self, self.op.instances)
451       # Not getting the node allocation lock as only a specific set of
452       # instances (and their nodes) is going to be acquired
453       self.needed_locks = {
454         locking.LEVEL_NODE_RES: [],
455         locking.LEVEL_INSTANCE: self.wanted_names,
456         }
457       self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
458     else:
459       self.wanted_names = None
460       self.needed_locks = {
461         locking.LEVEL_NODE_RES: locking.ALL_SET,
462         locking.LEVEL_INSTANCE: locking.ALL_SET,
463
464         # This opcode is acquires the node locks for all instances
465         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
466         }
467
468     self.share_locks = {
469       locking.LEVEL_NODE_RES: 1,
470       locking.LEVEL_INSTANCE: 0,
471       locking.LEVEL_NODE_ALLOC: 1,
472       }
473
474   def DeclareLocks(self, level):
475     if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
476       self._LockInstancesNodes(primary_only=True, level=level)
477
478   def CheckPrereq(self):
479     """Check prerequisites.
480
481     This only checks the optional instance list against the existing names.
482
483     """
484     if self.wanted_names is None:
485       self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
486
487     self.wanted_instances = \
488         map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
489
490   def _EnsureChildSizes(self, disk):
491     """Ensure children of the disk have the needed disk size.
492
493     This is valid mainly for DRBD8 and fixes an issue where the
494     children have smaller disk size.
495
496     @param disk: an L{ganeti.objects.Disk} object
497
498     """
499     if disk.dev_type == constants.LD_DRBD8:
500       assert disk.children, "Empty children for DRBD8?"
501       fchild = disk.children[0]
502       mismatch = fchild.size < disk.size
503       if mismatch:
504         self.LogInfo("Child disk has size %d, parent %d, fixing",
505                      fchild.size, disk.size)
506         fchild.size = disk.size
507
508       # and we recurse on this child only, not on the metadev
509       return self._EnsureChildSizes(fchild) or mismatch
510     else:
511       return False
512
513   def Exec(self, feedback_fn):
514     """Verify the size of cluster disks.
515
516     """
517     # TODO: check child disks too
518     # TODO: check differences in size between primary/secondary nodes
519     per_node_disks = {}
520     for instance in self.wanted_instances:
521       pnode = instance.primary_node
522       if pnode not in per_node_disks:
523         per_node_disks[pnode] = []
524       for idx, disk in enumerate(instance.disks):
525         per_node_disks[pnode].append((instance, idx, disk))
526
527     assert not (frozenset(per_node_disks.keys()) -
528                 self.owned_locks(locking.LEVEL_NODE_RES)), \
529       "Not owning correct locks"
530     assert not self.owned_locks(locking.LEVEL_NODE)
531
532     changed = []
533     for node, dskl in per_node_disks.items():
534       newl = [v[2].Copy() for v in dskl]
535       for dsk in newl:
536         self.cfg.SetDiskID(dsk, node)
537       result = self.rpc.call_blockdev_getsize(node, newl)
538       if result.fail_msg:
539         self.LogWarning("Failure in blockdev_getsize call to node"
540                         " %s, ignoring", node)
541         continue
542       if len(result.payload) != len(dskl):
543         logging.warning("Invalid result from node %s: len(dksl)=%d,"
544                         " result.payload=%s", node, len(dskl), result.payload)
545         self.LogWarning("Invalid result from node %s, ignoring node results",
546                         node)
547         continue
548       for ((instance, idx, disk), size) in zip(dskl, result.payload):
549         if size is None:
550           self.LogWarning("Disk %d of instance %s did not return size"
551                           " information, ignoring", idx, instance.name)
552           continue
553         if not isinstance(size, (int, long)):
554           self.LogWarning("Disk %d of instance %s did not return valid"
555                           " size information, ignoring", idx, instance.name)
556           continue
557         size = size >> 20
558         if size != disk.size:
559           self.LogInfo("Disk %d of instance %s has mismatched size,"
560                        " correcting: recorded %d, actual %d", idx,
561                        instance.name, disk.size, size)
562           disk.size = size
563           self.cfg.Update(instance, feedback_fn)
564           changed.append((instance.name, idx, size))
565         if self._EnsureChildSizes(disk):
566           self.cfg.Update(instance, feedback_fn)
567           changed.append((instance.name, idx, disk.size))
568     return changed
569
570
571 def _ValidateNetmask(cfg, netmask):
572   """Checks if a netmask is valid.
573
574   @type cfg: L{config.ConfigWriter}
575   @param cfg: The cluster configuration
576   @type netmask: int
577   @param netmask: the netmask to be verified
578   @raise errors.OpPrereqError: if the validation fails
579
580   """
581   ip_family = cfg.GetPrimaryIPFamily()
582   try:
583     ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
584   except errors.ProgrammerError:
585     raise errors.OpPrereqError("Invalid primary ip family: %s." %
586                                ip_family, errors.ECODE_INVAL)
587   if not ipcls.ValidateNetmask(netmask):
588     raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
589                                (netmask), errors.ECODE_INVAL)
590
591
592 class LUClusterSetParams(LogicalUnit):
593   """Change the parameters of the cluster.
594
595   """
596   HPATH = "cluster-modify"
597   HTYPE = constants.HTYPE_CLUSTER
598   REQ_BGL = False
599
600   def CheckArguments(self):
601     """Check parameters
602
603     """
604     if self.op.uid_pool:
605       uidpool.CheckUidPool(self.op.uid_pool)
606
607     if self.op.add_uids:
608       uidpool.CheckUidPool(self.op.add_uids)
609
610     if self.op.remove_uids:
611       uidpool.CheckUidPool(self.op.remove_uids)
612
613     if self.op.master_netmask is not None:
614       _ValidateNetmask(self.cfg, self.op.master_netmask)
615
616     if self.op.diskparams:
617       for dt_params in self.op.diskparams.values():
618         utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
619       try:
620         utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
621       except errors.OpPrereqError, err:
622         raise errors.OpPrereqError("While verify diskparams options: %s" % err,
623                                    errors.ECODE_INVAL)
624
625   def ExpandNames(self):
626     # FIXME: in the future maybe other cluster params won't require checking on
627     # all nodes to be modified.
628     # FIXME: This opcode changes cluster-wide settings. Is acquiring all
629     # resource locks the right thing, shouldn't it be the BGL instead?
630     self.needed_locks = {
631       locking.LEVEL_NODE: locking.ALL_SET,
632       locking.LEVEL_INSTANCE: locking.ALL_SET,
633       locking.LEVEL_NODEGROUP: locking.ALL_SET,
634       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
635     }
636     self.share_locks = ShareAll()
637
638   def BuildHooksEnv(self):
639     """Build hooks env.
640
641     """
642     return {
643       "OP_TARGET": self.cfg.GetClusterName(),
644       "NEW_VG_NAME": self.op.vg_name,
645       }
646
647   def BuildHooksNodes(self):
648     """Build hooks nodes.
649
650     """
651     mn = self.cfg.GetMasterNode()
652     return ([mn], [mn])
653
654   def CheckPrereq(self):
655     """Check prerequisites.
656
657     This checks whether the given params don't conflict and
658     if the given volume group is valid.
659
660     """
661     if self.op.vg_name is not None and not self.op.vg_name:
662       if self.cfg.HasAnyDiskOfType(constants.LD_LV):
663         raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
664                                    " instances exist", errors.ECODE_INVAL)
665
666     if self.op.drbd_helper is not None and not self.op.drbd_helper:
667       if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
668         raise errors.OpPrereqError("Cannot disable drbd helper while"
669                                    " drbd-based instances exist",
670                                    errors.ECODE_INVAL)
671
672     node_list = self.owned_locks(locking.LEVEL_NODE)
673
674     vm_capable_nodes = [node.name
675                         for node in self.cfg.GetAllNodesInfo().values()
676                         if node.name in node_list and node.vm_capable]
677
678     # if vg_name not None, checks given volume group on all nodes
679     if self.op.vg_name:
680       vglist = self.rpc.call_vg_list(vm_capable_nodes)
681       for node in vm_capable_nodes:
682         msg = vglist[node].fail_msg
683         if msg:
684           # ignoring down node
685           self.LogWarning("Error while gathering data on node %s"
686                           " (ignoring node): %s", node, msg)
687           continue
688         vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
689                                               self.op.vg_name,
690                                               constants.MIN_VG_SIZE)
691         if vgstatus:
692           raise errors.OpPrereqError("Error on node '%s': %s" %
693                                      (node, vgstatus), errors.ECODE_ENVIRON)
694
695     if self.op.drbd_helper:
696       # checks given drbd helper on all nodes
697       helpers = self.rpc.call_drbd_helper(node_list)
698       for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
699         if ninfo.offline:
700           self.LogInfo("Not checking drbd helper on offline node %s", node)
701           continue
702         msg = helpers[node].fail_msg
703         if msg:
704           raise errors.OpPrereqError("Error checking drbd helper on node"
705                                      " '%s': %s" % (node, msg),
706                                      errors.ECODE_ENVIRON)
707         node_helper = helpers[node].payload
708         if node_helper != self.op.drbd_helper:
709           raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
710                                      (node, node_helper), errors.ECODE_ENVIRON)
711
712     self.cluster = cluster = self.cfg.GetClusterInfo()
713     # validate params changes
714     if self.op.beparams:
715       objects.UpgradeBeParams(self.op.beparams)
716       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
717       self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
718
719     if self.op.ndparams:
720       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
721       self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
722
723       # TODO: we need a more general way to handle resetting
724       # cluster-level parameters to default values
725       if self.new_ndparams["oob_program"] == "":
726         self.new_ndparams["oob_program"] = \
727             constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
728
729     if self.op.hv_state:
730       new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
731                                            self.cluster.hv_state_static)
732       self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
733                                for hv, values in new_hv_state.items())
734
735     if self.op.disk_state:
736       new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
737                                                self.cluster.disk_state_static)
738       self.new_disk_state = \
739         dict((storage, dict((name, cluster.SimpleFillDiskState(values))
740                             for name, values in svalues.items()))
741              for storage, svalues in new_disk_state.items())
742
743     if self.op.ipolicy:
744       self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
745                                            group_policy=False)
746
747       all_instances = self.cfg.GetAllInstancesInfo().values()
748       violations = set()
749       for group in self.cfg.GetAllNodeGroupsInfo().values():
750         instances = frozenset([inst for inst in all_instances
751                                if compat.any(node in group.members
752                                              for node in inst.all_nodes)])
753         new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
754         ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
755         new = ComputeNewInstanceViolations(ipol,
756                                            new_ipolicy, instances, self.cfg)
757         if new:
758           violations.update(new)
759
760       if violations:
761         self.LogWarning("After the ipolicy change the following instances"
762                         " violate them: %s",
763                         utils.CommaJoin(utils.NiceSort(violations)))
764
765     if self.op.nicparams:
766       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
767       self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
768       objects.NIC.CheckParameterSyntax(self.new_nicparams)
769       nic_errors = []
770
771       # check all instances for consistency
772       for instance in self.cfg.GetAllInstancesInfo().values():
773         for nic_idx, nic in enumerate(instance.nics):
774           params_copy = copy.deepcopy(nic.nicparams)
775           params_filled = objects.FillDict(self.new_nicparams, params_copy)
776
777           # check parameter syntax
778           try:
779             objects.NIC.CheckParameterSyntax(params_filled)
780           except errors.ConfigurationError, err:
781             nic_errors.append("Instance %s, nic/%d: %s" %
782                               (instance.name, nic_idx, err))
783
784           # if we're moving instances to routed, check that they have an ip
785           target_mode = params_filled[constants.NIC_MODE]
786           if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
787             nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
788                               " address" % (instance.name, nic_idx))
789       if nic_errors:
790         raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
791                                    "\n".join(nic_errors), errors.ECODE_INVAL)
792
793     # hypervisor list/parameters
794     self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
795     if self.op.hvparams:
796       for hv_name, hv_dict in self.op.hvparams.items():
797         if hv_name not in self.new_hvparams:
798           self.new_hvparams[hv_name] = hv_dict
799         else:
800           self.new_hvparams[hv_name].update(hv_dict)
801
802     # disk template parameters
803     self.new_diskparams = objects.FillDict(cluster.diskparams, {})
804     if self.op.diskparams:
805       for dt_name, dt_params in self.op.diskparams.items():
806         if dt_name not in self.op.diskparams:
807           self.new_diskparams[dt_name] = dt_params
808         else:
809           self.new_diskparams[dt_name].update(dt_params)
810
811     # os hypervisor parameters
812     self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
813     if self.op.os_hvp:
814       for os_name, hvs in self.op.os_hvp.items():
815         if os_name not in self.new_os_hvp:
816           self.new_os_hvp[os_name] = hvs
817         else:
818           for hv_name, hv_dict in hvs.items():
819             if hv_dict is None:
820               # Delete if it exists
821               self.new_os_hvp[os_name].pop(hv_name, None)
822             elif hv_name not in self.new_os_hvp[os_name]:
823               self.new_os_hvp[os_name][hv_name] = hv_dict
824             else:
825               self.new_os_hvp[os_name][hv_name].update(hv_dict)
826
827     # os parameters
828     self.new_osp = objects.FillDict(cluster.osparams, {})
829     if self.op.osparams:
830       for os_name, osp in self.op.osparams.items():
831         if os_name not in self.new_osp:
832           self.new_osp[os_name] = {}
833
834         self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
835                                                  use_none=True)
836
837         if not self.new_osp[os_name]:
838           # we removed all parameters
839           del self.new_osp[os_name]
840         else:
841           # check the parameter validity (remote check)
842           CheckOSParams(self, False, [self.cfg.GetMasterNode()],
843                         os_name, self.new_osp[os_name])
844
845     # changes to the hypervisor list
846     if self.op.enabled_hypervisors is not None:
847       self.hv_list = self.op.enabled_hypervisors
848       for hv in self.hv_list:
849         # if the hypervisor doesn't already exist in the cluster
850         # hvparams, we initialize it to empty, and then (in both
851         # cases) we make sure to fill the defaults, as we might not
852         # have a complete defaults list if the hypervisor wasn't
853         # enabled before
854         if hv not in new_hvp:
855           new_hvp[hv] = {}
856         new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
857         utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
858     else:
859       self.hv_list = cluster.enabled_hypervisors
860
861     if self.op.hvparams or self.op.enabled_hypervisors is not None:
862       # either the enabled list has changed, or the parameters have, validate
863       for hv_name, hv_params in self.new_hvparams.items():
864         if ((self.op.hvparams and hv_name in self.op.hvparams) or
865             (self.op.enabled_hypervisors and
866              hv_name in self.op.enabled_hypervisors)):
867           # either this is a new hypervisor, or its parameters have changed
868           hv_class = hypervisor.GetHypervisorClass(hv_name)
869           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
870           hv_class.CheckParameterSyntax(hv_params)
871           CheckHVParams(self, node_list, hv_name, hv_params)
872
873     self._CheckDiskTemplateConsistency()
874
875     if self.op.os_hvp:
876       # no need to check any newly-enabled hypervisors, since the
877       # defaults have already been checked in the above code-block
878       for os_name, os_hvp in self.new_os_hvp.items():
879         for hv_name, hv_params in os_hvp.items():
880           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
881           # we need to fill in the new os_hvp on top of the actual hv_p
882           cluster_defaults = self.new_hvparams.get(hv_name, {})
883           new_osp = objects.FillDict(cluster_defaults, hv_params)
884           hv_class = hypervisor.GetHypervisorClass(hv_name)
885           hv_class.CheckParameterSyntax(new_osp)
886           CheckHVParams(self, node_list, hv_name, new_osp)
887
888     if self.op.default_iallocator:
889       alloc_script = utils.FindFile(self.op.default_iallocator,
890                                     constants.IALLOCATOR_SEARCH_PATH,
891                                     os.path.isfile)
892       if alloc_script is None:
893         raise errors.OpPrereqError("Invalid default iallocator script '%s'"
894                                    " specified" % self.op.default_iallocator,
895                                    errors.ECODE_INVAL)
896
897   def _CheckDiskTemplateConsistency(self):
898     """Check whether the disk templates that are going to be disabled
899        are still in use by some instances.
900
901     """
902     if self.op.enabled_disk_templates:
903       cluster = self.cfg.GetClusterInfo()
904       instances = self.cfg.GetAllInstancesInfo()
905
906       disk_templates_to_remove = set(cluster.enabled_disk_templates) \
907         - set(self.op.enabled_disk_templates)
908       for instance in instances.itervalues():
909         if instance.disk_template in disk_templates_to_remove:
910           raise errors.OpPrereqError("Cannot disable disk template '%s',"
911                                      " because instance '%s' is using it." %
912                                      (instance.disk_template, instance.name))
913
914   def Exec(self, feedback_fn):
915     """Change the parameters of the cluster.
916
917     """
918     if self.op.vg_name is not None:
919       new_volume = self.op.vg_name
920       if not new_volume:
921         new_volume = None
922       if new_volume != self.cfg.GetVGName():
923         self.cfg.SetVGName(new_volume)
924       else:
925         feedback_fn("Cluster LVM configuration already in desired"
926                     " state, not changing")
927     if self.op.drbd_helper is not None:
928       new_helper = self.op.drbd_helper
929       if not new_helper:
930         new_helper = None
931       if new_helper != self.cfg.GetDRBDHelper():
932         self.cfg.SetDRBDHelper(new_helper)
933       else:
934         feedback_fn("Cluster DRBD helper already in desired state,"
935                     " not changing")
936     if self.op.hvparams:
937       self.cluster.hvparams = self.new_hvparams
938     if self.op.os_hvp:
939       self.cluster.os_hvp = self.new_os_hvp
940     if self.op.enabled_hypervisors is not None:
941       self.cluster.hvparams = self.new_hvparams
942       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
943     if self.op.enabled_disk_templates:
944       self.cluster.enabled_disk_templates = \
945         list(set(self.op.enabled_disk_templates))
946     if self.op.beparams:
947       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
948     if self.op.nicparams:
949       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
950     if self.op.ipolicy:
951       self.cluster.ipolicy = self.new_ipolicy
952     if self.op.osparams:
953       self.cluster.osparams = self.new_osp
954     if self.op.ndparams:
955       self.cluster.ndparams = self.new_ndparams
956     if self.op.diskparams:
957       self.cluster.diskparams = self.new_diskparams
958     if self.op.hv_state:
959       self.cluster.hv_state_static = self.new_hv_state
960     if self.op.disk_state:
961       self.cluster.disk_state_static = self.new_disk_state
962
963     if self.op.candidate_pool_size is not None:
964       self.cluster.candidate_pool_size = self.op.candidate_pool_size
965       # we need to update the pool size here, otherwise the save will fail
966       AdjustCandidatePool(self, [])
967
968     if self.op.maintain_node_health is not None:
969       if self.op.maintain_node_health and not constants.ENABLE_CONFD:
970         feedback_fn("Note: CONFD was disabled at build time, node health"
971                     " maintenance is not useful (still enabling it)")
972       self.cluster.maintain_node_health = self.op.maintain_node_health
973
974     if self.op.prealloc_wipe_disks is not None:
975       self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
976
977     if self.op.add_uids is not None:
978       uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
979
980     if self.op.remove_uids is not None:
981       uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
982
983     if self.op.uid_pool is not None:
984       self.cluster.uid_pool = self.op.uid_pool
985
986     if self.op.default_iallocator is not None:
987       self.cluster.default_iallocator = self.op.default_iallocator
988
989     if self.op.reserved_lvs is not None:
990       self.cluster.reserved_lvs = self.op.reserved_lvs
991
992     if self.op.use_external_mip_script is not None:
993       self.cluster.use_external_mip_script = self.op.use_external_mip_script
994
995     def helper_os(aname, mods, desc):
996       desc += " OS list"
997       lst = getattr(self.cluster, aname)
998       for key, val in mods:
999         if key == constants.DDM_ADD:
1000           if val in lst:
1001             feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1002           else:
1003             lst.append(val)
1004         elif key == constants.DDM_REMOVE:
1005           if val in lst:
1006             lst.remove(val)
1007           else:
1008             feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1009         else:
1010           raise errors.ProgrammerError("Invalid modification '%s'" % key)
1011
1012     if self.op.hidden_os:
1013       helper_os("hidden_os", self.op.hidden_os, "hidden")
1014
1015     if self.op.blacklisted_os:
1016       helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1017
1018     if self.op.master_netdev:
1019       master_params = self.cfg.GetMasterNetworkParameters()
1020       ems = self.cfg.GetUseExternalMipScript()
1021       feedback_fn("Shutting down master ip on the current netdev (%s)" %
1022                   self.cluster.master_netdev)
1023       result = self.rpc.call_node_deactivate_master_ip(master_params.name,
1024                                                        master_params, ems)
1025       if not self.op.force:
1026         result.Raise("Could not disable the master ip")
1027       else:
1028         if result.fail_msg:
1029           msg = ("Could not disable the master ip (continuing anyway): %s" %
1030                  result.fail_msg)
1031           feedback_fn(msg)
1032       feedback_fn("Changing master_netdev from %s to %s" %
1033                   (master_params.netdev, self.op.master_netdev))
1034       self.cluster.master_netdev = self.op.master_netdev
1035
1036     if self.op.master_netmask:
1037       master_params = self.cfg.GetMasterNetworkParameters()
1038       feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1039       result = self.rpc.call_node_change_master_netmask(master_params.name,
1040                                                         master_params.netmask,
1041                                                         self.op.master_netmask,
1042                                                         master_params.ip,
1043                                                         master_params.netdev)
1044       if result.fail_msg:
1045         msg = "Could not change the master IP netmask: %s" % result.fail_msg
1046         feedback_fn(msg)
1047
1048       self.cluster.master_netmask = self.op.master_netmask
1049
1050     self.cfg.Update(self.cluster, feedback_fn)
1051
1052     if self.op.master_netdev:
1053       master_params = self.cfg.GetMasterNetworkParameters()
1054       feedback_fn("Starting the master ip on the new master netdev (%s)" %
1055                   self.op.master_netdev)
1056       ems = self.cfg.GetUseExternalMipScript()
1057       result = self.rpc.call_node_activate_master_ip(master_params.name,
1058                                                      master_params, ems)
1059       if result.fail_msg:
1060         self.LogWarning("Could not re-enable the master ip on"
1061                         " the master, please restart manually: %s",
1062                         result.fail_msg)
1063
1064
1065 class LUClusterVerify(NoHooksLU):
1066   """Submits all jobs necessary to verify the cluster.
1067
1068   """
1069   REQ_BGL = False
1070
1071   def ExpandNames(self):
1072     self.needed_locks = {}
1073
1074   def Exec(self, feedback_fn):
1075     jobs = []
1076
1077     if self.op.group_name:
1078       groups = [self.op.group_name]
1079       depends_fn = lambda: None
1080     else:
1081       groups = self.cfg.GetNodeGroupList()
1082
1083       # Verify global configuration
1084       jobs.append([
1085         opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1086         ])
1087
1088       # Always depend on global verification
1089       depends_fn = lambda: [(-len(jobs), [])]
1090
1091     jobs.extend(
1092       [opcodes.OpClusterVerifyGroup(group_name=group,
1093                                     ignore_errors=self.op.ignore_errors,
1094                                     depends=depends_fn())]
1095       for group in groups)
1096
1097     # Fix up all parameters
1098     for op in itertools.chain(*jobs): # pylint: disable=W0142
1099       op.debug_simulate_errors = self.op.debug_simulate_errors
1100       op.verbose = self.op.verbose
1101       op.error_codes = self.op.error_codes
1102       try:
1103         op.skip_checks = self.op.skip_checks
1104       except AttributeError:
1105         assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1106
1107     return ResultWithJobs(jobs)
1108
1109
1110 class _VerifyErrors(object):
1111   """Mix-in for cluster/group verify LUs.
1112
1113   It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1114   self.op and self._feedback_fn to be available.)
1115
1116   """
1117
1118   ETYPE_FIELD = "code"
1119   ETYPE_ERROR = "ERROR"
1120   ETYPE_WARNING = "WARNING"
1121
1122   def _Error(self, ecode, item, msg, *args, **kwargs):
1123     """Format an error message.
1124
1125     Based on the opcode's error_codes parameter, either format a
1126     parseable error code, or a simpler error string.
1127
1128     This must be called only from Exec and functions called from Exec.
1129
1130     """
1131     ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1132     itype, etxt, _ = ecode
1133     # If the error code is in the list of ignored errors, demote the error to a
1134     # warning
1135     if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1136       ltype = self.ETYPE_WARNING
1137     # first complete the msg
1138     if args:
1139       msg = msg % args
1140     # then format the whole message
1141     if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1142       msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1143     else:
1144       if item:
1145         item = " " + item
1146       else:
1147         item = ""
1148       msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1149     # and finally report it via the feedback_fn
1150     self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1151     # do not mark the operation as failed for WARN cases only
1152     if ltype == self.ETYPE_ERROR:
1153       self.bad = True
1154
1155   def _ErrorIf(self, cond, *args, **kwargs):
1156     """Log an error message if the passed condition is True.
1157
1158     """
1159     if (bool(cond)
1160         or self.op.debug_simulate_errors): # pylint: disable=E1101
1161       self._Error(*args, **kwargs)
1162
1163
1164 def _VerifyCertificate(filename):
1165   """Verifies a certificate for L{LUClusterVerifyConfig}.
1166
1167   @type filename: string
1168   @param filename: Path to PEM file
1169
1170   """
1171   try:
1172     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1173                                            utils.ReadFile(filename))
1174   except Exception, err: # pylint: disable=W0703
1175     return (LUClusterVerifyConfig.ETYPE_ERROR,
1176             "Failed to load X509 certificate %s: %s" % (filename, err))
1177
1178   (errcode, msg) = \
1179     utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1180                                 constants.SSL_CERT_EXPIRATION_ERROR)
1181
1182   if msg:
1183     fnamemsg = "While verifying %s: %s" % (filename, msg)
1184   else:
1185     fnamemsg = None
1186
1187   if errcode is None:
1188     return (None, fnamemsg)
1189   elif errcode == utils.CERT_WARNING:
1190     return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1191   elif errcode == utils.CERT_ERROR:
1192     return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1193
1194   raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1195
1196
1197 def _GetAllHypervisorParameters(cluster, instances):
1198   """Compute the set of all hypervisor parameters.
1199
1200   @type cluster: L{objects.Cluster}
1201   @param cluster: the cluster object
1202   @param instances: list of L{objects.Instance}
1203   @param instances: additional instances from which to obtain parameters
1204   @rtype: list of (origin, hypervisor, parameters)
1205   @return: a list with all parameters found, indicating the hypervisor they
1206        apply to, and the origin (can be "cluster", "os X", or "instance Y")
1207
1208   """
1209   hvp_data = []
1210
1211   for hv_name in cluster.enabled_hypervisors:
1212     hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1213
1214   for os_name, os_hvp in cluster.os_hvp.items():
1215     for hv_name, hv_params in os_hvp.items():
1216       if hv_params:
1217         full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1218         hvp_data.append(("os %s" % os_name, hv_name, full_params))
1219
1220   # TODO: collapse identical parameter values in a single one
1221   for instance in instances:
1222     if instance.hvparams:
1223       hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1224                        cluster.FillHV(instance)))
1225
1226   return hvp_data
1227
1228
1229 class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1230   """Verifies the cluster config.
1231
1232   """
1233   REQ_BGL = False
1234
1235   def _VerifyHVP(self, hvp_data):
1236     """Verifies locally the syntax of the hypervisor parameters.
1237
1238     """
1239     for item, hv_name, hv_params in hvp_data:
1240       msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1241              (item, hv_name))
1242       try:
1243         hv_class = hypervisor.GetHypervisorClass(hv_name)
1244         utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1245         hv_class.CheckParameterSyntax(hv_params)
1246       except errors.GenericError, err:
1247         self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1248
1249   def ExpandNames(self):
1250     self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1251     self.share_locks = ShareAll()
1252
1253   def CheckPrereq(self):
1254     """Check prerequisites.
1255
1256     """
1257     # Retrieve all information
1258     self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1259     self.all_node_info = self.cfg.GetAllNodesInfo()
1260     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1261
1262   def Exec(self, feedback_fn):
1263     """Verify integrity of cluster, performing various test on nodes.
1264
1265     """
1266     self.bad = False
1267     self._feedback_fn = feedback_fn
1268
1269     feedback_fn("* Verifying cluster config")
1270
1271     for msg in self.cfg.VerifyConfig():
1272       self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1273
1274     feedback_fn("* Verifying cluster certificate files")
1275
1276     for cert_filename in pathutils.ALL_CERT_FILES:
1277       (errcode, msg) = _VerifyCertificate(cert_filename)
1278       self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1279
1280     self._ErrorIf(not utils.CanRead(constants.CONFD_USER,
1281                                     pathutils.NODED_CERT_FILE),
1282                   constants.CV_ECLUSTERCERT,
1283                   None,
1284                   pathutils.NODED_CERT_FILE + " must be accessible by the " +
1285                     constants.CONFD_USER + " user")
1286
1287     feedback_fn("* Verifying hypervisor parameters")
1288
1289     self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1290                                                 self.all_inst_info.values()))
1291
1292     feedback_fn("* Verifying all nodes belong to an existing group")
1293
1294     # We do this verification here because, should this bogus circumstance
1295     # occur, it would never be caught by VerifyGroup, which only acts on
1296     # nodes/instances reachable from existing node groups.
1297
1298     dangling_nodes = set(node.name for node in self.all_node_info.values()
1299                          if node.group not in self.all_group_info)
1300
1301     dangling_instances = {}
1302     no_node_instances = []
1303
1304     for inst in self.all_inst_info.values():
1305       if inst.primary_node in dangling_nodes:
1306         dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1307       elif inst.primary_node not in self.all_node_info:
1308         no_node_instances.append(inst.name)
1309
1310     pretty_dangling = [
1311         "%s (%s)" %
1312         (node.name,
1313          utils.CommaJoin(dangling_instances.get(node.name,
1314                                                 ["no instances"])))
1315         for node in dangling_nodes]
1316
1317     self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1318                   None,
1319                   "the following nodes (and their instances) belong to a non"
1320                   " existing group: %s", utils.CommaJoin(pretty_dangling))
1321
1322     self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1323                   None,
1324                   "the following instances have a non-existing primary-node:"
1325                   " %s", utils.CommaJoin(no_node_instances))
1326
1327     return not self.bad
1328
1329
1330 class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1331   """Verifies the status of a node group.
1332
1333   """
1334   HPATH = "cluster-verify"
1335   HTYPE = constants.HTYPE_CLUSTER
1336   REQ_BGL = False
1337
1338   _HOOKS_INDENT_RE = re.compile("^", re.M)
1339
1340   class NodeImage(object):
1341     """A class representing the logical and physical status of a node.
1342
1343     @type name: string
1344     @ivar name: the node name to which this object refers
1345     @ivar volumes: a structure as returned from
1346         L{ganeti.backend.GetVolumeList} (runtime)
1347     @ivar instances: a list of running instances (runtime)
1348     @ivar pinst: list of configured primary instances (config)
1349     @ivar sinst: list of configured secondary instances (config)
1350     @ivar sbp: dictionary of {primary-node: list of instances} for all
1351         instances for which this node is secondary (config)
1352     @ivar mfree: free memory, as reported by hypervisor (runtime)
1353     @ivar dfree: free disk, as reported by the node (runtime)
1354     @ivar offline: the offline status (config)
1355     @type rpc_fail: boolean
1356     @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1357         not whether the individual keys were correct) (runtime)
1358     @type lvm_fail: boolean
1359     @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1360     @type hyp_fail: boolean
1361     @ivar hyp_fail: whether the RPC call didn't return the instance list
1362     @type ghost: boolean
1363     @ivar ghost: whether this is a known node or not (config)
1364     @type os_fail: boolean
1365     @ivar os_fail: whether the RPC call didn't return valid OS data
1366     @type oslist: list
1367     @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1368     @type vm_capable: boolean
1369     @ivar vm_capable: whether the node can host instances
1370     @type pv_min: float
1371     @ivar pv_min: size in MiB of the smallest PVs
1372     @type pv_max: float
1373     @ivar pv_max: size in MiB of the biggest PVs
1374
1375     """
1376     def __init__(self, offline=False, name=None, vm_capable=True):
1377       self.name = name
1378       self.volumes = {}
1379       self.instances = []
1380       self.pinst = []
1381       self.sinst = []
1382       self.sbp = {}
1383       self.mfree = 0
1384       self.dfree = 0
1385       self.offline = offline
1386       self.vm_capable = vm_capable
1387       self.rpc_fail = False
1388       self.lvm_fail = False
1389       self.hyp_fail = False
1390       self.ghost = False
1391       self.os_fail = False
1392       self.oslist = {}
1393       self.pv_min = None
1394       self.pv_max = None
1395
1396   def ExpandNames(self):
1397     # This raises errors.OpPrereqError on its own:
1398     self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1399
1400     # Get instances in node group; this is unsafe and needs verification later
1401     inst_names = \
1402       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1403
1404     self.needed_locks = {
1405       locking.LEVEL_INSTANCE: inst_names,
1406       locking.LEVEL_NODEGROUP: [self.group_uuid],
1407       locking.LEVEL_NODE: [],
1408
1409       # This opcode is run by watcher every five minutes and acquires all nodes
1410       # for a group. It doesn't run for a long time, so it's better to acquire
1411       # the node allocation lock as well.
1412       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1413       }
1414
1415     self.share_locks = ShareAll()
1416
1417   def DeclareLocks(self, level):
1418     if level == locking.LEVEL_NODE:
1419       # Get members of node group; this is unsafe and needs verification later
1420       nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1421
1422       all_inst_info = self.cfg.GetAllInstancesInfo()
1423
1424       # In Exec(), we warn about mirrored instances that have primary and
1425       # secondary living in separate node groups. To fully verify that
1426       # volumes for these instances are healthy, we will need to do an
1427       # extra call to their secondaries. We ensure here those nodes will
1428       # be locked.
1429       for inst in self.owned_locks(locking.LEVEL_INSTANCE):
1430         # Important: access only the instances whose lock is owned
1431         if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
1432           nodes.update(all_inst_info[inst].secondary_nodes)
1433
1434       self.needed_locks[locking.LEVEL_NODE] = nodes
1435
1436   def CheckPrereq(self):
1437     assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1438     self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1439
1440     group_nodes = set(self.group_info.members)
1441     group_instances = \
1442       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1443
1444     unlocked_nodes = \
1445         group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1446
1447     unlocked_instances = \
1448         group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))
1449
1450     if unlocked_nodes:
1451       raise errors.OpPrereqError("Missing lock for nodes: %s" %
1452                                  utils.CommaJoin(unlocked_nodes),
1453                                  errors.ECODE_STATE)
1454
1455     if unlocked_instances:
1456       raise errors.OpPrereqError("Missing lock for instances: %s" %
1457                                  utils.CommaJoin(unlocked_instances),
1458                                  errors.ECODE_STATE)
1459
1460     self.all_node_info = self.cfg.GetAllNodesInfo()
1461     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1462
1463     self.my_node_names = utils.NiceSort(group_nodes)
1464     self.my_inst_names = utils.NiceSort(group_instances)
1465
1466     self.my_node_info = dict((name, self.all_node_info[name])
1467                              for name in self.my_node_names)
1468
1469     self.my_inst_info = dict((name, self.all_inst_info[name])
1470                              for name in self.my_inst_names)
1471
1472     # We detect here the nodes that will need the extra RPC calls for verifying
1473     # split LV volumes; they should be locked.
1474     extra_lv_nodes = set()
1475
1476     for inst in self.my_inst_info.values():
1477       if inst.disk_template in constants.DTS_INT_MIRROR:
1478         for nname in inst.all_nodes:
1479           if self.all_node_info[nname].group != self.group_uuid:
1480             extra_lv_nodes.add(nname)
1481
1482     unlocked_lv_nodes = \
1483         extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1484
1485     if unlocked_lv_nodes:
1486       raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1487                                  utils.CommaJoin(unlocked_lv_nodes),
1488                                  errors.ECODE_STATE)
1489     self.extra_lv_nodes = list(extra_lv_nodes)
1490
1491   def _VerifyNode(self, ninfo, nresult):
1492     """Perform some basic validation on data returned from a node.
1493
1494       - check the result data structure is well formed and has all the
1495         mandatory fields
1496       - check ganeti version
1497
1498     @type ninfo: L{objects.Node}
1499     @param ninfo: the node to check
1500     @param nresult: the results from the node
1501     @rtype: boolean
1502     @return: whether overall this call was successful (and we can expect
1503          reasonable values in the respose)
1504
1505     """
1506     node = ninfo.name
1507     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1508
1509     # main result, nresult should be a non-empty dict
1510     test = not nresult or not isinstance(nresult, dict)
1511     _ErrorIf(test, constants.CV_ENODERPC, node,
1512                   "unable to verify node: no data returned")
1513     if test:
1514       return False
1515
1516     # compares ganeti version
1517     local_version = constants.PROTOCOL_VERSION
1518     remote_version = nresult.get("version", None)
1519     test = not (remote_version and
1520                 isinstance(remote_version, (list, tuple)) and
1521                 len(remote_version) == 2)
1522     _ErrorIf(test, constants.CV_ENODERPC, node,
1523              "connection to node returned invalid data")
1524     if test:
1525       return False
1526
1527     test = local_version != remote_version[0]
1528     _ErrorIf(test, constants.CV_ENODEVERSION, node,
1529              "incompatible protocol versions: master %s,"
1530              " node %s", local_version, remote_version[0])
1531     if test:
1532       return False
1533
1534     # node seems compatible, we can actually try to look into its results
1535
1536     # full package version
1537     self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1538                   constants.CV_ENODEVERSION, node,
1539                   "software version mismatch: master %s, node %s",
1540                   constants.RELEASE_VERSION, remote_version[1],
1541                   code=self.ETYPE_WARNING)
1542
1543     hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1544     if ninfo.vm_capable and isinstance(hyp_result, dict):
1545       for hv_name, hv_result in hyp_result.iteritems():
1546         test = hv_result is not None
1547         _ErrorIf(test, constants.CV_ENODEHV, node,
1548                  "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1549
1550     hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1551     if ninfo.vm_capable and isinstance(hvp_result, list):
1552       for item, hv_name, hv_result in hvp_result:
1553         _ErrorIf(True, constants.CV_ENODEHV, node,
1554                  "hypervisor %s parameter verify failure (source %s): %s",
1555                  hv_name, item, hv_result)
1556
1557     test = nresult.get(constants.NV_NODESETUP,
1558                        ["Missing NODESETUP results"])
1559     _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
1560              "; ".join(test))
1561
1562     return True
1563
1564   def _VerifyNodeTime(self, ninfo, nresult,
1565                       nvinfo_starttime, nvinfo_endtime):
1566     """Check the node time.
1567
1568     @type ninfo: L{objects.Node}
1569     @param ninfo: the node to check
1570     @param nresult: the remote results for the node
1571     @param nvinfo_starttime: the start time of the RPC call
1572     @param nvinfo_endtime: the end time of the RPC call
1573
1574     """
1575     node = ninfo.name
1576     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1577
1578     ntime = nresult.get(constants.NV_TIME, None)
1579     try:
1580       ntime_merged = utils.MergeTime(ntime)
1581     except (ValueError, TypeError):
1582       _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
1583       return
1584
1585     if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1586       ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1587     elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1588       ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1589     else:
1590       ntime_diff = None
1591
1592     _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
1593              "Node time diverges by at least %s from master node time",
1594              ntime_diff)
1595
1596   def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1597     """Check the node LVM results and update info for cross-node checks.
1598
1599     @type ninfo: L{objects.Node}
1600     @param ninfo: the node to check
1601     @param nresult: the remote results for the node
1602     @param vg_name: the configured VG name
1603     @type nimg: L{NodeImage}
1604     @param nimg: node image
1605
1606     """
1607     if vg_name is None:
1608       return
1609
1610     node = ninfo.name
1611     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1612
1613     # checks vg existence and size > 20G
1614     vglist = nresult.get(constants.NV_VGLIST, None)
1615     test = not vglist
1616     _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
1617     if not test:
1618       vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1619                                             constants.MIN_VG_SIZE)
1620       _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)
1621
1622     # Check PVs
1623     (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1624     for em in errmsgs:
1625       self._Error(constants.CV_ENODELVM, node, em)
1626     if pvminmax is not None:
1627       (nimg.pv_min, nimg.pv_max) = pvminmax
1628
1629   def _VerifyGroupLVM(self, node_image, vg_name):
1630     """Check cross-node consistency in LVM.
1631
1632     @type node_image: dict
1633     @param node_image: info about nodes, mapping from node to names to
1634       L{NodeImage} objects
1635     @param vg_name: the configured VG name
1636
1637     """
1638     if vg_name is None:
1639       return
1640
1641     # Only exlcusive storage needs this kind of checks
1642     if not self._exclusive_storage:
1643       return
1644
1645     # exclusive_storage wants all PVs to have the same size (approximately),
1646     # if the smallest and the biggest ones are okay, everything is fine.
1647     # pv_min is None iff pv_max is None
1648     vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1649     if not vals:
1650       return
1651     (pvmin, minnode) = min((ni.pv_min, ni.name) for ni in vals)
1652     (pvmax, maxnode) = max((ni.pv_max, ni.name) for ni in vals)
1653     bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1654     self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1655                   "PV sizes differ too much in the group; smallest (%s MB) is"
1656                   " on %s, biggest (%s MB) is on %s",
1657                   pvmin, minnode, pvmax, maxnode)
1658
1659   def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1660     """Check the node bridges.
1661
1662     @type ninfo: L{objects.Node}
1663     @param ninfo: the node to check
1664     @param nresult: the remote results for the node
1665     @param bridges: the expected list of bridges
1666
1667     """
1668     if not bridges:
1669       return
1670
1671     node = ninfo.name
1672     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1673
1674     missing = nresult.get(constants.NV_BRIDGES, None)
1675     test = not isinstance(missing, list)
1676     _ErrorIf(test, constants.CV_ENODENET, node,
1677              "did not return valid bridge information")
1678     if not test:
1679       _ErrorIf(bool(missing), constants.CV_ENODENET, node,
1680                "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1681
1682   def _VerifyNodeUserScripts(self, ninfo, nresult):
1683     """Check the results of user scripts presence and executability on the node
1684
1685     @type ninfo: L{objects.Node}
1686     @param ninfo: the node to check
1687     @param nresult: the remote results for the node
1688
1689     """
1690     node = ninfo.name
1691
1692     test = not constants.NV_USERSCRIPTS in nresult
1693     self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
1694                   "did not return user scripts information")
1695
1696     broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1697     if not test:
1698       self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
1699                     "user scripts not present or not executable: %s" %
1700                     utils.CommaJoin(sorted(broken_scripts)))
1701
1702   def _VerifyNodeNetwork(self, ninfo, nresult):
1703     """Check the node network connectivity results.
1704
1705     @type ninfo: L{objects.Node}
1706     @param ninfo: the node to check
1707     @param nresult: the remote results for the node
1708
1709     """
1710     node = ninfo.name
1711     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1712
1713     test = constants.NV_NODELIST not in nresult
1714     _ErrorIf(test, constants.CV_ENODESSH, node,
1715              "node hasn't returned node ssh connectivity data")
1716     if not test:
1717       if nresult[constants.NV_NODELIST]:
1718         for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1719           _ErrorIf(True, constants.CV_ENODESSH, node,
1720                    "ssh communication with node '%s': %s", a_node, a_msg)
1721
1722     test = constants.NV_NODENETTEST not in nresult
1723     _ErrorIf(test, constants.CV_ENODENET, node,
1724              "node hasn't returned node tcp connectivity data")
1725     if not test:
1726       if nresult[constants.NV_NODENETTEST]:
1727         nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1728         for anode in nlist:
1729           _ErrorIf(True, constants.CV_ENODENET, node,
1730                    "tcp communication with node '%s': %s",
1731                    anode, nresult[constants.NV_NODENETTEST][anode])
1732
1733     test = constants.NV_MASTERIP not in nresult
1734     _ErrorIf(test, constants.CV_ENODENET, node,
1735              "node hasn't returned node master IP reachability data")
1736     if not test:
1737       if not nresult[constants.NV_MASTERIP]:
1738         if node == self.master_node:
1739           msg = "the master node cannot reach the master IP (not configured?)"
1740         else:
1741           msg = "cannot reach the master IP"
1742         _ErrorIf(True, constants.CV_ENODENET, node, msg)
1743
1744   def _VerifyInstance(self, instance, inst_config, node_image,
1745                       diskstatus):
1746     """Verify an instance.
1747
1748     This function checks to see if the required block devices are
1749     available on the instance's node, and that the nodes are in the correct
1750     state.
1751
1752     """
1753     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1754     pnode = inst_config.primary_node
1755     pnode_img = node_image[pnode]
1756     groupinfo = self.cfg.GetAllNodeGroupsInfo()
1757
1758     node_vol_should = {}
1759     inst_config.MapLVsByNode(node_vol_should)
1760
1761     cluster = self.cfg.GetClusterInfo()
1762     ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1763                                                             self.group_info)
1764     err = ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
1765     _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err),
1766              code=self.ETYPE_WARNING)
1767
1768     for node in node_vol_should:
1769       n_img = node_image[node]
1770       if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1771         # ignore missing volumes on offline or broken nodes
1772         continue
1773       for volume in node_vol_should[node]:
1774         test = volume not in n_img.volumes
1775         _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
1776                  "volume %s missing on node %s", volume, node)
1777
1778     if inst_config.admin_state == constants.ADMINST_UP:
1779       test = instance not in pnode_img.instances and not pnode_img.offline
1780       _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
1781                "instance not running on its primary node %s",
1782                pnode)
1783       _ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
1784                "instance is marked as running and lives on offline node %s",
1785                pnode)
1786
1787     diskdata = [(nname, success, status, idx)
1788                 for (nname, disks) in diskstatus.items()
1789                 for idx, (success, status) in enumerate(disks)]
1790
1791     for nname, success, bdev_status, idx in diskdata:
1792       # the 'ghost node' construction in Exec() ensures that we have a
1793       # node here
1794       snode = node_image[nname]
1795       bad_snode = snode.ghost or snode.offline
1796       _ErrorIf(inst_config.disks_active and
1797                not success and not bad_snode,
1798                constants.CV_EINSTANCEFAULTYDISK, instance,
1799                "couldn't retrieve status for disk/%s on %s: %s",
1800                idx, nname, bdev_status)
1801       _ErrorIf((inst_config.disks_active and
1802                 success and bdev_status.ldisk_status == constants.LDS_FAULTY),
1803                constants.CV_EINSTANCEFAULTYDISK, instance,
1804                "disk/%s on %s is faulty", idx, nname)
1805
1806     _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1807              constants.CV_ENODERPC, pnode, "instance %s, connection to"
1808              " primary node failed", instance)
1809
1810     _ErrorIf(len(inst_config.secondary_nodes) > 1,
1811              constants.CV_EINSTANCELAYOUT,
1812              instance, "instance has multiple secondary nodes: %s",
1813              utils.CommaJoin(inst_config.secondary_nodes),
1814              code=self.ETYPE_WARNING)
1815
1816     if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
1817       # Disk template not compatible with exclusive_storage: no instance
1818       # node should have the flag set
1819       es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
1820                                                      inst_config.all_nodes)
1821       es_nodes = [n for (n, es) in es_flags.items()
1822                   if es]
1823       _ErrorIf(es_nodes, constants.CV_EINSTANCEUNSUITABLENODE, instance,
1824                "instance has template %s, which is not supported on nodes"
1825                " that have exclusive storage set: %s",
1826                inst_config.disk_template, utils.CommaJoin(es_nodes))
1827
1828     if inst_config.disk_template in constants.DTS_INT_MIRROR:
1829       instance_nodes = utils.NiceSort(inst_config.all_nodes)
1830       instance_groups = {}
1831
1832       for node in instance_nodes:
1833         instance_groups.setdefault(self.all_node_info[node].group,
1834                                    []).append(node)
1835
1836       pretty_list = [
1837         "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
1838         # Sort so that we always list the primary node first.
1839         for group, nodes in sorted(instance_groups.items(),
1840                                    key=lambda (_, nodes): pnode in nodes,
1841                                    reverse=True)]
1842
1843       self._ErrorIf(len(instance_groups) > 1,
1844                     constants.CV_EINSTANCESPLITGROUPS,
1845                     instance, "instance has primary and secondary nodes in"
1846                     " different groups: %s", utils.CommaJoin(pretty_list),
1847                     code=self.ETYPE_WARNING)
1848
1849     inst_nodes_offline = []
1850     for snode in inst_config.secondary_nodes:
1851       s_img = node_image[snode]
1852       _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
1853                snode, "instance %s, connection to secondary node failed",
1854                instance)
1855
1856       if s_img.offline:
1857         inst_nodes_offline.append(snode)
1858
1859     # warn that the instance lives on offline nodes
1860     _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
1861              "instance has offline secondary node(s) %s",
1862              utils.CommaJoin(inst_nodes_offline))
1863     # ... or ghost/non-vm_capable nodes
1864     for node in inst_config.all_nodes:
1865       _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
1866                instance, "instance lives on ghost node %s", node)
1867       _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
1868                instance, "instance lives on non-vm_capable node %s", node)
1869
1870   def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1871     """Verify if there are any unknown volumes in the cluster.
1872
1873     The .os, .swap and backup volumes are ignored. All other volumes are
1874     reported as unknown.
1875
1876     @type reserved: L{ganeti.utils.FieldSet}
1877     @param reserved: a FieldSet of reserved volume names
1878
1879     """
1880     for node, n_img in node_image.items():
1881       if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
1882           self.all_node_info[node].group != self.group_uuid):
1883         # skip non-healthy nodes
1884         continue
1885       for volume in n_img.volumes:
1886         test = ((node not in node_vol_should or
1887                 volume not in node_vol_should[node]) and
1888                 not reserved.Matches(volume))
1889         self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
1890                       "volume %s is unknown", volume)
1891
1892   def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1893     """Verify N+1 Memory Resilience.
1894
1895     Check that if one single node dies we can still start all the
1896     instances it was primary for.
1897
1898     """
1899     cluster_info = self.cfg.GetClusterInfo()
1900     for node, n_img in node_image.items():
1901       # This code checks that every node which is now listed as
1902       # secondary has enough memory to host all instances it is
1903       # supposed to should a single other node in the cluster fail.
1904       # FIXME: not ready for failover to an arbitrary node
1905       # FIXME: does not support file-backed instances
1906       # WARNING: we currently take into account down instances as well
1907       # as up ones, considering that even if they're down someone
1908       # might want to start them even in the event of a node failure.
1909       if n_img.offline or self.all_node_info[node].group != self.group_uuid:
1910         # we're skipping nodes marked offline and nodes in other groups from
1911         # the N+1 warning, since most likely we don't have good memory
1912         # infromation from them; we already list instances living on such
1913         # nodes, and that's enough warning
1914         continue
1915       #TODO(dynmem): also consider ballooning out other instances
1916       for prinode, instances in n_img.sbp.items():
1917         needed_mem = 0
1918         for instance in instances:
1919           bep = cluster_info.FillBE(instance_cfg[instance])
1920           if bep[constants.BE_AUTO_BALANCE]:
1921             needed_mem += bep[constants.BE_MINMEM]
1922         test = n_img.mfree < needed_mem
1923         self._ErrorIf(test, constants.CV_ENODEN1, node,
1924                       "not enough memory to accomodate instance failovers"
1925                       " should node %s fail (%dMiB needed, %dMiB available)",
1926                       prinode, needed_mem, n_img.mfree)
1927
1928   @classmethod
1929   def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
1930                    (files_all, files_opt, files_mc, files_vm)):
1931     """Verifies file checksums collected from all nodes.
1932
1933     @param errorif: Callback for reporting errors
1934     @param nodeinfo: List of L{objects.Node} objects
1935     @param master_node: Name of master node
1936     @param all_nvinfo: RPC results
1937
1938     """
1939     # Define functions determining which nodes to consider for a file
1940     files2nodefn = [
1941       (files_all, None),
1942       (files_mc, lambda node: (node.master_candidate or
1943                                node.name == master_node)),
1944       (files_vm, lambda node: node.vm_capable),
1945       ]
1946
1947     # Build mapping from filename to list of nodes which should have the file
1948     nodefiles = {}
1949     for (files, fn) in files2nodefn:
1950       if fn is None:
1951         filenodes = nodeinfo
1952       else:
1953         filenodes = filter(fn, nodeinfo)
1954       nodefiles.update((filename,
1955                         frozenset(map(operator.attrgetter("name"), filenodes)))
1956                        for filename in files)
1957
1958     assert set(nodefiles) == (files_all | files_mc | files_vm)
1959
1960     fileinfo = dict((filename, {}) for filename in nodefiles)
1961     ignore_nodes = set()
1962
1963     for node in nodeinfo:
1964       if node.offline:
1965         ignore_nodes.add(node.name)
1966         continue
1967
1968       nresult = all_nvinfo[node.name]
1969
1970       if nresult.fail_msg or not nresult.payload:
1971         node_files = None
1972       else:
1973         fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
1974         node_files = dict((vcluster.LocalizeVirtualPath(key), value)
1975                           for (key, value) in fingerprints.items())
1976         del fingerprints
1977
1978       test = not (node_files and isinstance(node_files, dict))
1979       errorif(test, constants.CV_ENODEFILECHECK, node.name,
1980               "Node did not return file checksum data")
1981       if test:
1982         ignore_nodes.add(node.name)
1983         continue
1984
1985       # Build per-checksum mapping from filename to nodes having it
1986       for (filename, checksum) in node_files.items():
1987         assert filename in nodefiles
1988         fileinfo[filename].setdefault(checksum, set()).add(node.name)
1989
1990     for (filename, checksums) in fileinfo.items():
1991       assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
1992
1993       # Nodes having the file
1994       with_file = frozenset(node_name
1995                             for nodes in fileinfo[filename].values()
1996                             for node_name in nodes) - ignore_nodes
1997
1998       expected_nodes = nodefiles[filename] - ignore_nodes
1999
2000       # Nodes missing file
2001       missing_file = expected_nodes - with_file
2002
2003       if filename in files_opt:
2004         # All or no nodes
2005         errorif(missing_file and missing_file != expected_nodes,
2006                 constants.CV_ECLUSTERFILECHECK, None,
2007                 "File %s is optional, but it must exist on all or no"
2008                 " nodes (not found on %s)",
2009                 filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2010       else:
2011         errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2012                 "File %s is missing from node(s) %s", filename,
2013                 utils.CommaJoin(utils.NiceSort(missing_file)))
2014
2015         # Warn if a node has a file it shouldn't
2016         unexpected = with_file - expected_nodes
2017         errorif(unexpected,
2018                 constants.CV_ECLUSTERFILECHECK, None,
2019                 "File %s should not exist on node(s) %s",
2020                 filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2021
2022       # See if there are multiple versions of the file
2023       test = len(checksums) > 1
2024       if test:
2025         variants = ["variant %s on %s" %
2026                     (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2027                     for (idx, (checksum, nodes)) in
2028                       enumerate(sorted(checksums.items()))]
2029       else:
2030         variants = []
2031
2032       errorif(test, constants.CV_ECLUSTERFILECHECK, None,
2033               "File %s found with %s different checksums (%s)",
2034               filename, len(checksums), "; ".join(variants))
2035
2036   def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2037                       drbd_map):
2038     """Verifies and the node DRBD status.
2039
2040     @type ninfo: L{objects.Node}
2041     @param ninfo: the node to check
2042     @param nresult: the remote results for the node
2043     @param instanceinfo: the dict of instances
2044     @param drbd_helper: the configured DRBD usermode helper
2045     @param drbd_map: the DRBD map as returned by
2046         L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2047
2048     """
2049     node = ninfo.name
2050     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2051
2052     if drbd_helper:
2053       helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2054       test = (helper_result is None)
2055       _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2056                "no drbd usermode helper returned")
2057       if helper_result:
2058         status, payload = helper_result
2059         test = not status
2060         _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2061                  "drbd usermode helper check unsuccessful: %s", payload)
2062         test = status and (payload != drbd_helper)
2063         _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2064                  "wrong drbd usermode helper: %s", payload)
2065
2066     # compute the DRBD minors
2067     node_drbd = {}
2068     for minor, instance in drbd_map[node].items():
2069       test = instance not in instanceinfo
2070       _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2071                "ghost instance '%s' in temporary DRBD map", instance)
2072         # ghost instance should not be running, but otherwise we
2073         # don't give double warnings (both ghost instance and
2074         # unallocated minor in use)
2075       if test:
2076         node_drbd[minor] = (instance, False)
2077       else:
2078         instance = instanceinfo[instance]
2079         node_drbd[minor] = (instance.name, instance.disks_active)
2080
2081     # and now check them
2082     used_minors = nresult.get(constants.NV_DRBDLIST, [])
2083     test = not isinstance(used_minors, (tuple, list))
2084     _ErrorIf(test, constants.CV_ENODEDRBD, node,
2085              "cannot parse drbd status file: %s", str(used_minors))
2086     if test:
2087       # we cannot check drbd status
2088       return
2089
2090     for minor, (iname, must_exist) in node_drbd.items():
2091       test = minor not in used_minors and must_exist
2092       _ErrorIf(test, constants.CV_ENODEDRBD, node,
2093                "drbd minor %d of instance %s is not active", minor, iname)
2094     for minor in used_minors:
2095       test = minor not in node_drbd
2096       _ErrorIf(test, constants.CV_ENODEDRBD, node,
2097                "unallocated drbd minor %d is in use", minor)
2098
2099   def _UpdateNodeOS(self, ninfo, nresult, nimg):
2100     """Builds the node OS structures.
2101
2102     @type ninfo: L{objects.Node}
2103     @param ninfo: the node to check
2104     @param nresult: the remote results for the node
2105     @param nimg: the node image object
2106
2107     """
2108     node = ninfo.name
2109     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2110
2111     remote_os = nresult.get(constants.NV_OSLIST, None)
2112     test = (not isinstance(remote_os, list) or
2113             not compat.all(isinstance(v, list) and len(v) == 7
2114                            for v in remote_os))
2115
2116     _ErrorIf(test, constants.CV_ENODEOS, node,
2117              "node hasn't returned valid OS data")
2118
2119     nimg.os_fail = test
2120
2121     if test:
2122       return
2123
2124     os_dict = {}
2125
2126     for (name, os_path, status, diagnose,
2127          variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2128
2129       if name not in os_dict:
2130         os_dict[name] = []
2131
2132       # parameters is a list of lists instead of list of tuples due to
2133       # JSON lacking a real tuple type, fix it:
2134       parameters = [tuple(v) for v in parameters]
2135       os_dict[name].append((os_path, status, diagnose,
2136                             set(variants), set(parameters), set(api_ver)))
2137
2138     nimg.oslist = os_dict
2139
2140   def _VerifyNodeOS(self, ninfo, nimg, base):
2141     """Verifies the node OS list.
2142
2143     @type ninfo: L{objects.Node}
2144     @param ninfo: the node to check
2145     @param nimg: the node image object
2146     @param base: the 'template' node we match against (e.g. from the master)
2147
2148     """
2149     node = ninfo.name
2150     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2151
2152     assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2153
2154     beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2155     for os_name, os_data in nimg.oslist.items():
2156       assert os_data, "Empty OS status for OS %s?!" % os_name
2157       f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2158       _ErrorIf(not f_status, constants.CV_ENODEOS, node,
2159                "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2160       _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
2161                "OS '%s' has multiple entries (first one shadows the rest): %s",
2162                os_name, utils.CommaJoin([v[0] for v in os_data]))
2163       # comparisons with the 'base' image
2164       test = os_name not in base.oslist
2165       _ErrorIf(test, constants.CV_ENODEOS, node,
2166                "Extra OS %s not present on reference node (%s)",
2167                os_name, base.name)
2168       if test:
2169         continue
2170       assert base.oslist[os_name], "Base node has empty OS status?"
2171       _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2172       if not b_status:
2173         # base OS is invalid, skipping
2174         continue
2175       for kind, a, b in [("API version", f_api, b_api),
2176                          ("variants list", f_var, b_var),
2177                          ("parameters", beautify_params(f_param),
2178                           beautify_params(b_param))]:
2179         _ErrorIf(a != b, constants.CV_ENODEOS, node,
2180                  "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2181                  kind, os_name, base.name,
2182                  utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2183
2184     # check any missing OSes
2185     missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2186     _ErrorIf(missing, constants.CV_ENODEOS, node,
2187              "OSes present on reference node %s but missing on this node: %s",
2188              base.name, utils.CommaJoin(missing))
2189
2190   def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
2191     """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2192
2193     @type ninfo: L{objects.Node}
2194     @param ninfo: the node to check
2195     @param nresult: the remote results for the node
2196     @type is_master: bool
2197     @param is_master: Whether node is the master node
2198
2199     """
2200     node = ninfo.name
2201
2202     if (is_master and
2203         (constants.ENABLE_FILE_STORAGE or
2204          constants.ENABLE_SHARED_FILE_STORAGE)):
2205       try:
2206         fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
2207       except KeyError:
2208         # This should never happen
2209         self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
2210                       "Node did not return forbidden file storage paths")
2211       else:
2212         self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
2213                       "Found forbidden file storage paths: %s",
2214                       utils.CommaJoin(fspaths))
2215     else:
2216       self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
2217                     constants.CV_ENODEFILESTORAGEPATHS, node,
2218                     "Node should not have returned forbidden file storage"
2219                     " paths")
2220
2221   def _VerifyOob(self, ninfo, nresult):
2222     """Verifies out of band functionality of a node.
2223
2224     @type ninfo: L{objects.Node}
2225     @param ninfo: the node to check
2226     @param nresult: the remote results for the node
2227
2228     """
2229     node = ninfo.name
2230     # We just have to verify the paths on master and/or master candidates
2231     # as the oob helper is invoked on the master
2232     if ((ninfo.master_candidate or ninfo.master_capable) and
2233         constants.NV_OOB_PATHS in nresult):
2234       for path_result in nresult[constants.NV_OOB_PATHS]:
2235         self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2236
2237   def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2238     """Verifies and updates the node volume data.
2239
2240     This function will update a L{NodeImage}'s internal structures
2241     with data from the remote call.
2242
2243     @type ninfo: L{objects.Node}
2244     @param ninfo: the node to check
2245     @param nresult: the remote results for the node
2246     @param nimg: the node image object
2247     @param vg_name: the configured VG name
2248
2249     """
2250     node = ninfo.name
2251     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2252
2253     nimg.lvm_fail = True
2254     lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2255     if vg_name is None:
2256       pass
2257     elif isinstance(lvdata, basestring):
2258       _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
2259                utils.SafeEncode(lvdata))
2260     elif not isinstance(lvdata, dict):
2261       _ErrorIf(True, constants.CV_ENODELVM, node,
2262                "rpc call to node failed (lvlist)")
2263     else:
2264       nimg.volumes = lvdata
2265       nimg.lvm_fail = False
2266
2267   def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2268     """Verifies and updates the node instance list.
2269
2270     If the listing was successful, then updates this node's instance
2271     list. Otherwise, it marks the RPC call as failed for the instance
2272     list key.
2273
2274     @type ninfo: L{objects.Node}
2275     @param ninfo: the node to check
2276     @param nresult: the remote results for the node
2277     @param nimg: the node image object
2278
2279     """
2280     idata = nresult.get(constants.NV_INSTANCELIST, None)
2281     test = not isinstance(idata, list)
2282     self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2283                   "rpc call to node failed (instancelist): %s",
2284                   utils.SafeEncode(str(idata)))
2285     if test:
2286       nimg.hyp_fail = True
2287     else:
2288       nimg.instances = idata
2289
2290   def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2291     """Verifies and computes a node information map
2292
2293     @type ninfo: L{objects.Node}
2294     @param ninfo: the node to check
2295     @param nresult: the remote results for the node
2296     @param nimg: the node image object
2297     @param vg_name: the configured VG name
2298
2299     """
2300     node = ninfo.name
2301     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2302
2303     # try to read free memory (from the hypervisor)
2304     hv_info = nresult.get(constants.NV_HVINFO, None)
2305     test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2306     _ErrorIf(test, constants.CV_ENODEHV, node,
2307              "rpc call to node failed (hvinfo)")
2308     if not test:
2309       try:
2310         nimg.mfree = int(hv_info["memory_free"])
2311       except (ValueError, TypeError):
2312         _ErrorIf(True, constants.CV_ENODERPC, node,
2313                  "node returned invalid nodeinfo, check hypervisor")
2314
2315     # FIXME: devise a free space model for file based instances as well
2316     if vg_name is not None:
2317       test = (constants.NV_VGLIST not in nresult or
2318               vg_name not in nresult[constants.NV_VGLIST])
2319       _ErrorIf(test, constants.CV_ENODELVM, node,
2320                "node didn't return data for the volume group '%s'"
2321                " - it is either missing or broken", vg_name)
2322       if not test:
2323         try:
2324           nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2325         except (ValueError, TypeError):
2326           _ErrorIf(True, constants.CV_ENODERPC, node,
2327                    "node returned invalid LVM info, check LVM status")
2328
2329   def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2330     """Gets per-disk status information for all instances.
2331
2332     @type nodelist: list of strings
2333     @param nodelist: Node names
2334     @type node_image: dict of (name, L{objects.Node})
2335     @param node_image: Node objects
2336     @type instanceinfo: dict of (name, L{objects.Instance})
2337     @param instanceinfo: Instance objects
2338     @rtype: {instance: {node: [(succes, payload)]}}
2339     @return: a dictionary of per-instance dictionaries with nodes as
2340         keys and disk information as values; the disk information is a
2341         list of tuples (success, payload)
2342
2343     """
2344     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2345
2346     node_disks = {}
2347     node_disks_devonly = {}
2348     diskless_instances = set()
2349     diskless = constants.DT_DISKLESS
2350
2351     for nname in nodelist:
2352       node_instances = list(itertools.chain(node_image[nname].pinst,
2353                                             node_image[nname].sinst))
2354       diskless_instances.update(inst for inst in node_instances
2355                                 if instanceinfo[inst].disk_template == diskless)
2356       disks = [(inst, disk)
2357                for inst in node_instances
2358                for disk in instanceinfo[inst].disks]
2359
2360       if not disks:
2361         # No need to collect data
2362         continue
2363
2364       node_disks[nname] = disks
2365
2366       # _AnnotateDiskParams makes already copies of the disks
2367       devonly = []
2368       for (inst, dev) in disks:
2369         (anno_disk,) = AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
2370         self.cfg.SetDiskID(anno_disk, nname)
2371         devonly.append(anno_disk)
2372
2373       node_disks_devonly[nname] = devonly
2374
2375     assert len(node_disks) == len(node_disks_devonly)
2376
2377     # Collect data from all nodes with disks
2378     result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2379                                                           node_disks_devonly)
2380
2381     assert len(result) == len(node_disks)
2382
2383     instdisk = {}
2384
2385     for (nname, nres) in result.items():
2386       disks = node_disks[nname]
2387
2388       if nres.offline:
2389         # No data from this node
2390         data = len(disks) * [(False, "node offline")]
2391       else:
2392         msg = nres.fail_msg
2393         _ErrorIf(msg, constants.CV_ENODERPC, nname,
2394                  "while getting disk information: %s", msg)
2395         if msg:
2396           # No data from this node
2397           data = len(disks) * [(False, msg)]
2398         else:
2399           data = []
2400           for idx, i in enumerate(nres.payload):
2401             if isinstance(i, (tuple, list)) and len(i) == 2:
2402               data.append(i)
2403             else:
2404               logging.warning("Invalid result from node %s, entry %d: %s",
2405                               nname, idx, i)
2406               data.append((False, "Invalid result from the remote node"))
2407
2408       for ((inst, _), status) in zip(disks, data):
2409         instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2410
2411     # Add empty entries for diskless instances.
2412     for inst in diskless_instances:
2413       assert inst not in instdisk
2414       instdisk[inst] = {}
2415
2416     assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2417                       len(nnames) <= len(instanceinfo[inst].all_nodes) and
2418                       compat.all(isinstance(s, (tuple, list)) and
2419                                  len(s) == 2 for s in statuses)
2420                       for inst, nnames in instdisk.items()
2421                       for nname, statuses in nnames.items())
2422     if __debug__:
2423       instdisk_keys = set(instdisk)
2424       instanceinfo_keys = set(instanceinfo)
2425       assert instdisk_keys == instanceinfo_keys, \
2426         ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2427          (instdisk_keys, instanceinfo_keys))
2428
2429     return instdisk
2430
2431   @staticmethod
2432   def _SshNodeSelector(group_uuid, all_nodes):
2433     """Create endless iterators for all potential SSH check hosts.
2434
2435     """
2436     nodes = [node for node in all_nodes
2437              if (node.group != group_uuid and
2438                  not node.offline)]
2439     keyfunc = operator.attrgetter("group")
2440
2441     return map(itertools.cycle,
2442                [sorted(map(operator.attrgetter("name"), names))
2443                 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2444                                                   keyfunc)])
2445
2446   @classmethod
2447   def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2448     """Choose which nodes should talk to which other nodes.
2449
2450     We will make nodes contact all nodes in their group, and one node from
2451     every other group.
2452
2453     @warning: This algorithm has a known issue if one node group is much
2454       smaller than others (e.g. just one node). In such a case all other
2455       nodes will talk to the single node.
2456
2457     """
2458     online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2459     sel = cls._SshNodeSelector(group_uuid, all_nodes)
2460
2461     return (online_nodes,
2462             dict((name, sorted([i.next() for i in sel]))
2463                  for name in online_nodes))
2464
2465   def BuildHooksEnv(self):
2466     """Build hooks env.
2467
2468     Cluster-Verify hooks just ran in the post phase and their failure makes
2469     the output be logged in the verify output and the verification to fail.
2470
2471     """
2472     env = {
2473       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2474       }
2475
2476     env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2477                for node in self.my_node_info.values())
2478
2479     return env
2480
2481   def BuildHooksNodes(self):
2482     """Build hooks nodes.
2483
2484     """
2485     return ([], self.my_node_names)
2486
2487   def Exec(self, feedback_fn):
2488     """Verify integrity of the node group, performing various test on nodes.
2489
2490     """
2491     # This method has too many local variables. pylint: disable=R0914
2492     feedback_fn("* Verifying group '%s'" % self.group_info.name)
2493
2494     if not self.my_node_names:
2495       # empty node group
2496       feedback_fn("* Empty node group, skipping verification")
2497       return True
2498
2499     self.bad = False
2500     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2501     verbose = self.op.verbose
2502     self._feedback_fn = feedback_fn
2503
2504     vg_name = self.cfg.GetVGName()
2505     drbd_helper = self.cfg.GetDRBDHelper()
2506     cluster = self.cfg.GetClusterInfo()
2507     hypervisors = cluster.enabled_hypervisors
2508     node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2509
2510     i_non_redundant = [] # Non redundant instances
2511     i_non_a_balanced = [] # Non auto-balanced instances
2512     i_offline = 0 # Count of offline instances
2513     n_offline = 0 # Count of offline nodes
2514     n_drained = 0 # Count of nodes being drained
2515     node_vol_should = {}
2516
2517     # FIXME: verify OS list
2518
2519     # File verification
2520     filemap = ComputeAncillaryFiles(cluster, False)
2521
2522     # do local checksums
2523     master_node = self.master_node = self.cfg.GetMasterNode()
2524     master_ip = self.cfg.GetMasterIP()
2525
2526     feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2527
2528     user_scripts = []
2529     if self.cfg.GetUseExternalMipScript():
2530       user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2531
2532     node_verify_param = {
2533       constants.NV_FILELIST:
2534         map(vcluster.MakeVirtualPath,
2535             utils.UniqueSequence(filename
2536                                  for files in filemap
2537                                  for filename in files)),
2538       constants.NV_NODELIST:
2539         self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2540                                   self.all_node_info.values()),
2541       constants.NV_HYPERVISOR: hypervisors,
2542       constants.NV_HVPARAMS:
2543         _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2544       constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2545                                  for node in node_data_list
2546                                  if not node.offline],
2547       constants.NV_INSTANCELIST: hypervisors,
2548       constants.NV_VERSION: None,
2549       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2550       constants.NV_NODESETUP: None,
2551       constants.NV_TIME: None,
2552       constants.NV_MASTERIP: (master_node, master_ip),
2553       constants.NV_OSLIST: None,
2554       constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2555       constants.NV_USERSCRIPTS: user_scripts,
2556       }
2557
2558     if vg_name is not None:
2559       node_verify_param[constants.NV_VGLIST] = None
2560       node_verify_param[constants.NV_LVLIST] = vg_name
2561       node_verify_param[constants.NV_PVLIST] = [vg_name]
2562
2563     if drbd_helper:
2564       node_verify_param[constants.NV_DRBDLIST] = None
2565       node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2566
2567     if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
2568       # Load file storage paths only from master node
2569       node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node
2570
2571     # bridge checks
2572     # FIXME: this needs to be changed per node-group, not cluster-wide
2573     bridges = set()
2574     default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2575     if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2576       bridges.add(default_nicpp[constants.NIC_LINK])
2577     for instance in self.my_inst_info.values():
2578       for nic in instance.nics:
2579         full_nic = cluster.SimpleFillNIC(nic.nicparams)
2580         if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2581           bridges.add(full_nic[constants.NIC_LINK])
2582
2583     if bridges:
2584       node_verify_param[constants.NV_BRIDGES] = list(bridges)
2585
2586     # Build our expected cluster state
2587     node_image = dict((node.name, self.NodeImage(offline=node.offline,
2588                                                  name=node.name,
2589                                                  vm_capable=node.vm_capable))
2590                       for node in node_data_list)
2591
2592     # Gather OOB paths
2593     oob_paths = []
2594     for node in self.all_node_info.values():
2595       path = SupportsOob(self.cfg, node)
2596       if path and path not in oob_paths:
2597         oob_paths.append(path)
2598
2599     if oob_paths:
2600       node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2601
2602     for instance in self.my_inst_names:
2603       inst_config = self.my_inst_info[instance]
2604       if inst_config.admin_state == constants.ADMINST_OFFLINE:
2605         i_offline += 1
2606
2607       for nname in inst_config.all_nodes:
2608         if nname not in node_image:
2609           gnode = self.NodeImage(name=nname)
2610           gnode.ghost = (nname not in self.all_node_info)
2611           node_image[nname] = gnode
2612
2613       inst_config.MapLVsByNode(node_vol_should)
2614
2615       pnode = inst_config.primary_node
2616       node_image[pnode].pinst.append(instance)
2617
2618       for snode in inst_config.secondary_nodes:
2619         nimg = node_image[snode]
2620         nimg.sinst.append(instance)
2621         if pnode not in nimg.sbp:
2622           nimg.sbp[pnode] = []
2623         nimg.sbp[pnode].append(instance)
2624
2625     es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg, self.my_node_names)
2626     # The value of exclusive_storage should be the same across the group, so if
2627     # it's True for at least a node, we act as if it were set for all the nodes
2628     self._exclusive_storage = compat.any(es_flags.values())
2629     if self._exclusive_storage:
2630       node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2631
2632     # At this point, we have the in-memory data structures complete,
2633     # except for the runtime information, which we'll gather next
2634
2635     # Due to the way our RPC system works, exact response times cannot be
2636     # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2637     # time before and after executing the request, we can at least have a time
2638     # window.
2639     nvinfo_starttime = time.time()
2640     all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2641                                            node_verify_param,
2642                                            self.cfg.GetClusterName())
2643     nvinfo_endtime = time.time()
2644
2645     if self.extra_lv_nodes and vg_name is not None:
2646       extra_lv_nvinfo = \
2647           self.rpc.call_node_verify(self.extra_lv_nodes,
2648                                     {constants.NV_LVLIST: vg_name},
2649                                     self.cfg.GetClusterName())
2650     else:
2651       extra_lv_nvinfo = {}
2652
2653     all_drbd_map = self.cfg.ComputeDRBDMap()
2654
2655     feedback_fn("* Gathering disk information (%s nodes)" %
2656                 len(self.my_node_names))
2657     instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2658                                      self.my_inst_info)
2659
2660     feedback_fn("* Verifying configuration file consistency")
2661
2662     # If not all nodes are being checked, we need to make sure the master node
2663     # and a non-checked vm_capable node are in the list.
2664     absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2665     if absent_nodes:
2666       vf_nvinfo = all_nvinfo.copy()
2667       vf_node_info = list(self.my_node_info.values())
2668       additional_nodes = []
2669       if master_node not in self.my_node_info:
2670         additional_nodes.append(master_node)
2671         vf_node_info.append(self.all_node_info[master_node])
2672       # Add the first vm_capable node we find which is not included,
2673       # excluding the master node (which we already have)
2674       for node in absent_nodes:
2675         nodeinfo = self.all_node_info[node]
2676         if (nodeinfo.vm_capable and not nodeinfo.offline and
2677             node != master_node):
2678           additional_nodes.append(node)
2679           vf_node_info.append(self.all_node_info[node])
2680           break
2681       key = constants.NV_FILELIST
2682       vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2683                                                  {key: node_verify_param[key]},
2684                                                  self.cfg.GetClusterName()))
2685     else:
2686       vf_nvinfo = all_nvinfo
2687       vf_node_info = self.my_node_info.values()
2688
2689     self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2690
2691     feedback_fn("* Verifying node status")
2692
2693     refos_img = None
2694
2695     for node_i in node_data_list:
2696       node = node_i.name
2697       nimg = node_image[node]
2698
2699       if node_i.offline:
2700         if verbose:
2701           feedback_fn("* Skipping offline node %s" % (node,))
2702         n_offline += 1
2703         continue
2704
2705       if node == master_node:
2706         ntype = "master"
2707       elif node_i.master_candidate:
2708         ntype = "master candidate"
2709       elif node_i.drained:
2710         ntype = "drained"
2711         n_drained += 1
2712       else:
2713         ntype = "regular"
2714       if verbose:
2715         feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2716
2717       msg = all_nvinfo[node].fail_msg
2718       _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
2719                msg)
2720       if msg:
2721         nimg.rpc_fail = True
2722         continue
2723
2724       nresult = all_nvinfo[node].payload
2725
2726       nimg.call_ok = self._VerifyNode(node_i, nresult)
2727       self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2728       self._VerifyNodeNetwork(node_i, nresult)
2729       self._VerifyNodeUserScripts(node_i, nresult)
2730       self._VerifyOob(node_i, nresult)
2731       self._VerifyFileStoragePaths(node_i, nresult,
2732                                    node == master_node)
2733
2734       if nimg.vm_capable:
2735         self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
2736         self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2737                              all_drbd_map)
2738
2739         self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2740         self._UpdateNodeInstances(node_i, nresult, nimg)
2741         self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2742         self._UpdateNodeOS(node_i, nresult, nimg)
2743
2744         if not nimg.os_fail:
2745           if refos_img is None:
2746             refos_img = nimg
2747           self._VerifyNodeOS(node_i, nimg, refos_img)
2748         self._VerifyNodeBridges(node_i, nresult, bridges)
2749
2750         # Check whether all running instancies are primary for the node. (This
2751         # can no longer be done from _VerifyInstance below, since some of the
2752         # wrong instances could be from other node groups.)
2753         non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2754
2755         for inst in non_primary_inst:
2756           test = inst in self.all_inst_info
2757           _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
2758                    "instance should not run on node %s", node_i.name)
2759           _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
2760                    "node is running unknown instance %s", inst)
2761
2762     self._VerifyGroupLVM(node_image, vg_name)
2763
2764     for node, result in extra_lv_nvinfo.items():
2765       self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
2766                               node_image[node], vg_name)
2767
2768     feedback_fn("* Verifying instance status")
2769     for instance in self.my_inst_names:
2770       if verbose:
2771         feedback_fn("* Verifying instance %s" % instance)
2772       inst_config = self.my_inst_info[instance]
2773       self._VerifyInstance(instance, inst_config, node_image,
2774                            instdisk[instance])
2775
2776       # If the instance is non-redundant we cannot survive losing its primary
2777       # node, so we are not N+1 compliant.
2778       if inst_config.disk_template not in constants.DTS_MIRRORED:
2779         i_non_redundant.append(instance)
2780
2781       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2782         i_non_a_balanced.append(instance)
2783
2784     feedback_fn("* Verifying orphan volumes")
2785     reserved = utils.FieldSet(*cluster.reserved_lvs)
2786
2787     # We will get spurious "unknown volume" warnings if any node of this group
2788     # is secondary for an instance whose primary is in another group. To avoid
2789     # them, we find these instances and add their volumes to node_vol_should.
2790     for inst in self.all_inst_info.values():
2791       for secondary in inst.secondary_nodes:
2792         if (secondary in self.my_node_info
2793             and inst.name not in self.my_inst_info):
2794           inst.MapLVsByNode(node_vol_should)
2795           break
2796
2797     self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2798
2799     if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2800       feedback_fn("* Verifying N+1 Memory redundancy")
2801       self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2802
2803     feedback_fn("* Other Notes")
2804     if i_non_redundant:
2805       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2806                   % len(i_non_redundant))
2807
2808     if i_non_a_balanced:
2809       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2810                   % len(i_non_a_balanced))
2811
2812     if i_offline:
2813       feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
2814
2815     if n_offline:
2816       feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2817
2818     if n_drained:
2819       feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2820
2821     return not self.bad
2822
2823   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2824     """Analyze the post-hooks' result
2825
2826     This method analyses the hook result, handles it, and sends some
2827     nicely-formatted feedback back to the user.
2828
2829     @param phase: one of L{constants.HOOKS_PHASE_POST} or
2830         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2831     @param hooks_results: the results of the multi-node hooks rpc call
2832     @param feedback_fn: function used send feedback back to the caller
2833     @param lu_result: previous Exec result
2834     @return: the new Exec result, based on the previous result
2835         and hook results
2836
2837     """
2838     # We only really run POST phase hooks, only for non-empty groups,
2839     # and are only interested in their results
2840     if not self.my_node_names:
2841       # empty node group
2842       pass
2843     elif phase == constants.HOOKS_PHASE_POST:
2844       # Used to change hooks' output to proper indentation
2845       feedback_fn("* Hooks Results")
2846       assert hooks_results, "invalid result from hooks"
2847
2848       for node_name in hooks_results:
2849         res = hooks_results[node_name]
2850         msg = res.fail_msg
2851         test = msg and not res.offline
2852         self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2853                       "Communication failure in hooks execution: %s", msg)
2854         if res.offline or msg:
2855           # No need to investigate payload if node is offline or gave
2856           # an error.
2857           continue
2858         for script, hkr, output in res.payload:
2859           test = hkr == constants.HKR_FAIL
2860           self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2861                         "Script %s failed, output:", script)
2862           if test:
2863             output = self._HOOKS_INDENT_RE.sub("      ", output)
2864             feedback_fn("%s" % output)
2865             lu_result = False
2866
2867     return lu_result
2868
2869
2870 class LUClusterVerifyDisks(NoHooksLU):
2871   """Verifies the cluster disks status.
2872
2873   """
2874   REQ_BGL = False
2875
2876   def ExpandNames(self):
2877     self.share_locks = ShareAll()
2878     self.needed_locks = {
2879       locking.LEVEL_NODEGROUP: locking.ALL_SET,
2880       }
2881
2882   def Exec(self, feedback_fn):
2883     group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
2884
2885     # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
2886     return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
2887                            for group in group_names])