(2.10) Hotplug: cmdlib support
[ganeti-local] / lib / cmdlib / cluster.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with the cluster."""
23
24 import OpenSSL
25
26 import copy
27 import itertools
28 import logging
29 import operator
30 import os
31 import re
32 import time
33
34 from ganeti import compat
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import masterd
40 from ganeti import netutils
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 from ganeti import query
45 from ganeti import rpc
46 from ganeti import runtime
47 from ganeti import ssh
48 from ganeti import uidpool
49 from ganeti import utils
50 from ganeti import vcluster
51
52 from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53   ResultWithJobs
54 from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55   ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56   GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57   GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58   CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59   ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob
60
61 import ganeti.masterd.instance
62
63
64 class LUClusterActivateMasterIp(NoHooksLU):
65   """Activate the master IP on the master node.
66
67   """
68   def Exec(self, feedback_fn):
69     """Activate the master IP.
70
71     """
72     master_params = self.cfg.GetMasterNetworkParameters()
73     ems = self.cfg.GetUseExternalMipScript()
74     result = self.rpc.call_node_activate_master_ip(master_params.name,
75                                                    master_params, ems)
76     result.Raise("Could not activate the master IP")
77
78
79 class LUClusterDeactivateMasterIp(NoHooksLU):
80   """Deactivate the master IP on the master node.
81
82   """
83   def Exec(self, feedback_fn):
84     """Deactivate the master IP.
85
86     """
87     master_params = self.cfg.GetMasterNetworkParameters()
88     ems = self.cfg.GetUseExternalMipScript()
89     result = self.rpc.call_node_deactivate_master_ip(master_params.name,
90                                                      master_params, ems)
91     result.Raise("Could not deactivate the master IP")
92
93
94 class LUClusterConfigQuery(NoHooksLU):
95   """Return configuration values.
96
97   """
98   REQ_BGL = False
99
100   def CheckArguments(self):
101     self.cq = ClusterQuery(None, self.op.output_fields, False)
102
103   def ExpandNames(self):
104     self.cq.ExpandNames(self)
105
106   def DeclareLocks(self, level):
107     self.cq.DeclareLocks(self, level)
108
109   def Exec(self, feedback_fn):
110     result = self.cq.OldStyleQuery(self)
111
112     assert len(result) == 1
113
114     return result[0]
115
116
117 class LUClusterDestroy(LogicalUnit):
118   """Logical unit for destroying the cluster.
119
120   """
121   HPATH = "cluster-destroy"
122   HTYPE = constants.HTYPE_CLUSTER
123
124   def BuildHooksEnv(self):
125     """Build hooks env.
126
127     """
128     return {
129       "OP_TARGET": self.cfg.GetClusterName(),
130       }
131
132   def BuildHooksNodes(self):
133     """Build hooks nodes.
134
135     """
136     return ([], [])
137
138   def CheckPrereq(self):
139     """Check prerequisites.
140
141     This checks whether the cluster is empty.
142
143     Any errors are signaled by raising errors.OpPrereqError.
144
145     """
146     master = self.cfg.GetMasterNode()
147
148     nodelist = self.cfg.GetNodeList()
149     if len(nodelist) != 1 or nodelist[0] != master:
150       raise errors.OpPrereqError("There are still %d node(s) in"
151                                  " this cluster." % (len(nodelist) - 1),
152                                  errors.ECODE_INVAL)
153     instancelist = self.cfg.GetInstanceList()
154     if instancelist:
155       raise errors.OpPrereqError("There are still %d instance(s) in"
156                                  " this cluster." % len(instancelist),
157                                  errors.ECODE_INVAL)
158
159   def Exec(self, feedback_fn):
160     """Destroys the cluster.
161
162     """
163     master_params = self.cfg.GetMasterNetworkParameters()
164
165     # Run post hooks on master node before it's removed
166     RunPostHook(self, master_params.name)
167
168     ems = self.cfg.GetUseExternalMipScript()
169     result = self.rpc.call_node_deactivate_master_ip(master_params.name,
170                                                      master_params, ems)
171     if result.fail_msg:
172       self.LogWarning("Error disabling the master IP address: %s",
173                       result.fail_msg)
174
175     return master_params.name
176
177
178 class LUClusterPostInit(LogicalUnit):
179   """Logical unit for running hooks after cluster initialization.
180
181   """
182   HPATH = "cluster-init"
183   HTYPE = constants.HTYPE_CLUSTER
184
185   def BuildHooksEnv(self):
186     """Build hooks env.
187
188     """
189     return {
190       "OP_TARGET": self.cfg.GetClusterName(),
191       }
192
193   def BuildHooksNodes(self):
194     """Build hooks nodes.
195
196     """
197     return ([], [self.cfg.GetMasterNode()])
198
199   def Exec(self, feedback_fn):
200     """Nothing to do.
201
202     """
203     return True
204
205
206 class ClusterQuery(QueryBase):
207   FIELDS = query.CLUSTER_FIELDS
208
209   #: Do not sort (there is only one item)
210   SORT_FIELD = None
211
212   def ExpandNames(self, lu):
213     lu.needed_locks = {}
214
215     # The following variables interact with _QueryBase._GetNames
216     self.wanted = locking.ALL_SET
217     self.do_locking = self.use_locking
218
219     if self.do_locking:
220       raise errors.OpPrereqError("Can not use locking for cluster queries",
221                                  errors.ECODE_INVAL)
222
223   def DeclareLocks(self, lu, level):
224     pass
225
226   def _GetQueryData(self, lu):
227     """Computes the list of nodes and their attributes.
228
229     """
230     # Locking is not used
231     assert not (compat.any(lu.glm.is_owned(level)
232                            for level in locking.LEVELS
233                            if level != locking.LEVEL_CLUSTER) or
234                 self.do_locking or self.use_locking)
235
236     if query.CQ_CONFIG in self.requested_data:
237       cluster = lu.cfg.GetClusterInfo()
238     else:
239       cluster = NotImplemented
240
241     if query.CQ_QUEUE_DRAINED in self.requested_data:
242       drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
243     else:
244       drain_flag = NotImplemented
245
246     if query.CQ_WATCHER_PAUSE in self.requested_data:
247       master_name = lu.cfg.GetMasterNode()
248
249       result = lu.rpc.call_get_watcher_pause(master_name)
250       result.Raise("Can't retrieve watcher pause from master node '%s'" %
251                    master_name)
252
253       watcher_pause = result.payload
254     else:
255       watcher_pause = NotImplemented
256
257     return query.ClusterQueryData(cluster, drain_flag, watcher_pause)
258
259
260 class LUClusterQuery(NoHooksLU):
261   """Query cluster configuration.
262
263   """
264   REQ_BGL = False
265
266   def ExpandNames(self):
267     self.needed_locks = {}
268
269   def Exec(self, feedback_fn):
270     """Return cluster config.
271
272     """
273     cluster = self.cfg.GetClusterInfo()
274     os_hvp = {}
275
276     # Filter just for enabled hypervisors
277     for os_name, hv_dict in cluster.os_hvp.items():
278       os_hvp[os_name] = {}
279       for hv_name, hv_params in hv_dict.items():
280         if hv_name in cluster.enabled_hypervisors:
281           os_hvp[os_name][hv_name] = hv_params
282
283     # Convert ip_family to ip_version
284     primary_ip_version = constants.IP4_VERSION
285     if cluster.primary_ip_family == netutils.IP6Address.family:
286       primary_ip_version = constants.IP6_VERSION
287
288     result = {
289       "software_version": constants.RELEASE_VERSION,
290       "protocol_version": constants.PROTOCOL_VERSION,
291       "config_version": constants.CONFIG_VERSION,
292       "os_api_version": max(constants.OS_API_VERSIONS),
293       "export_version": constants.EXPORT_VERSION,
294       "vcs_version": constants.VCS_VERSION,
295       "architecture": runtime.GetArchInfo(),
296       "name": cluster.cluster_name,
297       "master": cluster.master_node,
298       "default_hypervisor": cluster.primary_hypervisor,
299       "enabled_hypervisors": cluster.enabled_hypervisors,
300       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
301                         for hypervisor_name in cluster.enabled_hypervisors]),
302       "os_hvp": os_hvp,
303       "beparams": cluster.beparams,
304       "osparams": cluster.osparams,
305       "ipolicy": cluster.ipolicy,
306       "nicparams": cluster.nicparams,
307       "ndparams": cluster.ndparams,
308       "diskparams": cluster.diskparams,
309       "candidate_pool_size": cluster.candidate_pool_size,
310       "master_netdev": cluster.master_netdev,
311       "master_netmask": cluster.master_netmask,
312       "use_external_mip_script": cluster.use_external_mip_script,
313       "volume_group_name": cluster.volume_group_name,
314       "drbd_usermode_helper": cluster.drbd_usermode_helper,
315       "file_storage_dir": cluster.file_storage_dir,
316       "shared_file_storage_dir": cluster.shared_file_storage_dir,
317       "maintain_node_health": cluster.maintain_node_health,
318       "ctime": cluster.ctime,
319       "mtime": cluster.mtime,
320       "uuid": cluster.uuid,
321       "tags": list(cluster.GetTags()),
322       "uid_pool": cluster.uid_pool,
323       "default_iallocator": cluster.default_iallocator,
324       "reserved_lvs": cluster.reserved_lvs,
325       "primary_ip_version": primary_ip_version,
326       "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
327       "hidden_os": cluster.hidden_os,
328       "blacklisted_os": cluster.blacklisted_os,
329       "enabled_disk_templates": cluster.enabled_disk_templates,
330       }
331
332     return result
333
334
335 class LUClusterRedistConf(NoHooksLU):
336   """Force the redistribution of cluster configuration.
337
338   This is a very simple LU.
339
340   """
341   REQ_BGL = False
342
343   def ExpandNames(self):
344     self.needed_locks = {
345       locking.LEVEL_NODE: locking.ALL_SET,
346       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
347     }
348     self.share_locks = ShareAll()
349
350   def Exec(self, feedback_fn):
351     """Redistribute the configuration.
352
353     """
354     self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
355     RedistributeAncillaryFiles(self)
356
357
358 class LUClusterRename(LogicalUnit):
359   """Rename the cluster.
360
361   """
362   HPATH = "cluster-rename"
363   HTYPE = constants.HTYPE_CLUSTER
364
365   def BuildHooksEnv(self):
366     """Build hooks env.
367
368     """
369     return {
370       "OP_TARGET": self.cfg.GetClusterName(),
371       "NEW_NAME": self.op.name,
372       }
373
374   def BuildHooksNodes(self):
375     """Build hooks nodes.
376
377     """
378     return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
379
380   def CheckPrereq(self):
381     """Verify that the passed name is a valid one.
382
383     """
384     hostname = netutils.GetHostname(name=self.op.name,
385                                     family=self.cfg.GetPrimaryIPFamily())
386
387     new_name = hostname.name
388     self.ip = new_ip = hostname.ip
389     old_name = self.cfg.GetClusterName()
390     old_ip = self.cfg.GetMasterIP()
391     if new_name == old_name and new_ip == old_ip:
392       raise errors.OpPrereqError("Neither the name nor the IP address of the"
393                                  " cluster has changed",
394                                  errors.ECODE_INVAL)
395     if new_ip != old_ip:
396       if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
397         raise errors.OpPrereqError("The given cluster IP address (%s) is"
398                                    " reachable on the network" %
399                                    new_ip, errors.ECODE_NOTUNIQUE)
400
401     self.op.name = new_name
402
403   def Exec(self, feedback_fn):
404     """Rename the cluster.
405
406     """
407     clustername = self.op.name
408     new_ip = self.ip
409
410     # shutdown the master IP
411     master_params = self.cfg.GetMasterNetworkParameters()
412     ems = self.cfg.GetUseExternalMipScript()
413     result = self.rpc.call_node_deactivate_master_ip(master_params.name,
414                                                      master_params, ems)
415     result.Raise("Could not disable the master role")
416
417     try:
418       cluster = self.cfg.GetClusterInfo()
419       cluster.cluster_name = clustername
420       cluster.master_ip = new_ip
421       self.cfg.Update(cluster, feedback_fn)
422
423       # update the known hosts file
424       ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
425       node_list = self.cfg.GetOnlineNodeList()
426       try:
427         node_list.remove(master_params.name)
428       except ValueError:
429         pass
430       UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
431     finally:
432       master_params.ip = new_ip
433       result = self.rpc.call_node_activate_master_ip(master_params.name,
434                                                      master_params, ems)
435       msg = result.fail_msg
436       if msg:
437         self.LogWarning("Could not re-enable the master role on"
438                         " the master, please restart manually: %s", msg)
439
440     return clustername
441
442
443 class LUClusterRepairDiskSizes(NoHooksLU):
444   """Verifies the cluster disks sizes.
445
446   """
447   REQ_BGL = False
448
449   def ExpandNames(self):
450     if self.op.instances:
451       self.wanted_names = GetWantedInstances(self, self.op.instances)
452       # Not getting the node allocation lock as only a specific set of
453       # instances (and their nodes) is going to be acquired
454       self.needed_locks = {
455         locking.LEVEL_NODE_RES: [],
456         locking.LEVEL_INSTANCE: self.wanted_names,
457         }
458       self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
459     else:
460       self.wanted_names = None
461       self.needed_locks = {
462         locking.LEVEL_NODE_RES: locking.ALL_SET,
463         locking.LEVEL_INSTANCE: locking.ALL_SET,
464
465         # This opcode is acquires the node locks for all instances
466         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
467         }
468
469     self.share_locks = {
470       locking.LEVEL_NODE_RES: 1,
471       locking.LEVEL_INSTANCE: 0,
472       locking.LEVEL_NODE_ALLOC: 1,
473       }
474
475   def DeclareLocks(self, level):
476     if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
477       self._LockInstancesNodes(primary_only=True, level=level)
478
479   def CheckPrereq(self):
480     """Check prerequisites.
481
482     This only checks the optional instance list against the existing names.
483
484     """
485     if self.wanted_names is None:
486       self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
487
488     self.wanted_instances = \
489         map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
490
491   def _EnsureChildSizes(self, disk):
492     """Ensure children of the disk have the needed disk size.
493
494     This is valid mainly for DRBD8 and fixes an issue where the
495     children have smaller disk size.
496
497     @param disk: an L{ganeti.objects.Disk} object
498
499     """
500     if disk.dev_type == constants.LD_DRBD8:
501       assert disk.children, "Empty children for DRBD8?"
502       fchild = disk.children[0]
503       mismatch = fchild.size < disk.size
504       if mismatch:
505         self.LogInfo("Child disk has size %d, parent %d, fixing",
506                      fchild.size, disk.size)
507         fchild.size = disk.size
508
509       # and we recurse on this child only, not on the metadev
510       return self._EnsureChildSizes(fchild) or mismatch
511     else:
512       return False
513
514   def Exec(self, feedback_fn):
515     """Verify the size of cluster disks.
516
517     """
518     # TODO: check child disks too
519     # TODO: check differences in size between primary/secondary nodes
520     per_node_disks = {}
521     for instance in self.wanted_instances:
522       pnode = instance.primary_node
523       if pnode not in per_node_disks:
524         per_node_disks[pnode] = []
525       for idx, disk in enumerate(instance.disks):
526         per_node_disks[pnode].append((instance, idx, disk))
527
528     assert not (frozenset(per_node_disks.keys()) -
529                 self.owned_locks(locking.LEVEL_NODE_RES)), \
530       "Not owning correct locks"
531     assert not self.owned_locks(locking.LEVEL_NODE)
532
533     changed = []
534     for node, dskl in per_node_disks.items():
535       newl = [v[2].Copy() for v in dskl]
536       for dsk in newl:
537         self.cfg.SetDiskID(dsk, node)
538       result = self.rpc.call_blockdev_getsize(node, newl)
539       if result.fail_msg:
540         self.LogWarning("Failure in blockdev_getsize call to node"
541                         " %s, ignoring", node)
542         continue
543       if len(result.payload) != len(dskl):
544         logging.warning("Invalid result from node %s: len(dksl)=%d,"
545                         " result.payload=%s", node, len(dskl), result.payload)
546         self.LogWarning("Invalid result from node %s, ignoring node results",
547                         node)
548         continue
549       for ((instance, idx, disk), size) in zip(dskl, result.payload):
550         if size is None:
551           self.LogWarning("Disk %d of instance %s did not return size"
552                           " information, ignoring", idx, instance.name)
553           continue
554         if not isinstance(size, (int, long)):
555           self.LogWarning("Disk %d of instance %s did not return valid"
556                           " size information, ignoring", idx, instance.name)
557           continue
558         size = size >> 20
559         if size != disk.size:
560           self.LogInfo("Disk %d of instance %s has mismatched size,"
561                        " correcting: recorded %d, actual %d", idx,
562                        instance.name, disk.size, size)
563           disk.size = size
564           self.cfg.Update(instance, feedback_fn)
565           changed.append((instance.name, idx, size))
566         if self._EnsureChildSizes(disk):
567           self.cfg.Update(instance, feedback_fn)
568           changed.append((instance.name, idx, disk.size))
569     return changed
570
571
572 def _ValidateNetmask(cfg, netmask):
573   """Checks if a netmask is valid.
574
575   @type cfg: L{config.ConfigWriter}
576   @param cfg: The cluster configuration
577   @type netmask: int
578   @param netmask: the netmask to be verified
579   @raise errors.OpPrereqError: if the validation fails
580
581   """
582   ip_family = cfg.GetPrimaryIPFamily()
583   try:
584     ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
585   except errors.ProgrammerError:
586     raise errors.OpPrereqError("Invalid primary ip family: %s." %
587                                ip_family, errors.ECODE_INVAL)
588   if not ipcls.ValidateNetmask(netmask):
589     raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
590                                (netmask), errors.ECODE_INVAL)
591
592
593 class LUClusterSetParams(LogicalUnit):
594   """Change the parameters of the cluster.
595
596   """
597   HPATH = "cluster-modify"
598   HTYPE = constants.HTYPE_CLUSTER
599   REQ_BGL = False
600
601   def CheckArguments(self):
602     """Check parameters
603
604     """
605     if self.op.uid_pool:
606       uidpool.CheckUidPool(self.op.uid_pool)
607
608     if self.op.add_uids:
609       uidpool.CheckUidPool(self.op.add_uids)
610
611     if self.op.remove_uids:
612       uidpool.CheckUidPool(self.op.remove_uids)
613
614     if self.op.master_netmask is not None:
615       _ValidateNetmask(self.cfg, self.op.master_netmask)
616
617     if self.op.diskparams:
618       for dt_params in self.op.diskparams.values():
619         utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
620       try:
621         utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
622       except errors.OpPrereqError, err:
623         raise errors.OpPrereqError("While verify diskparams options: %s" % err,
624                                    errors.ECODE_INVAL)
625
626   def ExpandNames(self):
627     # FIXME: in the future maybe other cluster params won't require checking on
628     # all nodes to be modified.
629     # FIXME: This opcode changes cluster-wide settings. Is acquiring all
630     # resource locks the right thing, shouldn't it be the BGL instead?
631     self.needed_locks = {
632       locking.LEVEL_NODE: locking.ALL_SET,
633       locking.LEVEL_INSTANCE: locking.ALL_SET,
634       locking.LEVEL_NODEGROUP: locking.ALL_SET,
635       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
636     }
637     self.share_locks = ShareAll()
638
639   def BuildHooksEnv(self):
640     """Build hooks env.
641
642     """
643     return {
644       "OP_TARGET": self.cfg.GetClusterName(),
645       "NEW_VG_NAME": self.op.vg_name,
646       }
647
648   def BuildHooksNodes(self):
649     """Build hooks nodes.
650
651     """
652     mn = self.cfg.GetMasterNode()
653     return ([mn], [mn])
654
655   def CheckPrereq(self):
656     """Check prerequisites.
657
658     This checks whether the given params don't conflict and
659     if the given volume group is valid.
660
661     """
662     if self.op.vg_name is not None and not self.op.vg_name:
663       if self.cfg.HasAnyDiskOfType(constants.LD_LV):
664         raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
665                                    " instances exist", errors.ECODE_INVAL)
666
667     if self.op.drbd_helper is not None and not self.op.drbd_helper:
668       if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
669         raise errors.OpPrereqError("Cannot disable drbd helper while"
670                                    " drbd-based instances exist",
671                                    errors.ECODE_INVAL)
672
673     node_list = self.owned_locks(locking.LEVEL_NODE)
674
675     vm_capable_nodes = [node.name
676                         for node in self.cfg.GetAllNodesInfo().values()
677                         if node.name in node_list and node.vm_capable]
678
679     # if vg_name not None, checks given volume group on all nodes
680     if self.op.vg_name:
681       vglist = self.rpc.call_vg_list(vm_capable_nodes)
682       for node in vm_capable_nodes:
683         msg = vglist[node].fail_msg
684         if msg:
685           # ignoring down node
686           self.LogWarning("Error while gathering data on node %s"
687                           " (ignoring node): %s", node, msg)
688           continue
689         vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
690                                               self.op.vg_name,
691                                               constants.MIN_VG_SIZE)
692         if vgstatus:
693           raise errors.OpPrereqError("Error on node '%s': %s" %
694                                      (node, vgstatus), errors.ECODE_ENVIRON)
695
696     if self.op.drbd_helper:
697       # checks given drbd helper on all nodes
698       helpers = self.rpc.call_drbd_helper(node_list)
699       for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
700         if ninfo.offline:
701           self.LogInfo("Not checking drbd helper on offline node %s", node)
702           continue
703         msg = helpers[node].fail_msg
704         if msg:
705           raise errors.OpPrereqError("Error checking drbd helper on node"
706                                      " '%s': %s" % (node, msg),
707                                      errors.ECODE_ENVIRON)
708         node_helper = helpers[node].payload
709         if node_helper != self.op.drbd_helper:
710           raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
711                                      (node, node_helper), errors.ECODE_ENVIRON)
712
713     self.cluster = cluster = self.cfg.GetClusterInfo()
714     # validate params changes
715     if self.op.beparams:
716       objects.UpgradeBeParams(self.op.beparams)
717       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
718       self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
719
720     if self.op.ndparams:
721       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
722       self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
723
724       # TODO: we need a more general way to handle resetting
725       # cluster-level parameters to default values
726       if self.new_ndparams["oob_program"] == "":
727         self.new_ndparams["oob_program"] = \
728             constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
729
730     if self.op.hv_state:
731       new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
732                                            self.cluster.hv_state_static)
733       self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
734                                for hv, values in new_hv_state.items())
735
736     if self.op.disk_state:
737       new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
738                                                self.cluster.disk_state_static)
739       self.new_disk_state = \
740         dict((storage, dict((name, cluster.SimpleFillDiskState(values))
741                             for name, values in svalues.items()))
742              for storage, svalues in new_disk_state.items())
743
744     if self.op.ipolicy:
745       self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
746                                            group_policy=False)
747
748       all_instances = self.cfg.GetAllInstancesInfo().values()
749       violations = set()
750       for group in self.cfg.GetAllNodeGroupsInfo().values():
751         instances = frozenset([inst for inst in all_instances
752                                if compat.any(node in group.members
753                                              for node in inst.all_nodes)])
754         new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
755         ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
756         new = ComputeNewInstanceViolations(ipol,
757                                            new_ipolicy, instances, self.cfg)
758         if new:
759           violations.update(new)
760
761       if violations:
762         self.LogWarning("After the ipolicy change the following instances"
763                         " violate them: %s",
764                         utils.CommaJoin(utils.NiceSort(violations)))
765
766     if self.op.nicparams:
767       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
768       self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
769       objects.NIC.CheckParameterSyntax(self.new_nicparams)
770       nic_errors = []
771
772       # check all instances for consistency
773       for instance in self.cfg.GetAllInstancesInfo().values():
774         for nic_idx, nic in enumerate(instance.nics):
775           params_copy = copy.deepcopy(nic.nicparams)
776           params_filled = objects.FillDict(self.new_nicparams, params_copy)
777
778           # check parameter syntax
779           try:
780             objects.NIC.CheckParameterSyntax(params_filled)
781           except errors.ConfigurationError, err:
782             nic_errors.append("Instance %s, nic/%d: %s" %
783                               (instance.name, nic_idx, err))
784
785           # if we're moving instances to routed, check that they have an ip
786           target_mode = params_filled[constants.NIC_MODE]
787           if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
788             nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
789                               " address" % (instance.name, nic_idx))
790       if nic_errors:
791         raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
792                                    "\n".join(nic_errors), errors.ECODE_INVAL)
793
794     # hypervisor list/parameters
795     self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
796     if self.op.hvparams:
797       for hv_name, hv_dict in self.op.hvparams.items():
798         if hv_name not in self.new_hvparams:
799           self.new_hvparams[hv_name] = hv_dict
800         else:
801           self.new_hvparams[hv_name].update(hv_dict)
802
803     # disk template parameters
804     self.new_diskparams = objects.FillDict(cluster.diskparams, {})
805     if self.op.diskparams:
806       for dt_name, dt_params in self.op.diskparams.items():
807         if dt_name not in self.new_diskparams:
808           self.new_diskparams[dt_name] = dt_params
809         else:
810           self.new_diskparams[dt_name].update(dt_params)
811
812     # os hypervisor parameters
813     self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
814     if self.op.os_hvp:
815       for os_name, hvs in self.op.os_hvp.items():
816         if os_name not in self.new_os_hvp:
817           self.new_os_hvp[os_name] = hvs
818         else:
819           for hv_name, hv_dict in hvs.items():
820             if hv_dict is None:
821               # Delete if it exists
822               self.new_os_hvp[os_name].pop(hv_name, None)
823             elif hv_name not in self.new_os_hvp[os_name]:
824               self.new_os_hvp[os_name][hv_name] = hv_dict
825             else:
826               self.new_os_hvp[os_name][hv_name].update(hv_dict)
827
828     # os parameters
829     self.new_osp = objects.FillDict(cluster.osparams, {})
830     if self.op.osparams:
831       for os_name, osp in self.op.osparams.items():
832         if os_name not in self.new_osp:
833           self.new_osp[os_name] = {}
834
835         self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
836                                                  use_none=True)
837
838         if not self.new_osp[os_name]:
839           # we removed all parameters
840           del self.new_osp[os_name]
841         else:
842           # check the parameter validity (remote check)
843           CheckOSParams(self, False, [self.cfg.GetMasterNode()],
844                         os_name, self.new_osp[os_name])
845
846     # changes to the hypervisor list
847     if self.op.enabled_hypervisors is not None:
848       self.hv_list = self.op.enabled_hypervisors
849       for hv in self.hv_list:
850         # if the hypervisor doesn't already exist in the cluster
851         # hvparams, we initialize it to empty, and then (in both
852         # cases) we make sure to fill the defaults, as we might not
853         # have a complete defaults list if the hypervisor wasn't
854         # enabled before
855         if hv not in new_hvp:
856           new_hvp[hv] = {}
857         new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
858         utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
859     else:
860       self.hv_list = cluster.enabled_hypervisors
861
862     if self.op.hvparams or self.op.enabled_hypervisors is not None:
863       # either the enabled list has changed, or the parameters have, validate
864       for hv_name, hv_params in self.new_hvparams.items():
865         if ((self.op.hvparams and hv_name in self.op.hvparams) or
866             (self.op.enabled_hypervisors and
867              hv_name in self.op.enabled_hypervisors)):
868           # either this is a new hypervisor, or its parameters have changed
869           hv_class = hypervisor.GetHypervisorClass(hv_name)
870           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
871           hv_class.CheckParameterSyntax(hv_params)
872           CheckHVParams(self, node_list, hv_name, hv_params)
873
874     self._CheckDiskTemplateConsistency()
875
876     if self.op.os_hvp:
877       # no need to check any newly-enabled hypervisors, since the
878       # defaults have already been checked in the above code-block
879       for os_name, os_hvp in self.new_os_hvp.items():
880         for hv_name, hv_params in os_hvp.items():
881           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
882           # we need to fill in the new os_hvp on top of the actual hv_p
883           cluster_defaults = self.new_hvparams.get(hv_name, {})
884           new_osp = objects.FillDict(cluster_defaults, hv_params)
885           hv_class = hypervisor.GetHypervisorClass(hv_name)
886           hv_class.CheckParameterSyntax(new_osp)
887           CheckHVParams(self, node_list, hv_name, new_osp)
888
889     if self.op.default_iallocator:
890       alloc_script = utils.FindFile(self.op.default_iallocator,
891                                     constants.IALLOCATOR_SEARCH_PATH,
892                                     os.path.isfile)
893       if alloc_script is None:
894         raise errors.OpPrereqError("Invalid default iallocator script '%s'"
895                                    " specified" % self.op.default_iallocator,
896                                    errors.ECODE_INVAL)
897
898   def _CheckDiskTemplateConsistency(self):
899     """Check whether the disk templates that are going to be disabled
900        are still in use by some instances.
901
902     """
903     if self.op.enabled_disk_templates:
904       cluster = self.cfg.GetClusterInfo()
905       instances = self.cfg.GetAllInstancesInfo()
906
907       disk_templates_to_remove = set(cluster.enabled_disk_templates) \
908         - set(self.op.enabled_disk_templates)
909       for instance in instances.itervalues():
910         if instance.disk_template in disk_templates_to_remove:
911           raise errors.OpPrereqError("Cannot disable disk template '%s',"
912                                      " because instance '%s' is using it." %
913                                      (instance.disk_template, instance.name))
914
915   def Exec(self, feedback_fn):
916     """Change the parameters of the cluster.
917
918     """
919     if self.op.vg_name is not None:
920       new_volume = self.op.vg_name
921       if not new_volume:
922         new_volume = None
923       if new_volume != self.cfg.GetVGName():
924         self.cfg.SetVGName(new_volume)
925       else:
926         feedback_fn("Cluster LVM configuration already in desired"
927                     " state, not changing")
928     if self.op.drbd_helper is not None:
929       new_helper = self.op.drbd_helper
930       if not new_helper:
931         new_helper = None
932       if new_helper != self.cfg.GetDRBDHelper():
933         self.cfg.SetDRBDHelper(new_helper)
934       else:
935         feedback_fn("Cluster DRBD helper already in desired state,"
936                     " not changing")
937     if self.op.hvparams:
938       self.cluster.hvparams = self.new_hvparams
939     if self.op.os_hvp:
940       self.cluster.os_hvp = self.new_os_hvp
941     if self.op.enabled_hypervisors is not None:
942       self.cluster.hvparams = self.new_hvparams
943       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
944     if self.op.enabled_disk_templates:
945       self.cluster.enabled_disk_templates = \
946         list(set(self.op.enabled_disk_templates))
947     if self.op.beparams:
948       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
949     if self.op.nicparams:
950       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
951     if self.op.ipolicy:
952       self.cluster.ipolicy = self.new_ipolicy
953     if self.op.osparams:
954       self.cluster.osparams = self.new_osp
955     if self.op.ndparams:
956       self.cluster.ndparams = self.new_ndparams
957     if self.op.diskparams:
958       self.cluster.diskparams = self.new_diskparams
959     if self.op.hv_state:
960       self.cluster.hv_state_static = self.new_hv_state
961     if self.op.disk_state:
962       self.cluster.disk_state_static = self.new_disk_state
963
964     if self.op.candidate_pool_size is not None:
965       self.cluster.candidate_pool_size = self.op.candidate_pool_size
966       # we need to update the pool size here, otherwise the save will fail
967       AdjustCandidatePool(self, [])
968
969     if self.op.maintain_node_health is not None:
970       if self.op.maintain_node_health and not constants.ENABLE_CONFD:
971         feedback_fn("Note: CONFD was disabled at build time, node health"
972                     " maintenance is not useful (still enabling it)")
973       self.cluster.maintain_node_health = self.op.maintain_node_health
974
975     if self.op.modify_etc_hosts is not None:
976       self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
977
978     if self.op.prealloc_wipe_disks is not None:
979       self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
980
981     if self.op.add_uids is not None:
982       uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
983
984     if self.op.remove_uids is not None:
985       uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
986
987     if self.op.uid_pool is not None:
988       self.cluster.uid_pool = self.op.uid_pool
989
990     if self.op.default_iallocator is not None:
991       self.cluster.default_iallocator = self.op.default_iallocator
992
993     if self.op.reserved_lvs is not None:
994       self.cluster.reserved_lvs = self.op.reserved_lvs
995
996     if self.op.use_external_mip_script is not None:
997       self.cluster.use_external_mip_script = self.op.use_external_mip_script
998
999     def helper_os(aname, mods, desc):
1000       desc += " OS list"
1001       lst = getattr(self.cluster, aname)
1002       for key, val in mods:
1003         if key == constants.DDM_ADD:
1004           if val in lst:
1005             feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1006           else:
1007             lst.append(val)
1008         elif key == constants.DDM_REMOVE:
1009           if val in lst:
1010             lst.remove(val)
1011           else:
1012             feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1013         else:
1014           raise errors.ProgrammerError("Invalid modification '%s'" % key)
1015
1016     if self.op.hidden_os:
1017       helper_os("hidden_os", self.op.hidden_os, "hidden")
1018
1019     if self.op.blacklisted_os:
1020       helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1021
1022     if self.op.master_netdev:
1023       master_params = self.cfg.GetMasterNetworkParameters()
1024       ems = self.cfg.GetUseExternalMipScript()
1025       feedback_fn("Shutting down master ip on the current netdev (%s)" %
1026                   self.cluster.master_netdev)
1027       result = self.rpc.call_node_deactivate_master_ip(master_params.name,
1028                                                        master_params, ems)
1029       if not self.op.force:
1030         result.Raise("Could not disable the master ip")
1031       else:
1032         if result.fail_msg:
1033           msg = ("Could not disable the master ip (continuing anyway): %s" %
1034                  result.fail_msg)
1035           feedback_fn(msg)
1036       feedback_fn("Changing master_netdev from %s to %s" %
1037                   (master_params.netdev, self.op.master_netdev))
1038       self.cluster.master_netdev = self.op.master_netdev
1039
1040     if self.op.master_netmask:
1041       master_params = self.cfg.GetMasterNetworkParameters()
1042       feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1043       result = self.rpc.call_node_change_master_netmask(master_params.name,
1044                                                         master_params.netmask,
1045                                                         self.op.master_netmask,
1046                                                         master_params.ip,
1047                                                         master_params.netdev)
1048       if result.fail_msg:
1049         msg = "Could not change the master IP netmask: %s" % result.fail_msg
1050         feedback_fn(msg)
1051
1052       self.cluster.master_netmask = self.op.master_netmask
1053
1054     self.cfg.Update(self.cluster, feedback_fn)
1055
1056     if self.op.master_netdev:
1057       master_params = self.cfg.GetMasterNetworkParameters()
1058       feedback_fn("Starting the master ip on the new master netdev (%s)" %
1059                   self.op.master_netdev)
1060       ems = self.cfg.GetUseExternalMipScript()
1061       result = self.rpc.call_node_activate_master_ip(master_params.name,
1062                                                      master_params, ems)
1063       if result.fail_msg:
1064         self.LogWarning("Could not re-enable the master ip on"
1065                         " the master, please restart manually: %s",
1066                         result.fail_msg)
1067
1068
1069 class LUClusterVerify(NoHooksLU):
1070   """Submits all jobs necessary to verify the cluster.
1071
1072   """
1073   REQ_BGL = False
1074
1075   def ExpandNames(self):
1076     self.needed_locks = {}
1077
1078   def Exec(self, feedback_fn):
1079     jobs = []
1080
1081     if self.op.group_name:
1082       groups = [self.op.group_name]
1083       depends_fn = lambda: None
1084     else:
1085       groups = self.cfg.GetNodeGroupList()
1086
1087       # Verify global configuration
1088       jobs.append([
1089         opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1090         ])
1091
1092       # Always depend on global verification
1093       depends_fn = lambda: [(-len(jobs), [])]
1094
1095     jobs.extend(
1096       [opcodes.OpClusterVerifyGroup(group_name=group,
1097                                     ignore_errors=self.op.ignore_errors,
1098                                     depends=depends_fn())]
1099       for group in groups)
1100
1101     # Fix up all parameters
1102     for op in itertools.chain(*jobs): # pylint: disable=W0142
1103       op.debug_simulate_errors = self.op.debug_simulate_errors
1104       op.verbose = self.op.verbose
1105       op.error_codes = self.op.error_codes
1106       try:
1107         op.skip_checks = self.op.skip_checks
1108       except AttributeError:
1109         assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1110
1111     return ResultWithJobs(jobs)
1112
1113
1114 class _VerifyErrors(object):
1115   """Mix-in for cluster/group verify LUs.
1116
1117   It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1118   self.op and self._feedback_fn to be available.)
1119
1120   """
1121
1122   ETYPE_FIELD = "code"
1123   ETYPE_ERROR = "ERROR"
1124   ETYPE_WARNING = "WARNING"
1125
1126   def _Error(self, ecode, item, msg, *args, **kwargs):
1127     """Format an error message.
1128
1129     Based on the opcode's error_codes parameter, either format a
1130     parseable error code, or a simpler error string.
1131
1132     This must be called only from Exec and functions called from Exec.
1133
1134     """
1135     ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1136     itype, etxt, _ = ecode
1137     # If the error code is in the list of ignored errors, demote the error to a
1138     # warning
1139     if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1140       ltype = self.ETYPE_WARNING
1141     # first complete the msg
1142     if args:
1143       msg = msg % args
1144     # then format the whole message
1145     if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1146       msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1147     else:
1148       if item:
1149         item = " " + item
1150       else:
1151         item = ""
1152       msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1153     # and finally report it via the feedback_fn
1154     self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1155     # do not mark the operation as failed for WARN cases only
1156     if ltype == self.ETYPE_ERROR:
1157       self.bad = True
1158
1159   def _ErrorIf(self, cond, *args, **kwargs):
1160     """Log an error message if the passed condition is True.
1161
1162     """
1163     if (bool(cond)
1164         or self.op.debug_simulate_errors): # pylint: disable=E1101
1165       self._Error(*args, **kwargs)
1166
1167
1168 def _VerifyCertificate(filename):
1169   """Verifies a certificate for L{LUClusterVerifyConfig}.
1170
1171   @type filename: string
1172   @param filename: Path to PEM file
1173
1174   """
1175   try:
1176     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1177                                            utils.ReadFile(filename))
1178   except Exception, err: # pylint: disable=W0703
1179     return (LUClusterVerifyConfig.ETYPE_ERROR,
1180             "Failed to load X509 certificate %s: %s" % (filename, err))
1181
1182   (errcode, msg) = \
1183     utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1184                                 constants.SSL_CERT_EXPIRATION_ERROR)
1185
1186   if msg:
1187     fnamemsg = "While verifying %s: %s" % (filename, msg)
1188   else:
1189     fnamemsg = None
1190
1191   if errcode is None:
1192     return (None, fnamemsg)
1193   elif errcode == utils.CERT_WARNING:
1194     return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1195   elif errcode == utils.CERT_ERROR:
1196     return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1197
1198   raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1199
1200
1201 def _GetAllHypervisorParameters(cluster, instances):
1202   """Compute the set of all hypervisor parameters.
1203
1204   @type cluster: L{objects.Cluster}
1205   @param cluster: the cluster object
1206   @param instances: list of L{objects.Instance}
1207   @param instances: additional instances from which to obtain parameters
1208   @rtype: list of (origin, hypervisor, parameters)
1209   @return: a list with all parameters found, indicating the hypervisor they
1210        apply to, and the origin (can be "cluster", "os X", or "instance Y")
1211
1212   """
1213   hvp_data = []
1214
1215   for hv_name in cluster.enabled_hypervisors:
1216     hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1217
1218   for os_name, os_hvp in cluster.os_hvp.items():
1219     for hv_name, hv_params in os_hvp.items():
1220       if hv_params:
1221         full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1222         hvp_data.append(("os %s" % os_name, hv_name, full_params))
1223
1224   # TODO: collapse identical parameter values in a single one
1225   for instance in instances:
1226     if instance.hvparams:
1227       hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1228                        cluster.FillHV(instance)))
1229
1230   return hvp_data
1231
1232
1233 class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1234   """Verifies the cluster config.
1235
1236   """
1237   REQ_BGL = False
1238
1239   def _VerifyHVP(self, hvp_data):
1240     """Verifies locally the syntax of the hypervisor parameters.
1241
1242     """
1243     for item, hv_name, hv_params in hvp_data:
1244       msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1245              (item, hv_name))
1246       try:
1247         hv_class = hypervisor.GetHypervisorClass(hv_name)
1248         utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1249         hv_class.CheckParameterSyntax(hv_params)
1250       except errors.GenericError, err:
1251         self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1252
1253   def ExpandNames(self):
1254     self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1255     self.share_locks = ShareAll()
1256
1257   def CheckPrereq(self):
1258     """Check prerequisites.
1259
1260     """
1261     # Retrieve all information
1262     self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1263     self.all_node_info = self.cfg.GetAllNodesInfo()
1264     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1265
1266   def Exec(self, feedback_fn):
1267     """Verify integrity of cluster, performing various test on nodes.
1268
1269     """
1270     self.bad = False
1271     self._feedback_fn = feedback_fn
1272
1273     feedback_fn("* Verifying cluster config")
1274
1275     for msg in self.cfg.VerifyConfig():
1276       self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1277
1278     feedback_fn("* Verifying cluster certificate files")
1279
1280     for cert_filename in pathutils.ALL_CERT_FILES:
1281       (errcode, msg) = _VerifyCertificate(cert_filename)
1282       self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1283
1284     self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
1285                                     pathutils.NODED_CERT_FILE),
1286                   constants.CV_ECLUSTERCERT,
1287                   None,
1288                   pathutils.NODED_CERT_FILE + " must be accessible by the " +
1289                     constants.LUXID_USER + " user")
1290
1291     feedback_fn("* Verifying hypervisor parameters")
1292
1293     self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1294                                                 self.all_inst_info.values()))
1295
1296     feedback_fn("* Verifying all nodes belong to an existing group")
1297
1298     # We do this verification here because, should this bogus circumstance
1299     # occur, it would never be caught by VerifyGroup, which only acts on
1300     # nodes/instances reachable from existing node groups.
1301
1302     dangling_nodes = set(node.name for node in self.all_node_info.values()
1303                          if node.group not in self.all_group_info)
1304
1305     dangling_instances = {}
1306     no_node_instances = []
1307
1308     for inst in self.all_inst_info.values():
1309       if inst.primary_node in dangling_nodes:
1310         dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1311       elif inst.primary_node not in self.all_node_info:
1312         no_node_instances.append(inst.name)
1313
1314     pretty_dangling = [
1315         "%s (%s)" %
1316         (node.name,
1317          utils.CommaJoin(dangling_instances.get(node.name,
1318                                                 ["no instances"])))
1319         for node in dangling_nodes]
1320
1321     self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1322                   None,
1323                   "the following nodes (and their instances) belong to a non"
1324                   " existing group: %s", utils.CommaJoin(pretty_dangling))
1325
1326     self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1327                   None,
1328                   "the following instances have a non-existing primary-node:"
1329                   " %s", utils.CommaJoin(no_node_instances))
1330
1331     return not self.bad
1332
1333
1334 class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1335   """Verifies the status of a node group.
1336
1337   """
1338   HPATH = "cluster-verify"
1339   HTYPE = constants.HTYPE_CLUSTER
1340   REQ_BGL = False
1341
1342   _HOOKS_INDENT_RE = re.compile("^", re.M)
1343
1344   class NodeImage(object):
1345     """A class representing the logical and physical status of a node.
1346
1347     @type name: string
1348     @ivar name: the node name to which this object refers
1349     @ivar volumes: a structure as returned from
1350         L{ganeti.backend.GetVolumeList} (runtime)
1351     @ivar instances: a list of running instances (runtime)
1352     @ivar pinst: list of configured primary instances (config)
1353     @ivar sinst: list of configured secondary instances (config)
1354     @ivar sbp: dictionary of {primary-node: list of instances} for all
1355         instances for which this node is secondary (config)
1356     @ivar mfree: free memory, as reported by hypervisor (runtime)
1357     @ivar dfree: free disk, as reported by the node (runtime)
1358     @ivar offline: the offline status (config)
1359     @type rpc_fail: boolean
1360     @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1361         not whether the individual keys were correct) (runtime)
1362     @type lvm_fail: boolean
1363     @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1364     @type hyp_fail: boolean
1365     @ivar hyp_fail: whether the RPC call didn't return the instance list
1366     @type ghost: boolean
1367     @ivar ghost: whether this is a known node or not (config)
1368     @type os_fail: boolean
1369     @ivar os_fail: whether the RPC call didn't return valid OS data
1370     @type oslist: list
1371     @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1372     @type vm_capable: boolean
1373     @ivar vm_capable: whether the node can host instances
1374     @type pv_min: float
1375     @ivar pv_min: size in MiB of the smallest PVs
1376     @type pv_max: float
1377     @ivar pv_max: size in MiB of the biggest PVs
1378
1379     """
1380     def __init__(self, offline=False, name=None, vm_capable=True):
1381       self.name = name
1382       self.volumes = {}
1383       self.instances = []
1384       self.pinst = []
1385       self.sinst = []
1386       self.sbp = {}
1387       self.mfree = 0
1388       self.dfree = 0
1389       self.offline = offline
1390       self.vm_capable = vm_capable
1391       self.rpc_fail = False
1392       self.lvm_fail = False
1393       self.hyp_fail = False
1394       self.ghost = False
1395       self.os_fail = False
1396       self.oslist = {}
1397       self.pv_min = None
1398       self.pv_max = None
1399
1400   def ExpandNames(self):
1401     # This raises errors.OpPrereqError on its own:
1402     self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1403
1404     # Get instances in node group; this is unsafe and needs verification later
1405     inst_names = \
1406       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1407
1408     self.needed_locks = {
1409       locking.LEVEL_INSTANCE: inst_names,
1410       locking.LEVEL_NODEGROUP: [self.group_uuid],
1411       locking.LEVEL_NODE: [],
1412
1413       # This opcode is run by watcher every five minutes and acquires all nodes
1414       # for a group. It doesn't run for a long time, so it's better to acquire
1415       # the node allocation lock as well.
1416       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1417       }
1418
1419     self.share_locks = ShareAll()
1420
1421   def DeclareLocks(self, level):
1422     if level == locking.LEVEL_NODE:
1423       # Get members of node group; this is unsafe and needs verification later
1424       nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1425
1426       all_inst_info = self.cfg.GetAllInstancesInfo()
1427
1428       # In Exec(), we warn about mirrored instances that have primary and
1429       # secondary living in separate node groups. To fully verify that
1430       # volumes for these instances are healthy, we will need to do an
1431       # extra call to their secondaries. We ensure here those nodes will
1432       # be locked.
1433       for inst in self.owned_locks(locking.LEVEL_INSTANCE):
1434         # Important: access only the instances whose lock is owned
1435         if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
1436           nodes.update(all_inst_info[inst].secondary_nodes)
1437
1438       self.needed_locks[locking.LEVEL_NODE] = nodes
1439
1440   def CheckPrereq(self):
1441     assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1442     self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1443
1444     group_nodes = set(self.group_info.members)
1445     group_instances = \
1446       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1447
1448     unlocked_nodes = \
1449         group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1450
1451     unlocked_instances = \
1452         group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))
1453
1454     if unlocked_nodes:
1455       raise errors.OpPrereqError("Missing lock for nodes: %s" %
1456                                  utils.CommaJoin(unlocked_nodes),
1457                                  errors.ECODE_STATE)
1458
1459     if unlocked_instances:
1460       raise errors.OpPrereqError("Missing lock for instances: %s" %
1461                                  utils.CommaJoin(unlocked_instances),
1462                                  errors.ECODE_STATE)
1463
1464     self.all_node_info = self.cfg.GetAllNodesInfo()
1465     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1466
1467     self.my_node_names = utils.NiceSort(group_nodes)
1468     self.my_inst_names = utils.NiceSort(group_instances)
1469
1470     self.my_node_info = dict((name, self.all_node_info[name])
1471                              for name in self.my_node_names)
1472
1473     self.my_inst_info = dict((name, self.all_inst_info[name])
1474                              for name in self.my_inst_names)
1475
1476     # We detect here the nodes that will need the extra RPC calls for verifying
1477     # split LV volumes; they should be locked.
1478     extra_lv_nodes = set()
1479
1480     for inst in self.my_inst_info.values():
1481       if inst.disk_template in constants.DTS_INT_MIRROR:
1482         for nname in inst.all_nodes:
1483           if self.all_node_info[nname].group != self.group_uuid:
1484             extra_lv_nodes.add(nname)
1485
1486     unlocked_lv_nodes = \
1487         extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1488
1489     if unlocked_lv_nodes:
1490       raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1491                                  utils.CommaJoin(unlocked_lv_nodes),
1492                                  errors.ECODE_STATE)
1493     self.extra_lv_nodes = list(extra_lv_nodes)
1494
1495   def _VerifyNode(self, ninfo, nresult):
1496     """Perform some basic validation on data returned from a node.
1497
1498       - check the result data structure is well formed and has all the
1499         mandatory fields
1500       - check ganeti version
1501
1502     @type ninfo: L{objects.Node}
1503     @param ninfo: the node to check
1504     @param nresult: the results from the node
1505     @rtype: boolean
1506     @return: whether overall this call was successful (and we can expect
1507          reasonable values in the respose)
1508
1509     """
1510     node = ninfo.name
1511     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1512
1513     # main result, nresult should be a non-empty dict
1514     test = not nresult or not isinstance(nresult, dict)
1515     _ErrorIf(test, constants.CV_ENODERPC, node,
1516                   "unable to verify node: no data returned")
1517     if test:
1518       return False
1519
1520     # compares ganeti version
1521     local_version = constants.PROTOCOL_VERSION
1522     remote_version = nresult.get("version", None)
1523     test = not (remote_version and
1524                 isinstance(remote_version, (list, tuple)) and
1525                 len(remote_version) == 2)
1526     _ErrorIf(test, constants.CV_ENODERPC, node,
1527              "connection to node returned invalid data")
1528     if test:
1529       return False
1530
1531     test = local_version != remote_version[0]
1532     _ErrorIf(test, constants.CV_ENODEVERSION, node,
1533              "incompatible protocol versions: master %s,"
1534              " node %s", local_version, remote_version[0])
1535     if test:
1536       return False
1537
1538     # node seems compatible, we can actually try to look into its results
1539
1540     # full package version
1541     self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1542                   constants.CV_ENODEVERSION, node,
1543                   "software version mismatch: master %s, node %s",
1544                   constants.RELEASE_VERSION, remote_version[1],
1545                   code=self.ETYPE_WARNING)
1546
1547     hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1548     if ninfo.vm_capable and isinstance(hyp_result, dict):
1549       for hv_name, hv_result in hyp_result.iteritems():
1550         test = hv_result is not None
1551         _ErrorIf(test, constants.CV_ENODEHV, node,
1552                  "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1553
1554     hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1555     if ninfo.vm_capable and isinstance(hvp_result, list):
1556       for item, hv_name, hv_result in hvp_result:
1557         _ErrorIf(True, constants.CV_ENODEHV, node,
1558                  "hypervisor %s parameter verify failure (source %s): %s",
1559                  hv_name, item, hv_result)
1560
1561     test = nresult.get(constants.NV_NODESETUP,
1562                        ["Missing NODESETUP results"])
1563     _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
1564              "; ".join(test))
1565
1566     return True
1567
1568   def _VerifyNodeTime(self, ninfo, nresult,
1569                       nvinfo_starttime, nvinfo_endtime):
1570     """Check the node time.
1571
1572     @type ninfo: L{objects.Node}
1573     @param ninfo: the node to check
1574     @param nresult: the remote results for the node
1575     @param nvinfo_starttime: the start time of the RPC call
1576     @param nvinfo_endtime: the end time of the RPC call
1577
1578     """
1579     node = ninfo.name
1580     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1581
1582     ntime = nresult.get(constants.NV_TIME, None)
1583     try:
1584       ntime_merged = utils.MergeTime(ntime)
1585     except (ValueError, TypeError):
1586       _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
1587       return
1588
1589     if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1590       ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1591     elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1592       ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1593     else:
1594       ntime_diff = None
1595
1596     _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
1597              "Node time diverges by at least %s from master node time",
1598              ntime_diff)
1599
1600   def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1601     """Check the node LVM results and update info for cross-node checks.
1602
1603     @type ninfo: L{objects.Node}
1604     @param ninfo: the node to check
1605     @param nresult: the remote results for the node
1606     @param vg_name: the configured VG name
1607     @type nimg: L{NodeImage}
1608     @param nimg: node image
1609
1610     """
1611     if vg_name is None:
1612       return
1613
1614     node = ninfo.name
1615     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1616
1617     # checks vg existence and size > 20G
1618     vglist = nresult.get(constants.NV_VGLIST, None)
1619     test = not vglist
1620     _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
1621     if not test:
1622       vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1623                                             constants.MIN_VG_SIZE)
1624       _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)
1625
1626     # Check PVs
1627     (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1628     for em in errmsgs:
1629       self._Error(constants.CV_ENODELVM, node, em)
1630     if pvminmax is not None:
1631       (nimg.pv_min, nimg.pv_max) = pvminmax
1632
1633   def _VerifyGroupLVM(self, node_image, vg_name):
1634     """Check cross-node consistency in LVM.
1635
1636     @type node_image: dict
1637     @param node_image: info about nodes, mapping from node to names to
1638       L{NodeImage} objects
1639     @param vg_name: the configured VG name
1640
1641     """
1642     if vg_name is None:
1643       return
1644
1645     # Only exlcusive storage needs this kind of checks
1646     if not self._exclusive_storage:
1647       return
1648
1649     # exclusive_storage wants all PVs to have the same size (approximately),
1650     # if the smallest and the biggest ones are okay, everything is fine.
1651     # pv_min is None iff pv_max is None
1652     vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1653     if not vals:
1654       return
1655     (pvmin, minnode) = min((ni.pv_min, ni.name) for ni in vals)
1656     (pvmax, maxnode) = max((ni.pv_max, ni.name) for ni in vals)
1657     bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1658     self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1659                   "PV sizes differ too much in the group; smallest (%s MB) is"
1660                   " on %s, biggest (%s MB) is on %s",
1661                   pvmin, minnode, pvmax, maxnode)
1662
1663   def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1664     """Check the node bridges.
1665
1666     @type ninfo: L{objects.Node}
1667     @param ninfo: the node to check
1668     @param nresult: the remote results for the node
1669     @param bridges: the expected list of bridges
1670
1671     """
1672     if not bridges:
1673       return
1674
1675     node = ninfo.name
1676     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1677
1678     missing = nresult.get(constants.NV_BRIDGES, None)
1679     test = not isinstance(missing, list)
1680     _ErrorIf(test, constants.CV_ENODENET, node,
1681              "did not return valid bridge information")
1682     if not test:
1683       _ErrorIf(bool(missing), constants.CV_ENODENET, node,
1684                "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1685
1686   def _VerifyNodeUserScripts(self, ninfo, nresult):
1687     """Check the results of user scripts presence and executability on the node
1688
1689     @type ninfo: L{objects.Node}
1690     @param ninfo: the node to check
1691     @param nresult: the remote results for the node
1692
1693     """
1694     node = ninfo.name
1695
1696     test = not constants.NV_USERSCRIPTS in nresult
1697     self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
1698                   "did not return user scripts information")
1699
1700     broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1701     if not test:
1702       self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
1703                     "user scripts not present or not executable: %s" %
1704                     utils.CommaJoin(sorted(broken_scripts)))
1705
1706   def _VerifyNodeNetwork(self, ninfo, nresult):
1707     """Check the node network connectivity results.
1708
1709     @type ninfo: L{objects.Node}
1710     @param ninfo: the node to check
1711     @param nresult: the remote results for the node
1712
1713     """
1714     node = ninfo.name
1715     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1716
1717     test = constants.NV_NODELIST not in nresult
1718     _ErrorIf(test, constants.CV_ENODESSH, node,
1719              "node hasn't returned node ssh connectivity data")
1720     if not test:
1721       if nresult[constants.NV_NODELIST]:
1722         for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1723           _ErrorIf(True, constants.CV_ENODESSH, node,
1724                    "ssh communication with node '%s': %s", a_node, a_msg)
1725
1726     test = constants.NV_NODENETTEST not in nresult
1727     _ErrorIf(test, constants.CV_ENODENET, node,
1728              "node hasn't returned node tcp connectivity data")
1729     if not test:
1730       if nresult[constants.NV_NODENETTEST]:
1731         nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1732         for anode in nlist:
1733           _ErrorIf(True, constants.CV_ENODENET, node,
1734                    "tcp communication with node '%s': %s",
1735                    anode, nresult[constants.NV_NODENETTEST][anode])
1736
1737     test = constants.NV_MASTERIP not in nresult
1738     _ErrorIf(test, constants.CV_ENODENET, node,
1739              "node hasn't returned node master IP reachability data")
1740     if not test:
1741       if not nresult[constants.NV_MASTERIP]:
1742         if node == self.master_node:
1743           msg = "the master node cannot reach the master IP (not configured?)"
1744         else:
1745           msg = "cannot reach the master IP"
1746         _ErrorIf(True, constants.CV_ENODENET, node, msg)
1747
1748   def _VerifyInstance(self, instance, inst_config, node_image,
1749                       diskstatus):
1750     """Verify an instance.
1751
1752     This function checks to see if the required block devices are
1753     available on the instance's node, and that the nodes are in the correct
1754     state.
1755
1756     """
1757     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1758     pnode = inst_config.primary_node
1759     pnode_img = node_image[pnode]
1760     groupinfo = self.cfg.GetAllNodeGroupsInfo()
1761
1762     node_vol_should = {}
1763     inst_config.MapLVsByNode(node_vol_should)
1764
1765     cluster = self.cfg.GetClusterInfo()
1766     ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1767                                                             self.group_info)
1768     err = ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
1769     _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err),
1770              code=self.ETYPE_WARNING)
1771
1772     for node in node_vol_should:
1773       n_img = node_image[node]
1774       if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1775         # ignore missing volumes on offline or broken nodes
1776         continue
1777       for volume in node_vol_should[node]:
1778         test = volume not in n_img.volumes
1779         _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
1780                  "volume %s missing on node %s", volume, node)
1781
1782     if inst_config.admin_state == constants.ADMINST_UP:
1783       test = instance not in pnode_img.instances and not pnode_img.offline
1784       _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
1785                "instance not running on its primary node %s",
1786                pnode)
1787       _ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
1788                "instance is marked as running and lives on offline node %s",
1789                pnode)
1790
1791     diskdata = [(nname, success, status, idx)
1792                 for (nname, disks) in diskstatus.items()
1793                 for idx, (success, status) in enumerate(disks)]
1794
1795     for nname, success, bdev_status, idx in diskdata:
1796       # the 'ghost node' construction in Exec() ensures that we have a
1797       # node here
1798       snode = node_image[nname]
1799       bad_snode = snode.ghost or snode.offline
1800       _ErrorIf(inst_config.disks_active and
1801                not success and not bad_snode,
1802                constants.CV_EINSTANCEFAULTYDISK, instance,
1803                "couldn't retrieve status for disk/%s on %s: %s",
1804                idx, nname, bdev_status)
1805       _ErrorIf((inst_config.disks_active and
1806                 success and bdev_status.ldisk_status == constants.LDS_FAULTY),
1807                constants.CV_EINSTANCEFAULTYDISK, instance,
1808                "disk/%s on %s is faulty", idx, nname)
1809
1810     _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1811              constants.CV_ENODERPC, pnode, "instance %s, connection to"
1812              " primary node failed", instance)
1813
1814     _ErrorIf(len(inst_config.secondary_nodes) > 1,
1815              constants.CV_EINSTANCELAYOUT,
1816              instance, "instance has multiple secondary nodes: %s",
1817              utils.CommaJoin(inst_config.secondary_nodes),
1818              code=self.ETYPE_WARNING)
1819
1820     if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
1821       # Disk template not compatible with exclusive_storage: no instance
1822       # node should have the flag set
1823       es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
1824                                                      inst_config.all_nodes)
1825       es_nodes = [n for (n, es) in es_flags.items()
1826                   if es]
1827       _ErrorIf(es_nodes, constants.CV_EINSTANCEUNSUITABLENODE, instance,
1828                "instance has template %s, which is not supported on nodes"
1829                " that have exclusive storage set: %s",
1830                inst_config.disk_template, utils.CommaJoin(es_nodes))
1831
1832     if inst_config.disk_template in constants.DTS_INT_MIRROR:
1833       instance_nodes = utils.NiceSort(inst_config.all_nodes)
1834       instance_groups = {}
1835
1836       for node in instance_nodes:
1837         instance_groups.setdefault(self.all_node_info[node].group,
1838                                    []).append(node)
1839
1840       pretty_list = [
1841         "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
1842         # Sort so that we always list the primary node first.
1843         for group, nodes in sorted(instance_groups.items(),
1844                                    key=lambda (_, nodes): pnode in nodes,
1845                                    reverse=True)]
1846
1847       self._ErrorIf(len(instance_groups) > 1,
1848                     constants.CV_EINSTANCESPLITGROUPS,
1849                     instance, "instance has primary and secondary nodes in"
1850                     " different groups: %s", utils.CommaJoin(pretty_list),
1851                     code=self.ETYPE_WARNING)
1852
1853     inst_nodes_offline = []
1854     for snode in inst_config.secondary_nodes:
1855       s_img = node_image[snode]
1856       _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
1857                snode, "instance %s, connection to secondary node failed",
1858                instance)
1859
1860       if s_img.offline:
1861         inst_nodes_offline.append(snode)
1862
1863     # warn that the instance lives on offline nodes
1864     _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
1865              "instance has offline secondary node(s) %s",
1866              utils.CommaJoin(inst_nodes_offline))
1867     # ... or ghost/non-vm_capable nodes
1868     for node in inst_config.all_nodes:
1869       _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
1870                instance, "instance lives on ghost node %s", node)
1871       _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
1872                instance, "instance lives on non-vm_capable node %s", node)
1873
1874   def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1875     """Verify if there are any unknown volumes in the cluster.
1876
1877     The .os, .swap and backup volumes are ignored. All other volumes are
1878     reported as unknown.
1879
1880     @type reserved: L{ganeti.utils.FieldSet}
1881     @param reserved: a FieldSet of reserved volume names
1882
1883     """
1884     for node, n_img in node_image.items():
1885       if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
1886           self.all_node_info[node].group != self.group_uuid):
1887         # skip non-healthy nodes
1888         continue
1889       for volume in n_img.volumes:
1890         test = ((node not in node_vol_should or
1891                 volume not in node_vol_should[node]) and
1892                 not reserved.Matches(volume))
1893         self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
1894                       "volume %s is unknown", volume)
1895
1896   def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1897     """Verify N+1 Memory Resilience.
1898
1899     Check that if one single node dies we can still start all the
1900     instances it was primary for.
1901
1902     """
1903     cluster_info = self.cfg.GetClusterInfo()
1904     for node, n_img in node_image.items():
1905       # This code checks that every node which is now listed as
1906       # secondary has enough memory to host all instances it is
1907       # supposed to should a single other node in the cluster fail.
1908       # FIXME: not ready for failover to an arbitrary node
1909       # FIXME: does not support file-backed instances
1910       # WARNING: we currently take into account down instances as well
1911       # as up ones, considering that even if they're down someone
1912       # might want to start them even in the event of a node failure.
1913       if n_img.offline or self.all_node_info[node].group != self.group_uuid:
1914         # we're skipping nodes marked offline and nodes in other groups from
1915         # the N+1 warning, since most likely we don't have good memory
1916         # infromation from them; we already list instances living on such
1917         # nodes, and that's enough warning
1918         continue
1919       #TODO(dynmem): also consider ballooning out other instances
1920       for prinode, instances in n_img.sbp.items():
1921         needed_mem = 0
1922         for instance in instances:
1923           bep = cluster_info.FillBE(instance_cfg[instance])
1924           if bep[constants.BE_AUTO_BALANCE]:
1925             needed_mem += bep[constants.BE_MINMEM]
1926         test = n_img.mfree < needed_mem
1927         self._ErrorIf(test, constants.CV_ENODEN1, node,
1928                       "not enough memory to accomodate instance failovers"
1929                       " should node %s fail (%dMiB needed, %dMiB available)",
1930                       prinode, needed_mem, n_img.mfree)
1931
1932   @classmethod
1933   def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
1934                    (files_all, files_opt, files_mc, files_vm)):
1935     """Verifies file checksums collected from all nodes.
1936
1937     @param errorif: Callback for reporting errors
1938     @param nodeinfo: List of L{objects.Node} objects
1939     @param master_node: Name of master node
1940     @param all_nvinfo: RPC results
1941
1942     """
1943     # Define functions determining which nodes to consider for a file
1944     files2nodefn = [
1945       (files_all, None),
1946       (files_mc, lambda node: (node.master_candidate or
1947                                node.name == master_node)),
1948       (files_vm, lambda node: node.vm_capable),
1949       ]
1950
1951     # Build mapping from filename to list of nodes which should have the file
1952     nodefiles = {}
1953     for (files, fn) in files2nodefn:
1954       if fn is None:
1955         filenodes = nodeinfo
1956       else:
1957         filenodes = filter(fn, nodeinfo)
1958       nodefiles.update((filename,
1959                         frozenset(map(operator.attrgetter("name"), filenodes)))
1960                        for filename in files)
1961
1962     assert set(nodefiles) == (files_all | files_mc | files_vm)
1963
1964     fileinfo = dict((filename, {}) for filename in nodefiles)
1965     ignore_nodes = set()
1966
1967     for node in nodeinfo:
1968       if node.offline:
1969         ignore_nodes.add(node.name)
1970         continue
1971
1972       nresult = all_nvinfo[node.name]
1973
1974       if nresult.fail_msg or not nresult.payload:
1975         node_files = None
1976       else:
1977         fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
1978         node_files = dict((vcluster.LocalizeVirtualPath(key), value)
1979                           for (key, value) in fingerprints.items())
1980         del fingerprints
1981
1982       test = not (node_files and isinstance(node_files, dict))
1983       errorif(test, constants.CV_ENODEFILECHECK, node.name,
1984               "Node did not return file checksum data")
1985       if test:
1986         ignore_nodes.add(node.name)
1987         continue
1988
1989       # Build per-checksum mapping from filename to nodes having it
1990       for (filename, checksum) in node_files.items():
1991         assert filename in nodefiles
1992         fileinfo[filename].setdefault(checksum, set()).add(node.name)
1993
1994     for (filename, checksums) in fileinfo.items():
1995       assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
1996
1997       # Nodes having the file
1998       with_file = frozenset(node_name
1999                             for nodes in fileinfo[filename].values()
2000                             for node_name in nodes) - ignore_nodes
2001
2002       expected_nodes = nodefiles[filename] - ignore_nodes
2003
2004       # Nodes missing file
2005       missing_file = expected_nodes - with_file
2006
2007       if filename in files_opt:
2008         # All or no nodes
2009         errorif(missing_file and missing_file != expected_nodes,
2010                 constants.CV_ECLUSTERFILECHECK, None,
2011                 "File %s is optional, but it must exist on all or no"
2012                 " nodes (not found on %s)",
2013                 filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2014       else:
2015         errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2016                 "File %s is missing from node(s) %s", filename,
2017                 utils.CommaJoin(utils.NiceSort(missing_file)))
2018
2019         # Warn if a node has a file it shouldn't
2020         unexpected = with_file - expected_nodes
2021         errorif(unexpected,
2022                 constants.CV_ECLUSTERFILECHECK, None,
2023                 "File %s should not exist on node(s) %s",
2024                 filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2025
2026       # See if there are multiple versions of the file
2027       test = len(checksums) > 1
2028       if test:
2029         variants = ["variant %s on %s" %
2030                     (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2031                     for (idx, (checksum, nodes)) in
2032                       enumerate(sorted(checksums.items()))]
2033       else:
2034         variants = []
2035
2036       errorif(test, constants.CV_ECLUSTERFILECHECK, None,
2037               "File %s found with %s different checksums (%s)",
2038               filename, len(checksums), "; ".join(variants))
2039
2040   def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2041                       drbd_map):
2042     """Verifies and the node DRBD status.
2043
2044     @type ninfo: L{objects.Node}
2045     @param ninfo: the node to check
2046     @param nresult: the remote results for the node
2047     @param instanceinfo: the dict of instances
2048     @param drbd_helper: the configured DRBD usermode helper
2049     @param drbd_map: the DRBD map as returned by
2050         L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2051
2052     """
2053     node = ninfo.name
2054     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2055
2056     if drbd_helper:
2057       helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2058       test = (helper_result is None)
2059       _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2060                "no drbd usermode helper returned")
2061       if helper_result:
2062         status, payload = helper_result
2063         test = not status
2064         _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2065                  "drbd usermode helper check unsuccessful: %s", payload)
2066         test = status and (payload != drbd_helper)
2067         _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2068                  "wrong drbd usermode helper: %s", payload)
2069
2070     # compute the DRBD minors
2071     node_drbd = {}
2072     for minor, instance in drbd_map[node].items():
2073       test = instance not in instanceinfo
2074       _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2075                "ghost instance '%s' in temporary DRBD map", instance)
2076         # ghost instance should not be running, but otherwise we
2077         # don't give double warnings (both ghost instance and
2078         # unallocated minor in use)
2079       if test:
2080         node_drbd[minor] = (instance, False)
2081       else:
2082         instance = instanceinfo[instance]
2083         node_drbd[minor] = (instance.name, instance.disks_active)
2084
2085     # and now check them
2086     used_minors = nresult.get(constants.NV_DRBDLIST, [])
2087     test = not isinstance(used_minors, (tuple, list))
2088     _ErrorIf(test, constants.CV_ENODEDRBD, node,
2089              "cannot parse drbd status file: %s", str(used_minors))
2090     if test:
2091       # we cannot check drbd status
2092       return
2093
2094     for minor, (iname, must_exist) in node_drbd.items():
2095       test = minor not in used_minors and must_exist
2096       _ErrorIf(test, constants.CV_ENODEDRBD, node,
2097                "drbd minor %d of instance %s is not active", minor, iname)
2098     for minor in used_minors:
2099       test = minor not in node_drbd
2100       _ErrorIf(test, constants.CV_ENODEDRBD, node,
2101                "unallocated drbd minor %d is in use", minor)
2102
2103   def _UpdateNodeOS(self, ninfo, nresult, nimg):
2104     """Builds the node OS structures.
2105
2106     @type ninfo: L{objects.Node}
2107     @param ninfo: the node to check
2108     @param nresult: the remote results for the node
2109     @param nimg: the node image object
2110
2111     """
2112     node = ninfo.name
2113     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2114
2115     remote_os = nresult.get(constants.NV_OSLIST, None)
2116     test = (not isinstance(remote_os, list) or
2117             not compat.all(isinstance(v, list) and len(v) == 7
2118                            for v in remote_os))
2119
2120     _ErrorIf(test, constants.CV_ENODEOS, node,
2121              "node hasn't returned valid OS data")
2122
2123     nimg.os_fail = test
2124
2125     if test:
2126       return
2127
2128     os_dict = {}
2129
2130     for (name, os_path, status, diagnose,
2131          variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2132
2133       if name not in os_dict:
2134         os_dict[name] = []
2135
2136       # parameters is a list of lists instead of list of tuples due to
2137       # JSON lacking a real tuple type, fix it:
2138       parameters = [tuple(v) for v in parameters]
2139       os_dict[name].append((os_path, status, diagnose,
2140                             set(variants), set(parameters), set(api_ver)))
2141
2142     nimg.oslist = os_dict
2143
2144   def _VerifyNodeOS(self, ninfo, nimg, base):
2145     """Verifies the node OS list.
2146
2147     @type ninfo: L{objects.Node}
2148     @param ninfo: the node to check
2149     @param nimg: the node image object
2150     @param base: the 'template' node we match against (e.g. from the master)
2151
2152     """
2153     node = ninfo.name
2154     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2155
2156     assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2157
2158     beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2159     for os_name, os_data in nimg.oslist.items():
2160       assert os_data, "Empty OS status for OS %s?!" % os_name
2161       f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2162       _ErrorIf(not f_status, constants.CV_ENODEOS, node,
2163                "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2164       _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
2165                "OS '%s' has multiple entries (first one shadows the rest): %s",
2166                os_name, utils.CommaJoin([v[0] for v in os_data]))
2167       # comparisons with the 'base' image
2168       test = os_name not in base.oslist
2169       _ErrorIf(test, constants.CV_ENODEOS, node,
2170                "Extra OS %s not present on reference node (%s)",
2171                os_name, base.name)
2172       if test:
2173         continue
2174       assert base.oslist[os_name], "Base node has empty OS status?"
2175       _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2176       if not b_status:
2177         # base OS is invalid, skipping
2178         continue
2179       for kind, a, b in [("API version", f_api, b_api),
2180                          ("variants list", f_var, b_var),
2181                          ("parameters", beautify_params(f_param),
2182                           beautify_params(b_param))]:
2183         _ErrorIf(a != b, constants.CV_ENODEOS, node,
2184                  "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2185                  kind, os_name, base.name,
2186                  utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2187
2188     # check any missing OSes
2189     missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2190     _ErrorIf(missing, constants.CV_ENODEOS, node,
2191              "OSes present on reference node %s but missing on this node: %s",
2192              base.name, utils.CommaJoin(missing))
2193
2194   def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
2195     """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2196
2197     @type ninfo: L{objects.Node}
2198     @param ninfo: the node to check
2199     @param nresult: the remote results for the node
2200     @type is_master: bool
2201     @param is_master: Whether node is the master node
2202
2203     """
2204     node = ninfo.name
2205
2206     if (is_master and
2207         (constants.ENABLE_FILE_STORAGE or
2208          constants.ENABLE_SHARED_FILE_STORAGE)):
2209       try:
2210         fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
2211       except KeyError:
2212         # This should never happen
2213         self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
2214                       "Node did not return forbidden file storage paths")
2215       else:
2216         self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
2217                       "Found forbidden file storage paths: %s",
2218                       utils.CommaJoin(fspaths))
2219     else:
2220       self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
2221                     constants.CV_ENODEFILESTORAGEPATHS, node,
2222                     "Node should not have returned forbidden file storage"
2223                     " paths")
2224
2225   def _VerifyOob(self, ninfo, nresult):
2226     """Verifies out of band functionality of a node.
2227
2228     @type ninfo: L{objects.Node}
2229     @param ninfo: the node to check
2230     @param nresult: the remote results for the node
2231
2232     """
2233     node = ninfo.name
2234     # We just have to verify the paths on master and/or master candidates
2235     # as the oob helper is invoked on the master
2236     if ((ninfo.master_candidate or ninfo.master_capable) and
2237         constants.NV_OOB_PATHS in nresult):
2238       for path_result in nresult[constants.NV_OOB_PATHS]:
2239         self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2240
2241   def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2242     """Verifies and updates the node volume data.
2243
2244     This function will update a L{NodeImage}'s internal structures
2245     with data from the remote call.
2246
2247     @type ninfo: L{objects.Node}
2248     @param ninfo: the node to check
2249     @param nresult: the remote results for the node
2250     @param nimg: the node image object
2251     @param vg_name: the configured VG name
2252
2253     """
2254     node = ninfo.name
2255     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2256
2257     nimg.lvm_fail = True
2258     lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2259     if vg_name is None:
2260       pass
2261     elif isinstance(lvdata, basestring):
2262       _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
2263                utils.SafeEncode(lvdata))
2264     elif not isinstance(lvdata, dict):
2265       _ErrorIf(True, constants.CV_ENODELVM, node,
2266                "rpc call to node failed (lvlist)")
2267     else:
2268       nimg.volumes = lvdata
2269       nimg.lvm_fail = False
2270
2271   def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2272     """Verifies and updates the node instance list.
2273
2274     If the listing was successful, then updates this node's instance
2275     list. Otherwise, it marks the RPC call as failed for the instance
2276     list key.
2277
2278     @type ninfo: L{objects.Node}
2279     @param ninfo: the node to check
2280     @param nresult: the remote results for the node
2281     @param nimg: the node image object
2282
2283     """
2284     idata = nresult.get(constants.NV_INSTANCELIST, None)
2285     test = not isinstance(idata, list)
2286     self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2287                   "rpc call to node failed (instancelist): %s",
2288                   utils.SafeEncode(str(idata)))
2289     if test:
2290       nimg.hyp_fail = True
2291     else:
2292       nimg.instances = idata
2293
2294   def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2295     """Verifies and computes a node information map
2296
2297     @type ninfo: L{objects.Node}
2298     @param ninfo: the node to check
2299     @param nresult: the remote results for the node
2300     @param nimg: the node image object
2301     @param vg_name: the configured VG name
2302
2303     """
2304     node = ninfo.name
2305     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2306
2307     # try to read free memory (from the hypervisor)
2308     hv_info = nresult.get(constants.NV_HVINFO, None)
2309     test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2310     _ErrorIf(test, constants.CV_ENODEHV, node,
2311              "rpc call to node failed (hvinfo)")
2312     if not test:
2313       try:
2314         nimg.mfree = int(hv_info["memory_free"])
2315       except (ValueError, TypeError):
2316         _ErrorIf(True, constants.CV_ENODERPC, node,
2317                  "node returned invalid nodeinfo, check hypervisor")
2318
2319     # FIXME: devise a free space model for file based instances as well
2320     if vg_name is not None:
2321       test = (constants.NV_VGLIST not in nresult or
2322               vg_name not in nresult[constants.NV_VGLIST])
2323       _ErrorIf(test, constants.CV_ENODELVM, node,
2324                "node didn't return data for the volume group '%s'"
2325                " - it is either missing or broken", vg_name)
2326       if not test:
2327         try:
2328           nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2329         except (ValueError, TypeError):
2330           _ErrorIf(True, constants.CV_ENODERPC, node,
2331                    "node returned invalid LVM info, check LVM status")
2332
2333   def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2334     """Gets per-disk status information for all instances.
2335
2336     @type nodelist: list of strings
2337     @param nodelist: Node names
2338     @type node_image: dict of (name, L{objects.Node})
2339     @param node_image: Node objects
2340     @type instanceinfo: dict of (name, L{objects.Instance})
2341     @param instanceinfo: Instance objects
2342     @rtype: {instance: {node: [(succes, payload)]}}
2343     @return: a dictionary of per-instance dictionaries with nodes as
2344         keys and disk information as values; the disk information is a
2345         list of tuples (success, payload)
2346
2347     """
2348     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2349
2350     node_disks = {}
2351     node_disks_devonly = {}
2352     diskless_instances = set()
2353     diskless = constants.DT_DISKLESS
2354
2355     for nname in nodelist:
2356       node_instances = list(itertools.chain(node_image[nname].pinst,
2357                                             node_image[nname].sinst))
2358       diskless_instances.update(inst for inst in node_instances
2359                                 if instanceinfo[inst].disk_template == diskless)
2360       disks = [(inst, disk)
2361                for inst in node_instances
2362                for disk in instanceinfo[inst].disks]
2363
2364       if not disks:
2365         # No need to collect data
2366         continue
2367
2368       node_disks[nname] = disks
2369
2370       # _AnnotateDiskParams makes already copies of the disks
2371       devonly = []
2372       for (inst, dev) in disks:
2373         (anno_disk,) = AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
2374         self.cfg.SetDiskID(anno_disk, nname)
2375         devonly.append(anno_disk)
2376
2377       node_disks_devonly[nname] = devonly
2378
2379     assert len(node_disks) == len(node_disks_devonly)
2380
2381     # Collect data from all nodes with disks
2382     result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2383                                                           node_disks_devonly)
2384
2385     assert len(result) == len(node_disks)
2386
2387     instdisk = {}
2388
2389     for (nname, nres) in result.items():
2390       disks = node_disks[nname]
2391
2392       if nres.offline:
2393         # No data from this node
2394         data = len(disks) * [(False, "node offline")]
2395       else:
2396         msg = nres.fail_msg
2397         _ErrorIf(msg, constants.CV_ENODERPC, nname,
2398                  "while getting disk information: %s", msg)
2399         if msg:
2400           # No data from this node
2401           data = len(disks) * [(False, msg)]
2402         else:
2403           data = []
2404           for idx, i in enumerate(nres.payload):
2405             if isinstance(i, (tuple, list)) and len(i) == 2:
2406               data.append(i)
2407             else:
2408               logging.warning("Invalid result from node %s, entry %d: %s",
2409                               nname, idx, i)
2410               data.append((False, "Invalid result from the remote node"))
2411
2412       for ((inst, _), status) in zip(disks, data):
2413         instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2414
2415     # Add empty entries for diskless instances.
2416     for inst in diskless_instances:
2417       assert inst not in instdisk
2418       instdisk[inst] = {}
2419
2420     assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2421                       len(nnames) <= len(instanceinfo[inst].all_nodes) and
2422                       compat.all(isinstance(s, (tuple, list)) and
2423                                  len(s) == 2 for s in statuses)
2424                       for inst, nnames in instdisk.items()
2425                       for nname, statuses in nnames.items())
2426     if __debug__:
2427       instdisk_keys = set(instdisk)
2428       instanceinfo_keys = set(instanceinfo)
2429       assert instdisk_keys == instanceinfo_keys, \
2430         ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2431          (instdisk_keys, instanceinfo_keys))
2432
2433     return instdisk
2434
2435   @staticmethod
2436   def _SshNodeSelector(group_uuid, all_nodes):
2437     """Create endless iterators for all potential SSH check hosts.
2438
2439     """
2440     nodes = [node for node in all_nodes
2441              if (node.group != group_uuid and
2442                  not node.offline)]
2443     keyfunc = operator.attrgetter("group")
2444
2445     return map(itertools.cycle,
2446                [sorted(map(operator.attrgetter("name"), names))
2447                 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2448                                                   keyfunc)])
2449
2450   @classmethod
2451   def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2452     """Choose which nodes should talk to which other nodes.
2453
2454     We will make nodes contact all nodes in their group, and one node from
2455     every other group.
2456
2457     @warning: This algorithm has a known issue if one node group is much
2458       smaller than others (e.g. just one node). In such a case all other
2459       nodes will talk to the single node.
2460
2461     """
2462     online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2463     sel = cls._SshNodeSelector(group_uuid, all_nodes)
2464
2465     return (online_nodes,
2466             dict((name, sorted([i.next() for i in sel]))
2467                  for name in online_nodes))
2468
2469   def BuildHooksEnv(self):
2470     """Build hooks env.
2471
2472     Cluster-Verify hooks just ran in the post phase and their failure makes
2473     the output be logged in the verify output and the verification to fail.
2474
2475     """
2476     env = {
2477       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2478       }
2479
2480     env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2481                for node in self.my_node_info.values())
2482
2483     return env
2484
2485   def BuildHooksNodes(self):
2486     """Build hooks nodes.
2487
2488     """
2489     return ([], self.my_node_names)
2490
2491   def Exec(self, feedback_fn):
2492     """Verify integrity of the node group, performing various test on nodes.
2493
2494     """
2495     # This method has too many local variables. pylint: disable=R0914
2496     feedback_fn("* Verifying group '%s'" % self.group_info.name)
2497
2498     if not self.my_node_names:
2499       # empty node group
2500       feedback_fn("* Empty node group, skipping verification")
2501       return True
2502
2503     self.bad = False
2504     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2505     verbose = self.op.verbose
2506     self._feedback_fn = feedback_fn
2507
2508     vg_name = self.cfg.GetVGName()
2509     drbd_helper = self.cfg.GetDRBDHelper()
2510     cluster = self.cfg.GetClusterInfo()
2511     hypervisors = cluster.enabled_hypervisors
2512     node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2513
2514     i_non_redundant = [] # Non redundant instances
2515     i_non_a_balanced = [] # Non auto-balanced instances
2516     i_offline = 0 # Count of offline instances
2517     n_offline = 0 # Count of offline nodes
2518     n_drained = 0 # Count of nodes being drained
2519     node_vol_should = {}
2520
2521     # FIXME: verify OS list
2522
2523     # File verification
2524     filemap = ComputeAncillaryFiles(cluster, False)
2525
2526     # do local checksums
2527     master_node = self.master_node = self.cfg.GetMasterNode()
2528     master_ip = self.cfg.GetMasterIP()
2529
2530     feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2531
2532     user_scripts = []
2533     if self.cfg.GetUseExternalMipScript():
2534       user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2535
2536     node_verify_param = {
2537       constants.NV_FILELIST:
2538         map(vcluster.MakeVirtualPath,
2539             utils.UniqueSequence(filename
2540                                  for files in filemap
2541                                  for filename in files)),
2542       constants.NV_NODELIST:
2543         self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2544                                   self.all_node_info.values()),
2545       constants.NV_HYPERVISOR: hypervisors,
2546       constants.NV_HVPARAMS:
2547         _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2548       constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2549                                  for node in node_data_list
2550                                  if not node.offline],
2551       constants.NV_INSTANCELIST: hypervisors,
2552       constants.NV_VERSION: None,
2553       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2554       constants.NV_NODESETUP: None,
2555       constants.NV_TIME: None,
2556       constants.NV_MASTERIP: (master_node, master_ip),
2557       constants.NV_OSLIST: None,
2558       constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2559       constants.NV_USERSCRIPTS: user_scripts,
2560       }
2561
2562     if vg_name is not None:
2563       node_verify_param[constants.NV_VGLIST] = None
2564       node_verify_param[constants.NV_LVLIST] = vg_name
2565       node_verify_param[constants.NV_PVLIST] = [vg_name]
2566
2567     if drbd_helper:
2568       node_verify_param[constants.NV_DRBDLIST] = None
2569       node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2570
2571     if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
2572       # Load file storage paths only from master node
2573       node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node
2574
2575     # bridge checks
2576     # FIXME: this needs to be changed per node-group, not cluster-wide
2577     bridges = set()
2578     default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2579     if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2580       bridges.add(default_nicpp[constants.NIC_LINK])
2581     for instance in self.my_inst_info.values():
2582       for nic in instance.nics:
2583         full_nic = cluster.SimpleFillNIC(nic.nicparams)
2584         if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2585           bridges.add(full_nic[constants.NIC_LINK])
2586
2587     if bridges:
2588       node_verify_param[constants.NV_BRIDGES] = list(bridges)
2589
2590     # Build our expected cluster state
2591     node_image = dict((node.name, self.NodeImage(offline=node.offline,
2592                                                  name=node.name,
2593                                                  vm_capable=node.vm_capable))
2594                       for node in node_data_list)
2595
2596     # Gather OOB paths
2597     oob_paths = []
2598     for node in self.all_node_info.values():
2599       path = SupportsOob(self.cfg, node)
2600       if path and path not in oob_paths:
2601         oob_paths.append(path)
2602
2603     if oob_paths:
2604       node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2605
2606     for instance in self.my_inst_names:
2607       inst_config = self.my_inst_info[instance]
2608       if inst_config.admin_state == constants.ADMINST_OFFLINE:
2609         i_offline += 1
2610
2611       for nname in inst_config.all_nodes:
2612         if nname not in node_image:
2613           gnode = self.NodeImage(name=nname)
2614           gnode.ghost = (nname not in self.all_node_info)
2615           node_image[nname] = gnode
2616
2617       inst_config.MapLVsByNode(node_vol_should)
2618
2619       pnode = inst_config.primary_node
2620       node_image[pnode].pinst.append(instance)
2621
2622       for snode in inst_config.secondary_nodes:
2623         nimg = node_image[snode]
2624         nimg.sinst.append(instance)
2625         if pnode not in nimg.sbp:
2626           nimg.sbp[pnode] = []
2627         nimg.sbp[pnode].append(instance)
2628
2629     es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg, self.my_node_names)
2630     # The value of exclusive_storage should be the same across the group, so if
2631     # it's True for at least a node, we act as if it were set for all the nodes
2632     self._exclusive_storage = compat.any(es_flags.values())
2633     if self._exclusive_storage:
2634       node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2635
2636     # At this point, we have the in-memory data structures complete,
2637     # except for the runtime information, which we'll gather next
2638
2639     # Due to the way our RPC system works, exact response times cannot be
2640     # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2641     # time before and after executing the request, we can at least have a time
2642     # window.
2643     nvinfo_starttime = time.time()
2644     all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2645                                            node_verify_param,
2646                                            self.cfg.GetClusterName())
2647     nvinfo_endtime = time.time()
2648
2649     if self.extra_lv_nodes and vg_name is not None:
2650       extra_lv_nvinfo = \
2651           self.rpc.call_node_verify(self.extra_lv_nodes,
2652                                     {constants.NV_LVLIST: vg_name},
2653                                     self.cfg.GetClusterName())
2654     else:
2655       extra_lv_nvinfo = {}
2656
2657     all_drbd_map = self.cfg.ComputeDRBDMap()
2658
2659     feedback_fn("* Gathering disk information (%s nodes)" %
2660                 len(self.my_node_names))
2661     instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2662                                      self.my_inst_info)
2663
2664     feedback_fn("* Verifying configuration file consistency")
2665
2666     # If not all nodes are being checked, we need to make sure the master node
2667     # and a non-checked vm_capable node are in the list.
2668     absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2669     if absent_nodes:
2670       vf_nvinfo = all_nvinfo.copy()
2671       vf_node_info = list(self.my_node_info.values())
2672       additional_nodes = []
2673       if master_node not in self.my_node_info:
2674         additional_nodes.append(master_node)
2675         vf_node_info.append(self.all_node_info[master_node])
2676       # Add the first vm_capable node we find which is not included,
2677       # excluding the master node (which we already have)
2678       for node in absent_nodes:
2679         nodeinfo = self.all_node_info[node]
2680         if (nodeinfo.vm_capable and not nodeinfo.offline and
2681             node != master_node):
2682           additional_nodes.append(node)
2683           vf_node_info.append(self.all_node_info[node])
2684           break
2685       key = constants.NV_FILELIST
2686       vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2687                                                  {key: node_verify_param[key]},
2688                                                  self.cfg.GetClusterName()))
2689     else:
2690       vf_nvinfo = all_nvinfo
2691       vf_node_info = self.my_node_info.values()
2692
2693     self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2694
2695     feedback_fn("* Verifying node status")
2696
2697     refos_img = None
2698
2699     for node_i in node_data_list:
2700       node = node_i.name
2701       nimg = node_image[node]
2702
2703       if node_i.offline:
2704         if verbose:
2705           feedback_fn("* Skipping offline node %s" % (node,))
2706         n_offline += 1
2707         continue
2708
2709       if node == master_node:
2710         ntype = "master"
2711       elif node_i.master_candidate:
2712         ntype = "master candidate"
2713       elif node_i.drained:
2714         ntype = "drained"
2715         n_drained += 1
2716       else:
2717         ntype = "regular"
2718       if verbose:
2719         feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2720
2721       msg = all_nvinfo[node].fail_msg
2722       _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
2723                msg)
2724       if msg:
2725         nimg.rpc_fail = True
2726         continue
2727
2728       nresult = all_nvinfo[node].payload
2729
2730       nimg.call_ok = self._VerifyNode(node_i, nresult)
2731       self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2732       self._VerifyNodeNetwork(node_i, nresult)
2733       self._VerifyNodeUserScripts(node_i, nresult)
2734       self._VerifyOob(node_i, nresult)
2735       self._VerifyFileStoragePaths(node_i, nresult,
2736                                    node == master_node)
2737
2738       if nimg.vm_capable:
2739         self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
2740         self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2741                              all_drbd_map)
2742
2743         self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2744         self._UpdateNodeInstances(node_i, nresult, nimg)
2745         self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2746         self._UpdateNodeOS(node_i, nresult, nimg)
2747
2748         if not nimg.os_fail:
2749           if refos_img is None:
2750             refos_img = nimg
2751           self._VerifyNodeOS(node_i, nimg, refos_img)
2752         self._VerifyNodeBridges(node_i, nresult, bridges)
2753
2754         # Check whether all running instancies are primary for the node. (This
2755         # can no longer be done from _VerifyInstance below, since some of the
2756         # wrong instances could be from other node groups.)
2757         non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2758
2759         for inst in non_primary_inst:
2760           test = inst in self.all_inst_info
2761           _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
2762                    "instance should not run on node %s", node_i.name)
2763           _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
2764                    "node is running unknown instance %s", inst)
2765
2766     self._VerifyGroupLVM(node_image, vg_name)
2767
2768     for node, result in extra_lv_nvinfo.items():
2769       self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
2770                               node_image[node], vg_name)
2771
2772     feedback_fn("* Verifying instance status")
2773     for instance in self.my_inst_names:
2774       if verbose:
2775         feedback_fn("* Verifying instance %s" % instance)
2776       inst_config = self.my_inst_info[instance]
2777       self._VerifyInstance(instance, inst_config, node_image,
2778                            instdisk[instance])
2779
2780       # If the instance is non-redundant we cannot survive losing its primary
2781       # node, so we are not N+1 compliant.
2782       if inst_config.disk_template not in constants.DTS_MIRRORED:
2783         i_non_redundant.append(instance)
2784
2785       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2786         i_non_a_balanced.append(instance)
2787
2788     feedback_fn("* Verifying orphan volumes")
2789     reserved = utils.FieldSet(*cluster.reserved_lvs)
2790
2791     # We will get spurious "unknown volume" warnings if any node of this group
2792     # is secondary for an instance whose primary is in another group. To avoid
2793     # them, we find these instances and add their volumes to node_vol_should.
2794     for inst in self.all_inst_info.values():
2795       for secondary in inst.secondary_nodes:
2796         if (secondary in self.my_node_info
2797             and inst.name not in self.my_inst_info):
2798           inst.MapLVsByNode(node_vol_should)
2799           break
2800
2801     self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2802
2803     if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2804       feedback_fn("* Verifying N+1 Memory redundancy")
2805       self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2806
2807     feedback_fn("* Other Notes")
2808     if i_non_redundant:
2809       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2810                   % len(i_non_redundant))
2811
2812     if i_non_a_balanced:
2813       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2814                   % len(i_non_a_balanced))
2815
2816     if i_offline:
2817       feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
2818
2819     if n_offline:
2820       feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2821
2822     if n_drained:
2823       feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2824
2825     return not self.bad
2826
2827   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2828     """Analyze the post-hooks' result
2829
2830     This method analyses the hook result, handles it, and sends some
2831     nicely-formatted feedback back to the user.
2832
2833     @param phase: one of L{constants.HOOKS_PHASE_POST} or
2834         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2835     @param hooks_results: the results of the multi-node hooks rpc call
2836     @param feedback_fn: function used send feedback back to the caller
2837     @param lu_result: previous Exec result
2838     @return: the new Exec result, based on the previous result
2839         and hook results
2840
2841     """
2842     # We only really run POST phase hooks, only for non-empty groups,
2843     # and are only interested in their results
2844     if not self.my_node_names:
2845       # empty node group
2846       pass
2847     elif phase == constants.HOOKS_PHASE_POST:
2848       # Used to change hooks' output to proper indentation
2849       feedback_fn("* Hooks Results")
2850       assert hooks_results, "invalid result from hooks"
2851
2852       for node_name in hooks_results:
2853         res = hooks_results[node_name]
2854         msg = res.fail_msg
2855         test = msg and not res.offline
2856         self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2857                       "Communication failure in hooks execution: %s", msg)
2858         if res.offline or msg:
2859           # No need to investigate payload if node is offline or gave
2860           # an error.
2861           continue
2862         for script, hkr, output in res.payload:
2863           test = hkr == constants.HKR_FAIL
2864           self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2865                         "Script %s failed, output:", script)
2866           if test:
2867             output = self._HOOKS_INDENT_RE.sub("      ", output)
2868             feedback_fn("%s" % output)
2869             lu_result = False
2870
2871     return lu_result
2872
2873
2874 class LUClusterVerifyDisks(NoHooksLU):
2875   """Verifies the cluster disks status.
2876
2877   """
2878   REQ_BGL = False
2879
2880   def ExpandNames(self):
2881     self.share_locks = ShareAll()
2882     self.needed_locks = {
2883       locking.LEVEL_NODEGROUP: locking.ALL_SET,
2884       }
2885
2886   def Exec(self, feedback_fn):
2887     group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
2888
2889     # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
2890     return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
2891                            for group in group_names])