9ced853b1aaba888dc5372887a62fabd1b20e24a
[ganeti-local] / lib / cmdlib / cluster.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with the cluster."""
23
24 import OpenSSL
25
26 import copy
27 import itertools
28 import logging
29 import operator
30 import os
31 import re
32 import time
33
34 from ganeti import compat
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import masterd
40 from ganeti import netutils
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 from ganeti import query
45 from ganeti import rpc
46 from ganeti import runtime
47 from ganeti import ssh
48 from ganeti import uidpool
49 from ganeti import utils
50 from ganeti import vcluster
51
52 from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53   ResultWithJobs
54 from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55   ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56   GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57   GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58   CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59   ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob
60
61 import ganeti.masterd.instance
62
63
64 class LUClusterActivateMasterIp(NoHooksLU):
65   """Activate the master IP on the master node.
66
67   """
68   def Exec(self, feedback_fn):
69     """Activate the master IP.
70
71     """
72     master_params = self.cfg.GetMasterNetworkParameters()
73     ems = self.cfg.GetUseExternalMipScript()
74     result = self.rpc.call_node_activate_master_ip(master_params.name,
75                                                    master_params, ems)
76     result.Raise("Could not activate the master IP")
77
78
79 class LUClusterDeactivateMasterIp(NoHooksLU):
80   """Deactivate the master IP on the master node.
81
82   """
83   def Exec(self, feedback_fn):
84     """Deactivate the master IP.
85
86     """
87     master_params = self.cfg.GetMasterNetworkParameters()
88     ems = self.cfg.GetUseExternalMipScript()
89     result = self.rpc.call_node_deactivate_master_ip(master_params.name,
90                                                      master_params, ems)
91     result.Raise("Could not deactivate the master IP")
92
93
94 class LUClusterConfigQuery(NoHooksLU):
95   """Return configuration values.
96
97   """
98   REQ_BGL = False
99
100   def CheckArguments(self):
101     self.cq = ClusterQuery(None, self.op.output_fields, False)
102
103   def ExpandNames(self):
104     self.cq.ExpandNames(self)
105
106   def DeclareLocks(self, level):
107     self.cq.DeclareLocks(self, level)
108
109   def Exec(self, feedback_fn):
110     result = self.cq.OldStyleQuery(self)
111
112     assert len(result) == 1
113
114     return result[0]
115
116
117 class LUClusterDestroy(LogicalUnit):
118   """Logical unit for destroying the cluster.
119
120   """
121   HPATH = "cluster-destroy"
122   HTYPE = constants.HTYPE_CLUSTER
123
124   def BuildHooksEnv(self):
125     """Build hooks env.
126
127     """
128     return {
129       "OP_TARGET": self.cfg.GetClusterName(),
130       }
131
132   def BuildHooksNodes(self):
133     """Build hooks nodes.
134
135     """
136     return ([], [])
137
138   def CheckPrereq(self):
139     """Check prerequisites.
140
141     This checks whether the cluster is empty.
142
143     Any errors are signaled by raising errors.OpPrereqError.
144
145     """
146     master = self.cfg.GetMasterNode()
147
148     nodelist = self.cfg.GetNodeList()
149     if len(nodelist) != 1 or nodelist[0] != master:
150       raise errors.OpPrereqError("There are still %d node(s) in"
151                                  " this cluster." % (len(nodelist) - 1),
152                                  errors.ECODE_INVAL)
153     instancelist = self.cfg.GetInstanceList()
154     if instancelist:
155       raise errors.OpPrereqError("There are still %d instance(s) in"
156                                  " this cluster." % len(instancelist),
157                                  errors.ECODE_INVAL)
158
159   def Exec(self, feedback_fn):
160     """Destroys the cluster.
161
162     """
163     master_params = self.cfg.GetMasterNetworkParameters()
164
165     # Run post hooks on master node before it's removed
166     RunPostHook(self, master_params.name)
167
168     ems = self.cfg.GetUseExternalMipScript()
169     result = self.rpc.call_node_deactivate_master_ip(master_params.name,
170                                                      master_params, ems)
171     if result.fail_msg:
172       self.LogWarning("Error disabling the master IP address: %s",
173                       result.fail_msg)
174
175     return master_params.name
176
177
178 class LUClusterPostInit(LogicalUnit):
179   """Logical unit for running hooks after cluster initialization.
180
181   """
182   HPATH = "cluster-init"
183   HTYPE = constants.HTYPE_CLUSTER
184
185   def BuildHooksEnv(self):
186     """Build hooks env.
187
188     """
189     return {
190       "OP_TARGET": self.cfg.GetClusterName(),
191       }
192
193   def BuildHooksNodes(self):
194     """Build hooks nodes.
195
196     """
197     return ([], [self.cfg.GetMasterNode()])
198
199   def Exec(self, feedback_fn):
200     """Nothing to do.
201
202     """
203     return True
204
205
206 class ClusterQuery(QueryBase):
207   FIELDS = query.CLUSTER_FIELDS
208
209   #: Do not sort (there is only one item)
210   SORT_FIELD = None
211
212   def ExpandNames(self, lu):
213     lu.needed_locks = {}
214
215     # The following variables interact with _QueryBase._GetNames
216     self.wanted = locking.ALL_SET
217     self.do_locking = self.use_locking
218
219     if self.do_locking:
220       raise errors.OpPrereqError("Can not use locking for cluster queries",
221                                  errors.ECODE_INVAL)
222
223   def DeclareLocks(self, lu, level):
224     pass
225
226   def _GetQueryData(self, lu):
227     """Computes the list of nodes and their attributes.
228
229     """
230     # Locking is not used
231     assert not (compat.any(lu.glm.is_owned(level)
232                            for level in locking.LEVELS
233                            if level != locking.LEVEL_CLUSTER) or
234                 self.do_locking or self.use_locking)
235
236     if query.CQ_CONFIG in self.requested_data:
237       cluster = lu.cfg.GetClusterInfo()
238     else:
239       cluster = NotImplemented
240
241     if query.CQ_QUEUE_DRAINED in self.requested_data:
242       drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
243     else:
244       drain_flag = NotImplemented
245
246     if query.CQ_WATCHER_PAUSE in self.requested_data:
247       master_name = lu.cfg.GetMasterNode()
248
249       result = lu.rpc.call_get_watcher_pause(master_name)
250       result.Raise("Can't retrieve watcher pause from master node '%s'" %
251                    master_name)
252
253       watcher_pause = result.payload
254     else:
255       watcher_pause = NotImplemented
256
257     return query.ClusterQueryData(cluster, drain_flag, watcher_pause)
258
259
260 class LUClusterQuery(NoHooksLU):
261   """Query cluster configuration.
262
263   """
264   REQ_BGL = False
265
266   def ExpandNames(self):
267     self.needed_locks = {}
268
269   def Exec(self, feedback_fn):
270     """Return cluster config.
271
272     """
273     cluster = self.cfg.GetClusterInfo()
274     os_hvp = {}
275
276     # Filter just for enabled hypervisors
277     for os_name, hv_dict in cluster.os_hvp.items():
278       os_hvp[os_name] = {}
279       for hv_name, hv_params in hv_dict.items():
280         if hv_name in cluster.enabled_hypervisors:
281           os_hvp[os_name][hv_name] = hv_params
282
283     # Convert ip_family to ip_version
284     primary_ip_version = constants.IP4_VERSION
285     if cluster.primary_ip_family == netutils.IP6Address.family:
286       primary_ip_version = constants.IP6_VERSION
287
288     result = {
289       "software_version": constants.RELEASE_VERSION,
290       "protocol_version": constants.PROTOCOL_VERSION,
291       "config_version": constants.CONFIG_VERSION,
292       "os_api_version": max(constants.OS_API_VERSIONS),
293       "export_version": constants.EXPORT_VERSION,
294       "architecture": runtime.GetArchInfo(),
295       "name": cluster.cluster_name,
296       "master": cluster.master_node,
297       "default_hypervisor": cluster.primary_hypervisor,
298       "enabled_hypervisors": cluster.enabled_hypervisors,
299       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
300                         for hypervisor_name in cluster.enabled_hypervisors]),
301       "os_hvp": os_hvp,
302       "beparams": cluster.beparams,
303       "osparams": cluster.osparams,
304       "ipolicy": cluster.ipolicy,
305       "nicparams": cluster.nicparams,
306       "ndparams": cluster.ndparams,
307       "diskparams": cluster.diskparams,
308       "candidate_pool_size": cluster.candidate_pool_size,
309       "master_netdev": cluster.master_netdev,
310       "master_netmask": cluster.master_netmask,
311       "use_external_mip_script": cluster.use_external_mip_script,
312       "volume_group_name": cluster.volume_group_name,
313       "drbd_usermode_helper": cluster.drbd_usermode_helper,
314       "file_storage_dir": cluster.file_storage_dir,
315       "shared_file_storage_dir": cluster.shared_file_storage_dir,
316       "maintain_node_health": cluster.maintain_node_health,
317       "ctime": cluster.ctime,
318       "mtime": cluster.mtime,
319       "uuid": cluster.uuid,
320       "tags": list(cluster.GetTags()),
321       "uid_pool": cluster.uid_pool,
322       "default_iallocator": cluster.default_iallocator,
323       "reserved_lvs": cluster.reserved_lvs,
324       "primary_ip_version": primary_ip_version,
325       "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
326       "hidden_os": cluster.hidden_os,
327       "blacklisted_os": cluster.blacklisted_os,
328       "enabled_disk_templates": cluster.enabled_disk_templates,
329       }
330
331     return result
332
333
334 class LUClusterRedistConf(NoHooksLU):
335   """Force the redistribution of cluster configuration.
336
337   This is a very simple LU.
338
339   """
340   REQ_BGL = False
341
342   def ExpandNames(self):
343     self.needed_locks = {
344       locking.LEVEL_NODE: locking.ALL_SET,
345       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
346     }
347     self.share_locks = ShareAll()
348
349   def Exec(self, feedback_fn):
350     """Redistribute the configuration.
351
352     """
353     self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
354     RedistributeAncillaryFiles(self)
355
356
357 class LUClusterRename(LogicalUnit):
358   """Rename the cluster.
359
360   """
361   HPATH = "cluster-rename"
362   HTYPE = constants.HTYPE_CLUSTER
363
364   def BuildHooksEnv(self):
365     """Build hooks env.
366
367     """
368     return {
369       "OP_TARGET": self.cfg.GetClusterName(),
370       "NEW_NAME": self.op.name,
371       }
372
373   def BuildHooksNodes(self):
374     """Build hooks nodes.
375
376     """
377     return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
378
379   def CheckPrereq(self):
380     """Verify that the passed name is a valid one.
381
382     """
383     hostname = netutils.GetHostname(name=self.op.name,
384                                     family=self.cfg.GetPrimaryIPFamily())
385
386     new_name = hostname.name
387     self.ip = new_ip = hostname.ip
388     old_name = self.cfg.GetClusterName()
389     old_ip = self.cfg.GetMasterIP()
390     if new_name == old_name and new_ip == old_ip:
391       raise errors.OpPrereqError("Neither the name nor the IP address of the"
392                                  " cluster has changed",
393                                  errors.ECODE_INVAL)
394     if new_ip != old_ip:
395       if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
396         raise errors.OpPrereqError("The given cluster IP address (%s) is"
397                                    " reachable on the network" %
398                                    new_ip, errors.ECODE_NOTUNIQUE)
399
400     self.op.name = new_name
401
402   def Exec(self, feedback_fn):
403     """Rename the cluster.
404
405     """
406     clustername = self.op.name
407     new_ip = self.ip
408
409     # shutdown the master IP
410     master_params = self.cfg.GetMasterNetworkParameters()
411     ems = self.cfg.GetUseExternalMipScript()
412     result = self.rpc.call_node_deactivate_master_ip(master_params.name,
413                                                      master_params, ems)
414     result.Raise("Could not disable the master role")
415
416     try:
417       cluster = self.cfg.GetClusterInfo()
418       cluster.cluster_name = clustername
419       cluster.master_ip = new_ip
420       self.cfg.Update(cluster, feedback_fn)
421
422       # update the known hosts file
423       ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
424       node_list = self.cfg.GetOnlineNodeList()
425       try:
426         node_list.remove(master_params.name)
427       except ValueError:
428         pass
429       UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
430     finally:
431       master_params.ip = new_ip
432       result = self.rpc.call_node_activate_master_ip(master_params.name,
433                                                      master_params, ems)
434       msg = result.fail_msg
435       if msg:
436         self.LogWarning("Could not re-enable the master role on"
437                         " the master, please restart manually: %s", msg)
438
439     return clustername
440
441
442 class LUClusterRepairDiskSizes(NoHooksLU):
443   """Verifies the cluster disks sizes.
444
445   """
446   REQ_BGL = False
447
448   def ExpandNames(self):
449     if self.op.instances:
450       self.wanted_names = GetWantedInstances(self, self.op.instances)
451       # Not getting the node allocation lock as only a specific set of
452       # instances (and their nodes) is going to be acquired
453       self.needed_locks = {
454         locking.LEVEL_NODE_RES: [],
455         locking.LEVEL_INSTANCE: self.wanted_names,
456         }
457       self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
458     else:
459       self.wanted_names = None
460       self.needed_locks = {
461         locking.LEVEL_NODE_RES: locking.ALL_SET,
462         locking.LEVEL_INSTANCE: locking.ALL_SET,
463
464         # This opcode is acquires the node locks for all instances
465         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
466         }
467
468     self.share_locks = {
469       locking.LEVEL_NODE_RES: 1,
470       locking.LEVEL_INSTANCE: 0,
471       locking.LEVEL_NODE_ALLOC: 1,
472       }
473
474   def DeclareLocks(self, level):
475     if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
476       self._LockInstancesNodes(primary_only=True, level=level)
477
478   def CheckPrereq(self):
479     """Check prerequisites.
480
481     This only checks the optional instance list against the existing names.
482
483     """
484     if self.wanted_names is None:
485       self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
486
487     self.wanted_instances = \
488         map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
489
490   def _EnsureChildSizes(self, disk):
491     """Ensure children of the disk have the needed disk size.
492
493     This is valid mainly for DRBD8 and fixes an issue where the
494     children have smaller disk size.
495
496     @param disk: an L{ganeti.objects.Disk} object
497
498     """
499     if disk.dev_type == constants.LD_DRBD8:
500       assert disk.children, "Empty children for DRBD8?"
501       fchild = disk.children[0]
502       mismatch = fchild.size < disk.size
503       if mismatch:
504         self.LogInfo("Child disk has size %d, parent %d, fixing",
505                      fchild.size, disk.size)
506         fchild.size = disk.size
507
508       # and we recurse on this child only, not on the metadev
509       return self._EnsureChildSizes(fchild) or mismatch
510     else:
511       return False
512
513   def Exec(self, feedback_fn):
514     """Verify the size of cluster disks.
515
516     """
517     # TODO: check child disks too
518     # TODO: check differences in size between primary/secondary nodes
519     per_node_disks = {}
520     for instance in self.wanted_instances:
521       pnode = instance.primary_node
522       if pnode not in per_node_disks:
523         per_node_disks[pnode] = []
524       for idx, disk in enumerate(instance.disks):
525         per_node_disks[pnode].append((instance, idx, disk))
526
527     assert not (frozenset(per_node_disks.keys()) -
528                 self.owned_locks(locking.LEVEL_NODE_RES)), \
529       "Not owning correct locks"
530     assert not self.owned_locks(locking.LEVEL_NODE)
531
532     changed = []
533     for node, dskl in per_node_disks.items():
534       newl = [v[2].Copy() for v in dskl]
535       for dsk in newl:
536         self.cfg.SetDiskID(dsk, node)
537       result = self.rpc.call_blockdev_getsize(node, newl)
538       if result.fail_msg:
539         self.LogWarning("Failure in blockdev_getsize call to node"
540                         " %s, ignoring", node)
541         continue
542       if len(result.payload) != len(dskl):
543         logging.warning("Invalid result from node %s: len(dksl)=%d,"
544                         " result.payload=%s", node, len(dskl), result.payload)
545         self.LogWarning("Invalid result from node %s, ignoring node results",
546                         node)
547         continue
548       for ((instance, idx, disk), size) in zip(dskl, result.payload):
549         if size is None:
550           self.LogWarning("Disk %d of instance %s did not return size"
551                           " information, ignoring", idx, instance.name)
552           continue
553         if not isinstance(size, (int, long)):
554           self.LogWarning("Disk %d of instance %s did not return valid"
555                           " size information, ignoring", idx, instance.name)
556           continue
557         size = size >> 20
558         if size != disk.size:
559           self.LogInfo("Disk %d of instance %s has mismatched size,"
560                        " correcting: recorded %d, actual %d", idx,
561                        instance.name, disk.size, size)
562           disk.size = size
563           self.cfg.Update(instance, feedback_fn)
564           changed.append((instance.name, idx, size))
565         if self._EnsureChildSizes(disk):
566           self.cfg.Update(instance, feedback_fn)
567           changed.append((instance.name, idx, disk.size))
568     return changed
569
570
571 def _ValidateNetmask(cfg, netmask):
572   """Checks if a netmask is valid.
573
574   @type cfg: L{config.ConfigWriter}
575   @param cfg: The cluster configuration
576   @type netmask: int
577   @param netmask: the netmask to be verified
578   @raise errors.OpPrereqError: if the validation fails
579
580   """
581   ip_family = cfg.GetPrimaryIPFamily()
582   try:
583     ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
584   except errors.ProgrammerError:
585     raise errors.OpPrereqError("Invalid primary ip family: %s." %
586                                ip_family, errors.ECODE_INVAL)
587   if not ipcls.ValidateNetmask(netmask):
588     raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
589                                (netmask), errors.ECODE_INVAL)
590
591
592 class LUClusterSetParams(LogicalUnit):
593   """Change the parameters of the cluster.
594
595   """
596   HPATH = "cluster-modify"
597   HTYPE = constants.HTYPE_CLUSTER
598   REQ_BGL = False
599
600   def CheckArguments(self):
601     """Check parameters
602
603     """
604     if self.op.uid_pool:
605       uidpool.CheckUidPool(self.op.uid_pool)
606
607     if self.op.add_uids:
608       uidpool.CheckUidPool(self.op.add_uids)
609
610     if self.op.remove_uids:
611       uidpool.CheckUidPool(self.op.remove_uids)
612
613     if self.op.master_netmask is not None:
614       _ValidateNetmask(self.cfg, self.op.master_netmask)
615
616     if self.op.diskparams:
617       for dt_params in self.op.diskparams.values():
618         utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
619       try:
620         utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
621       except errors.OpPrereqError, err:
622         raise errors.OpPrereqError("While verify diskparams options: %s" % err,
623                                    errors.ECODE_INVAL)
624
625   def ExpandNames(self):
626     # FIXME: in the future maybe other cluster params won't require checking on
627     # all nodes to be modified.
628     # FIXME: This opcode changes cluster-wide settings. Is acquiring all
629     # resource locks the right thing, shouldn't it be the BGL instead?
630     self.needed_locks = {
631       locking.LEVEL_NODE: locking.ALL_SET,
632       locking.LEVEL_INSTANCE: locking.ALL_SET,
633       locking.LEVEL_NODEGROUP: locking.ALL_SET,
634       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
635     }
636     self.share_locks = ShareAll()
637
638   def BuildHooksEnv(self):
639     """Build hooks env.
640
641     """
642     return {
643       "OP_TARGET": self.cfg.GetClusterName(),
644       "NEW_VG_NAME": self.op.vg_name,
645       }
646
647   def BuildHooksNodes(self):
648     """Build hooks nodes.
649
650     """
651     mn = self.cfg.GetMasterNode()
652     return ([mn], [mn])
653
654   def CheckPrereq(self):
655     """Check prerequisites.
656
657     This checks whether the given params don't conflict and
658     if the given volume group is valid.
659
660     """
661     if self.op.vg_name is not None and not self.op.vg_name:
662       if self.cfg.HasAnyDiskOfType(constants.LD_LV):
663         raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
664                                    " instances exist", errors.ECODE_INVAL)
665
666     if self.op.drbd_helper is not None and not self.op.drbd_helper:
667       if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
668         raise errors.OpPrereqError("Cannot disable drbd helper while"
669                                    " drbd-based instances exist",
670                                    errors.ECODE_INVAL)
671
672     node_list = self.owned_locks(locking.LEVEL_NODE)
673
674     vm_capable_nodes = [node.name
675                         for node in self.cfg.GetAllNodesInfo().values()
676                         if node.name in node_list and node.vm_capable]
677
678     # if vg_name not None, checks given volume group on all nodes
679     if self.op.vg_name:
680       vglist = self.rpc.call_vg_list(vm_capable_nodes)
681       for node in vm_capable_nodes:
682         msg = vglist[node].fail_msg
683         if msg:
684           # ignoring down node
685           self.LogWarning("Error while gathering data on node %s"
686                           " (ignoring node): %s", node, msg)
687           continue
688         vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
689                                               self.op.vg_name,
690                                               constants.MIN_VG_SIZE)
691         if vgstatus:
692           raise errors.OpPrereqError("Error on node '%s': %s" %
693                                      (node, vgstatus), errors.ECODE_ENVIRON)
694
695     if self.op.drbd_helper:
696       # checks given drbd helper on all nodes
697       helpers = self.rpc.call_drbd_helper(node_list)
698       for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
699         if ninfo.offline:
700           self.LogInfo("Not checking drbd helper on offline node %s", node)
701           continue
702         msg = helpers[node].fail_msg
703         if msg:
704           raise errors.OpPrereqError("Error checking drbd helper on node"
705                                      " '%s': %s" % (node, msg),
706                                      errors.ECODE_ENVIRON)
707         node_helper = helpers[node].payload
708         if node_helper != self.op.drbd_helper:
709           raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
710                                      (node, node_helper), errors.ECODE_ENVIRON)
711
712     self.cluster = cluster = self.cfg.GetClusterInfo()
713     # validate params changes
714     if self.op.beparams:
715       objects.UpgradeBeParams(self.op.beparams)
716       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
717       self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
718
719     if self.op.ndparams:
720       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
721       self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
722
723       # TODO: we need a more general way to handle resetting
724       # cluster-level parameters to default values
725       if self.new_ndparams["oob_program"] == "":
726         self.new_ndparams["oob_program"] = \
727             constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
728
729     if self.op.hv_state:
730       new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
731                                            self.cluster.hv_state_static)
732       self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
733                                for hv, values in new_hv_state.items())
734
735     if self.op.disk_state:
736       new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
737                                                self.cluster.disk_state_static)
738       self.new_disk_state = \
739         dict((storage, dict((name, cluster.SimpleFillDiskState(values))
740                             for name, values in svalues.items()))
741              for storage, svalues in new_disk_state.items())
742
743     if self.op.ipolicy:
744       self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
745                                            group_policy=False)
746
747       all_instances = self.cfg.GetAllInstancesInfo().values()
748       violations = set()
749       for group in self.cfg.GetAllNodeGroupsInfo().values():
750         instances = frozenset([inst for inst in all_instances
751                                if compat.any(node in group.members
752                                              for node in inst.all_nodes)])
753         new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
754         ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
755         new = ComputeNewInstanceViolations(ipol,
756                                            new_ipolicy, instances, self.cfg)
757         if new:
758           violations.update(new)
759
760       if violations:
761         self.LogWarning("After the ipolicy change the following instances"
762                         " violate them: %s",
763                         utils.CommaJoin(utils.NiceSort(violations)))
764
765     if self.op.nicparams:
766       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
767       self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
768       objects.NIC.CheckParameterSyntax(self.new_nicparams)
769       nic_errors = []
770
771       # check all instances for consistency
772       for instance in self.cfg.GetAllInstancesInfo().values():
773         for nic_idx, nic in enumerate(instance.nics):
774           params_copy = copy.deepcopy(nic.nicparams)
775           params_filled = objects.FillDict(self.new_nicparams, params_copy)
776
777           # check parameter syntax
778           try:
779             objects.NIC.CheckParameterSyntax(params_filled)
780           except errors.ConfigurationError, err:
781             nic_errors.append("Instance %s, nic/%d: %s" %
782                               (instance.name, nic_idx, err))
783
784           # if we're moving instances to routed, check that they have an ip
785           target_mode = params_filled[constants.NIC_MODE]
786           if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
787             nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
788                               " address" % (instance.name, nic_idx))
789       if nic_errors:
790         raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
791                                    "\n".join(nic_errors), errors.ECODE_INVAL)
792
793     # hypervisor list/parameters
794     self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
795     if self.op.hvparams:
796       for hv_name, hv_dict in self.op.hvparams.items():
797         if hv_name not in self.new_hvparams:
798           self.new_hvparams[hv_name] = hv_dict
799         else:
800           self.new_hvparams[hv_name].update(hv_dict)
801
802     # disk template parameters
803     self.new_diskparams = objects.FillDict(cluster.diskparams, {})
804     if self.op.diskparams:
805       for dt_name, dt_params in self.op.diskparams.items():
806         if dt_name not in self.op.diskparams:
807           self.new_diskparams[dt_name] = dt_params
808         else:
809           self.new_diskparams[dt_name].update(dt_params)
810
811     # os hypervisor parameters
812     self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
813     if self.op.os_hvp:
814       for os_name, hvs in self.op.os_hvp.items():
815         if os_name not in self.new_os_hvp:
816           self.new_os_hvp[os_name] = hvs
817         else:
818           for hv_name, hv_dict in hvs.items():
819             if hv_dict is None:
820               # Delete if it exists
821               self.new_os_hvp[os_name].pop(hv_name, None)
822             elif hv_name not in self.new_os_hvp[os_name]:
823               self.new_os_hvp[os_name][hv_name] = hv_dict
824             else:
825               self.new_os_hvp[os_name][hv_name].update(hv_dict)
826
827     # os parameters
828     self.new_osp = objects.FillDict(cluster.osparams, {})
829     if self.op.osparams:
830       for os_name, osp in self.op.osparams.items():
831         if os_name not in self.new_osp:
832           self.new_osp[os_name] = {}
833
834         self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
835                                                  use_none=True)
836
837         if not self.new_osp[os_name]:
838           # we removed all parameters
839           del self.new_osp[os_name]
840         else:
841           # check the parameter validity (remote check)
842           CheckOSParams(self, False, [self.cfg.GetMasterNode()],
843                         os_name, self.new_osp[os_name])
844
845     # changes to the hypervisor list
846     if self.op.enabled_hypervisors is not None:
847       self.hv_list = self.op.enabled_hypervisors
848       for hv in self.hv_list:
849         # if the hypervisor doesn't already exist in the cluster
850         # hvparams, we initialize it to empty, and then (in both
851         # cases) we make sure to fill the defaults, as we might not
852         # have a complete defaults list if the hypervisor wasn't
853         # enabled before
854         if hv not in new_hvp:
855           new_hvp[hv] = {}
856         new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
857         utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
858     else:
859       self.hv_list = cluster.enabled_hypervisors
860
861     if self.op.hvparams or self.op.enabled_hypervisors is not None:
862       # either the enabled list has changed, or the parameters have, validate
863       for hv_name, hv_params in self.new_hvparams.items():
864         if ((self.op.hvparams and hv_name in self.op.hvparams) or
865             (self.op.enabled_hypervisors and
866              hv_name in self.op.enabled_hypervisors)):
867           # either this is a new hypervisor, or its parameters have changed
868           hv_class = hypervisor.GetHypervisorClass(hv_name)
869           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
870           hv_class.CheckParameterSyntax(hv_params)
871           CheckHVParams(self, node_list, hv_name, hv_params)
872
873     self._CheckDiskTemplateConsistency()
874
875     if self.op.os_hvp:
876       # no need to check any newly-enabled hypervisors, since the
877       # defaults have already been checked in the above code-block
878       for os_name, os_hvp in self.new_os_hvp.items():
879         for hv_name, hv_params in os_hvp.items():
880           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
881           # we need to fill in the new os_hvp on top of the actual hv_p
882           cluster_defaults = self.new_hvparams.get(hv_name, {})
883           new_osp = objects.FillDict(cluster_defaults, hv_params)
884           hv_class = hypervisor.GetHypervisorClass(hv_name)
885           hv_class.CheckParameterSyntax(new_osp)
886           CheckHVParams(self, node_list, hv_name, new_osp)
887
888     if self.op.default_iallocator:
889       alloc_script = utils.FindFile(self.op.default_iallocator,
890                                     constants.IALLOCATOR_SEARCH_PATH,
891                                     os.path.isfile)
892       if alloc_script is None:
893         raise errors.OpPrereqError("Invalid default iallocator script '%s'"
894                                    " specified" % self.op.default_iallocator,
895                                    errors.ECODE_INVAL)
896
897   def _CheckDiskTemplateConsistency(self):
898     """Check whether the disk templates that are going to be disabled
899        are still in use by some instances.
900
901     """
902     if self.op.enabled_disk_templates:
903       cluster = self.cfg.GetClusterInfo()
904       instances = self.cfg.GetAllInstancesInfo()
905
906       disk_templates_to_remove = set(cluster.enabled_disk_templates) \
907         - set(self.op.enabled_disk_templates)
908       for instance in instances.itervalues():
909         if instance.disk_template in disk_templates_to_remove:
910           raise errors.OpPrereqError("Cannot disable disk template '%s',"
911                                      " because instance '%s' is using it." %
912                                      (instance.disk_template, instance.name))
913
914   def Exec(self, feedback_fn):
915     """Change the parameters of the cluster.
916
917     """
918     if self.op.vg_name is not None:
919       new_volume = self.op.vg_name
920       if not new_volume:
921         new_volume = None
922       if new_volume != self.cfg.GetVGName():
923         self.cfg.SetVGName(new_volume)
924       else:
925         feedback_fn("Cluster LVM configuration already in desired"
926                     " state, not changing")
927     if self.op.drbd_helper is not None:
928       new_helper = self.op.drbd_helper
929       if not new_helper:
930         new_helper = None
931       if new_helper != self.cfg.GetDRBDHelper():
932         self.cfg.SetDRBDHelper(new_helper)
933       else:
934         feedback_fn("Cluster DRBD helper already in desired state,"
935                     " not changing")
936     if self.op.hvparams:
937       self.cluster.hvparams = self.new_hvparams
938     if self.op.os_hvp:
939       self.cluster.os_hvp = self.new_os_hvp
940     if self.op.enabled_hypervisors is not None:
941       self.cluster.hvparams = self.new_hvparams
942       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
943     if self.op.enabled_disk_templates:
944       self.cluster.enabled_disk_templates = \
945         list(set(self.op.enabled_disk_templates))
946     if self.op.beparams:
947       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
948     if self.op.nicparams:
949       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
950     if self.op.ipolicy:
951       self.cluster.ipolicy = self.new_ipolicy
952     if self.op.osparams:
953       self.cluster.osparams = self.new_osp
954     if self.op.ndparams:
955       self.cluster.ndparams = self.new_ndparams
956     if self.op.diskparams:
957       self.cluster.diskparams = self.new_diskparams
958     if self.op.hv_state:
959       self.cluster.hv_state_static = self.new_hv_state
960     if self.op.disk_state:
961       self.cluster.disk_state_static = self.new_disk_state
962
963     if self.op.candidate_pool_size is not None:
964       self.cluster.candidate_pool_size = self.op.candidate_pool_size
965       # we need to update the pool size here, otherwise the save will fail
966       AdjustCandidatePool(self, [])
967
968     if self.op.maintain_node_health is not None:
969       if self.op.maintain_node_health and not constants.ENABLE_CONFD:
970         feedback_fn("Note: CONFD was disabled at build time, node health"
971                     " maintenance is not useful (still enabling it)")
972       self.cluster.maintain_node_health = self.op.maintain_node_health
973
974     if self.op.prealloc_wipe_disks is not None:
975       self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
976
977     if self.op.add_uids is not None:
978       uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
979
980     if self.op.remove_uids is not None:
981       uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
982
983     if self.op.uid_pool is not None:
984       self.cluster.uid_pool = self.op.uid_pool
985
986     if self.op.default_iallocator is not None:
987       self.cluster.default_iallocator = self.op.default_iallocator
988
989     if self.op.reserved_lvs is not None:
990       self.cluster.reserved_lvs = self.op.reserved_lvs
991
992     if self.op.use_external_mip_script is not None:
993       self.cluster.use_external_mip_script = self.op.use_external_mip_script
994
995     def helper_os(aname, mods, desc):
996       desc += " OS list"
997       lst = getattr(self.cluster, aname)
998       for key, val in mods:
999         if key == constants.DDM_ADD:
1000           if val in lst:
1001             feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1002           else:
1003             lst.append(val)
1004         elif key == constants.DDM_REMOVE:
1005           if val in lst:
1006             lst.remove(val)
1007           else:
1008             feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1009         else:
1010           raise errors.ProgrammerError("Invalid modification '%s'" % key)
1011
1012     if self.op.hidden_os:
1013       helper_os("hidden_os", self.op.hidden_os, "hidden")
1014
1015     if self.op.blacklisted_os:
1016       helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1017
1018     if self.op.master_netdev:
1019       master_params = self.cfg.GetMasterNetworkParameters()
1020       ems = self.cfg.GetUseExternalMipScript()
1021       feedback_fn("Shutting down master ip on the current netdev (%s)" %
1022                   self.cluster.master_netdev)
1023       result = self.rpc.call_node_deactivate_master_ip(master_params.name,
1024                                                        master_params, ems)
1025       if not self.op.force:
1026         result.Raise("Could not disable the master ip")
1027       else:
1028         if result.fail_msg:
1029           msg = ("Could not disable the master ip (continuing anyway): %s" %
1030                  result.fail_msg)
1031           feedback_fn(msg)
1032       feedback_fn("Changing master_netdev from %s to %s" %
1033                   (master_params.netdev, self.op.master_netdev))
1034       self.cluster.master_netdev = self.op.master_netdev
1035
1036     if self.op.master_netmask:
1037       master_params = self.cfg.GetMasterNetworkParameters()
1038       feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1039       result = self.rpc.call_node_change_master_netmask(master_params.name,
1040                                                         master_params.netmask,
1041                                                         self.op.master_netmask,
1042                                                         master_params.ip,
1043                                                         master_params.netdev)
1044       if result.fail_msg:
1045         msg = "Could not change the master IP netmask: %s" % result.fail_msg
1046         feedback_fn(msg)
1047
1048       self.cluster.master_netmask = self.op.master_netmask
1049
1050     self.cfg.Update(self.cluster, feedback_fn)
1051
1052     if self.op.master_netdev:
1053       master_params = self.cfg.GetMasterNetworkParameters()
1054       feedback_fn("Starting the master ip on the new master netdev (%s)" %
1055                   self.op.master_netdev)
1056       ems = self.cfg.GetUseExternalMipScript()
1057       result = self.rpc.call_node_activate_master_ip(master_params.name,
1058                                                      master_params, ems)
1059       if result.fail_msg:
1060         self.LogWarning("Could not re-enable the master ip on"
1061                         " the master, please restart manually: %s",
1062                         result.fail_msg)
1063
1064
1065 class LUClusterVerify(NoHooksLU):
1066   """Submits all jobs necessary to verify the cluster.
1067
1068   """
1069   REQ_BGL = False
1070
1071   def ExpandNames(self):
1072     self.needed_locks = {}
1073
1074   def Exec(self, feedback_fn):
1075     jobs = []
1076
1077     if self.op.group_name:
1078       groups = [self.op.group_name]
1079       depends_fn = lambda: None
1080     else:
1081       groups = self.cfg.GetNodeGroupList()
1082
1083       # Verify global configuration
1084       jobs.append([
1085         opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1086         ])
1087
1088       # Always depend on global verification
1089       depends_fn = lambda: [(-len(jobs), [])]
1090
1091     jobs.extend(
1092       [opcodes.OpClusterVerifyGroup(group_name=group,
1093                                     ignore_errors=self.op.ignore_errors,
1094                                     depends=depends_fn())]
1095       for group in groups)
1096
1097     # Fix up all parameters
1098     for op in itertools.chain(*jobs): # pylint: disable=W0142
1099       op.debug_simulate_errors = self.op.debug_simulate_errors
1100       op.verbose = self.op.verbose
1101       op.error_codes = self.op.error_codes
1102       try:
1103         op.skip_checks = self.op.skip_checks
1104       except AttributeError:
1105         assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1106
1107     return ResultWithJobs(jobs)
1108
1109
1110 class _VerifyErrors(object):
1111   """Mix-in for cluster/group verify LUs.
1112
1113   It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1114   self.op and self._feedback_fn to be available.)
1115
1116   """
1117
1118   ETYPE_FIELD = "code"
1119   ETYPE_ERROR = "ERROR"
1120   ETYPE_WARNING = "WARNING"
1121
1122   def _Error(self, ecode, item, msg, *args, **kwargs):
1123     """Format an error message.
1124
1125     Based on the opcode's error_codes parameter, either format a
1126     parseable error code, or a simpler error string.
1127
1128     This must be called only from Exec and functions called from Exec.
1129
1130     """
1131     ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1132     itype, etxt, _ = ecode
1133     # If the error code is in the list of ignored errors, demote the error to a
1134     # warning
1135     if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1136       ltype = self.ETYPE_WARNING
1137     # first complete the msg
1138     if args:
1139       msg = msg % args
1140     # then format the whole message
1141     if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1142       msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1143     else:
1144       if item:
1145         item = " " + item
1146       else:
1147         item = ""
1148       msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1149     # and finally report it via the feedback_fn
1150     self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1151     # do not mark the operation as failed for WARN cases only
1152     if ltype == self.ETYPE_ERROR:
1153       self.bad = True
1154
1155   def _ErrorIf(self, cond, *args, **kwargs):
1156     """Log an error message if the passed condition is True.
1157
1158     """
1159     if (bool(cond)
1160         or self.op.debug_simulate_errors): # pylint: disable=E1101
1161       self._Error(*args, **kwargs)
1162
1163
1164 def _VerifyCertificate(filename):
1165   """Verifies a certificate for L{LUClusterVerifyConfig}.
1166
1167   @type filename: string
1168   @param filename: Path to PEM file
1169
1170   """
1171   try:
1172     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1173                                            utils.ReadFile(filename))
1174   except Exception, err: # pylint: disable=W0703
1175     return (LUClusterVerifyConfig.ETYPE_ERROR,
1176             "Failed to load X509 certificate %s: %s" % (filename, err))
1177
1178   (errcode, msg) = \
1179     utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1180                                 constants.SSL_CERT_EXPIRATION_ERROR)
1181
1182   if msg:
1183     fnamemsg = "While verifying %s: %s" % (filename, msg)
1184   else:
1185     fnamemsg = None
1186
1187   if errcode is None:
1188     return (None, fnamemsg)
1189   elif errcode == utils.CERT_WARNING:
1190     return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1191   elif errcode == utils.CERT_ERROR:
1192     return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1193
1194   raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1195
1196
1197 def _GetAllHypervisorParameters(cluster, instances):
1198   """Compute the set of all hypervisor parameters.
1199
1200   @type cluster: L{objects.Cluster}
1201   @param cluster: the cluster object
1202   @param instances: list of L{objects.Instance}
1203   @param instances: additional instances from which to obtain parameters
1204   @rtype: list of (origin, hypervisor, parameters)
1205   @return: a list with all parameters found, indicating the hypervisor they
1206        apply to, and the origin (can be "cluster", "os X", or "instance Y")
1207
1208   """
1209   hvp_data = []
1210
1211   for hv_name in cluster.enabled_hypervisors:
1212     hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1213
1214   for os_name, os_hvp in cluster.os_hvp.items():
1215     for hv_name, hv_params in os_hvp.items():
1216       if hv_params:
1217         full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1218         hvp_data.append(("os %s" % os_name, hv_name, full_params))
1219
1220   # TODO: collapse identical parameter values in a single one
1221   for instance in instances:
1222     if instance.hvparams:
1223       hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1224                        cluster.FillHV(instance)))
1225
1226   return hvp_data
1227
1228
1229 class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1230   """Verifies the cluster config.
1231
1232   """
1233   REQ_BGL = False
1234
1235   def _VerifyHVP(self, hvp_data):
1236     """Verifies locally the syntax of the hypervisor parameters.
1237
1238     """
1239     for item, hv_name, hv_params in hvp_data:
1240       msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1241              (item, hv_name))
1242       try:
1243         hv_class = hypervisor.GetHypervisorClass(hv_name)
1244         utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1245         hv_class.CheckParameterSyntax(hv_params)
1246       except errors.GenericError, err:
1247         self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1248
1249   def ExpandNames(self):
1250     self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1251     self.share_locks = ShareAll()
1252
1253   def CheckPrereq(self):
1254     """Check prerequisites.
1255
1256     """
1257     # Retrieve all information
1258     self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1259     self.all_node_info = self.cfg.GetAllNodesInfo()
1260     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1261
1262   def Exec(self, feedback_fn):
1263     """Verify integrity of cluster, performing various test on nodes.
1264
1265     """
1266     self.bad = False
1267     self._feedback_fn = feedback_fn
1268
1269     feedback_fn("* Verifying cluster config")
1270
1271     for msg in self.cfg.VerifyConfig():
1272       self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1273
1274     feedback_fn("* Verifying cluster certificate files")
1275
1276     for cert_filename in pathutils.ALL_CERT_FILES:
1277       (errcode, msg) = _VerifyCertificate(cert_filename)
1278       self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1279
1280     feedback_fn("* Verifying hypervisor parameters")
1281
1282     self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1283                                                 self.all_inst_info.values()))
1284
1285     feedback_fn("* Verifying all nodes belong to an existing group")
1286
1287     # We do this verification here because, should this bogus circumstance
1288     # occur, it would never be caught by VerifyGroup, which only acts on
1289     # nodes/instances reachable from existing node groups.
1290
1291     dangling_nodes = set(node.name for node in self.all_node_info.values()
1292                          if node.group not in self.all_group_info)
1293
1294     dangling_instances = {}
1295     no_node_instances = []
1296
1297     for inst in self.all_inst_info.values():
1298       if inst.primary_node in dangling_nodes:
1299         dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1300       elif inst.primary_node not in self.all_node_info:
1301         no_node_instances.append(inst.name)
1302
1303     pretty_dangling = [
1304         "%s (%s)" %
1305         (node.name,
1306          utils.CommaJoin(dangling_instances.get(node.name,
1307                                                 ["no instances"])))
1308         for node in dangling_nodes]
1309
1310     self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1311                   None,
1312                   "the following nodes (and their instances) belong to a non"
1313                   " existing group: %s", utils.CommaJoin(pretty_dangling))
1314
1315     self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1316                   None,
1317                   "the following instances have a non-existing primary-node:"
1318                   " %s", utils.CommaJoin(no_node_instances))
1319
1320     return not self.bad
1321
1322
1323 class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1324   """Verifies the status of a node group.
1325
1326   """
1327   HPATH = "cluster-verify"
1328   HTYPE = constants.HTYPE_CLUSTER
1329   REQ_BGL = False
1330
1331   _HOOKS_INDENT_RE = re.compile("^", re.M)
1332
1333   class NodeImage(object):
1334     """A class representing the logical and physical status of a node.
1335
1336     @type name: string
1337     @ivar name: the node name to which this object refers
1338     @ivar volumes: a structure as returned from
1339         L{ganeti.backend.GetVolumeList} (runtime)
1340     @ivar instances: a list of running instances (runtime)
1341     @ivar pinst: list of configured primary instances (config)
1342     @ivar sinst: list of configured secondary instances (config)
1343     @ivar sbp: dictionary of {primary-node: list of instances} for all
1344         instances for which this node is secondary (config)
1345     @ivar mfree: free memory, as reported by hypervisor (runtime)
1346     @ivar dfree: free disk, as reported by the node (runtime)
1347     @ivar offline: the offline status (config)
1348     @type rpc_fail: boolean
1349     @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1350         not whether the individual keys were correct) (runtime)
1351     @type lvm_fail: boolean
1352     @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1353     @type hyp_fail: boolean
1354     @ivar hyp_fail: whether the RPC call didn't return the instance list
1355     @type ghost: boolean
1356     @ivar ghost: whether this is a known node or not (config)
1357     @type os_fail: boolean
1358     @ivar os_fail: whether the RPC call didn't return valid OS data
1359     @type oslist: list
1360     @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1361     @type vm_capable: boolean
1362     @ivar vm_capable: whether the node can host instances
1363     @type pv_min: float
1364     @ivar pv_min: size in MiB of the smallest PVs
1365     @type pv_max: float
1366     @ivar pv_max: size in MiB of the biggest PVs
1367
1368     """
1369     def __init__(self, offline=False, name=None, vm_capable=True):
1370       self.name = name
1371       self.volumes = {}
1372       self.instances = []
1373       self.pinst = []
1374       self.sinst = []
1375       self.sbp = {}
1376       self.mfree = 0
1377       self.dfree = 0
1378       self.offline = offline
1379       self.vm_capable = vm_capable
1380       self.rpc_fail = False
1381       self.lvm_fail = False
1382       self.hyp_fail = False
1383       self.ghost = False
1384       self.os_fail = False
1385       self.oslist = {}
1386       self.pv_min = None
1387       self.pv_max = None
1388
1389   def ExpandNames(self):
1390     # This raises errors.OpPrereqError on its own:
1391     self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1392
1393     # Get instances in node group; this is unsafe and needs verification later
1394     inst_names = \
1395       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1396
1397     self.needed_locks = {
1398       locking.LEVEL_INSTANCE: inst_names,
1399       locking.LEVEL_NODEGROUP: [self.group_uuid],
1400       locking.LEVEL_NODE: [],
1401
1402       # This opcode is run by watcher every five minutes and acquires all nodes
1403       # for a group. It doesn't run for a long time, so it's better to acquire
1404       # the node allocation lock as well.
1405       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1406       }
1407
1408     self.share_locks = ShareAll()
1409
1410   def DeclareLocks(self, level):
1411     if level == locking.LEVEL_NODE:
1412       # Get members of node group; this is unsafe and needs verification later
1413       nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1414
1415       all_inst_info = self.cfg.GetAllInstancesInfo()
1416
1417       # In Exec(), we warn about mirrored instances that have primary and
1418       # secondary living in separate node groups. To fully verify that
1419       # volumes for these instances are healthy, we will need to do an
1420       # extra call to their secondaries. We ensure here those nodes will
1421       # be locked.
1422       for inst in self.owned_locks(locking.LEVEL_INSTANCE):
1423         # Important: access only the instances whose lock is owned
1424         if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
1425           nodes.update(all_inst_info[inst].secondary_nodes)
1426
1427       self.needed_locks[locking.LEVEL_NODE] = nodes
1428
1429   def CheckPrereq(self):
1430     assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1431     self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1432
1433     group_nodes = set(self.group_info.members)
1434     group_instances = \
1435       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1436
1437     unlocked_nodes = \
1438         group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1439
1440     unlocked_instances = \
1441         group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))
1442
1443     if unlocked_nodes:
1444       raise errors.OpPrereqError("Missing lock for nodes: %s" %
1445                                  utils.CommaJoin(unlocked_nodes),
1446                                  errors.ECODE_STATE)
1447
1448     if unlocked_instances:
1449       raise errors.OpPrereqError("Missing lock for instances: %s" %
1450                                  utils.CommaJoin(unlocked_instances),
1451                                  errors.ECODE_STATE)
1452
1453     self.all_node_info = self.cfg.GetAllNodesInfo()
1454     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1455
1456     self.my_node_names = utils.NiceSort(group_nodes)
1457     self.my_inst_names = utils.NiceSort(group_instances)
1458
1459     self.my_node_info = dict((name, self.all_node_info[name])
1460                              for name in self.my_node_names)
1461
1462     self.my_inst_info = dict((name, self.all_inst_info[name])
1463                              for name in self.my_inst_names)
1464
1465     # We detect here the nodes that will need the extra RPC calls for verifying
1466     # split LV volumes; they should be locked.
1467     extra_lv_nodes = set()
1468
1469     for inst in self.my_inst_info.values():
1470       if inst.disk_template in constants.DTS_INT_MIRROR:
1471         for nname in inst.all_nodes:
1472           if self.all_node_info[nname].group != self.group_uuid:
1473             extra_lv_nodes.add(nname)
1474
1475     unlocked_lv_nodes = \
1476         extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1477
1478     if unlocked_lv_nodes:
1479       raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1480                                  utils.CommaJoin(unlocked_lv_nodes),
1481                                  errors.ECODE_STATE)
1482     self.extra_lv_nodes = list(extra_lv_nodes)
1483
1484   def _VerifyNode(self, ninfo, nresult):
1485     """Perform some basic validation on data returned from a node.
1486
1487       - check the result data structure is well formed and has all the
1488         mandatory fields
1489       - check ganeti version
1490
1491     @type ninfo: L{objects.Node}
1492     @param ninfo: the node to check
1493     @param nresult: the results from the node
1494     @rtype: boolean
1495     @return: whether overall this call was successful (and we can expect
1496          reasonable values in the respose)
1497
1498     """
1499     node = ninfo.name
1500     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1501
1502     # main result, nresult should be a non-empty dict
1503     test = not nresult or not isinstance(nresult, dict)
1504     _ErrorIf(test, constants.CV_ENODERPC, node,
1505                   "unable to verify node: no data returned")
1506     if test:
1507       return False
1508
1509     # compares ganeti version
1510     local_version = constants.PROTOCOL_VERSION
1511     remote_version = nresult.get("version", None)
1512     test = not (remote_version and
1513                 isinstance(remote_version, (list, tuple)) and
1514                 len(remote_version) == 2)
1515     _ErrorIf(test, constants.CV_ENODERPC, node,
1516              "connection to node returned invalid data")
1517     if test:
1518       return False
1519
1520     test = local_version != remote_version[0]
1521     _ErrorIf(test, constants.CV_ENODEVERSION, node,
1522              "incompatible protocol versions: master %s,"
1523              " node %s", local_version, remote_version[0])
1524     if test:
1525       return False
1526
1527     # node seems compatible, we can actually try to look into its results
1528
1529     # full package version
1530     self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1531                   constants.CV_ENODEVERSION, node,
1532                   "software version mismatch: master %s, node %s",
1533                   constants.RELEASE_VERSION, remote_version[1],
1534                   code=self.ETYPE_WARNING)
1535
1536     hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1537     if ninfo.vm_capable and isinstance(hyp_result, dict):
1538       for hv_name, hv_result in hyp_result.iteritems():
1539         test = hv_result is not None
1540         _ErrorIf(test, constants.CV_ENODEHV, node,
1541                  "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1542
1543     hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1544     if ninfo.vm_capable and isinstance(hvp_result, list):
1545       for item, hv_name, hv_result in hvp_result:
1546         _ErrorIf(True, constants.CV_ENODEHV, node,
1547                  "hypervisor %s parameter verify failure (source %s): %s",
1548                  hv_name, item, hv_result)
1549
1550     test = nresult.get(constants.NV_NODESETUP,
1551                        ["Missing NODESETUP results"])
1552     _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
1553              "; ".join(test))
1554
1555     return True
1556
1557   def _VerifyNodeTime(self, ninfo, nresult,
1558                       nvinfo_starttime, nvinfo_endtime):
1559     """Check the node time.
1560
1561     @type ninfo: L{objects.Node}
1562     @param ninfo: the node to check
1563     @param nresult: the remote results for the node
1564     @param nvinfo_starttime: the start time of the RPC call
1565     @param nvinfo_endtime: the end time of the RPC call
1566
1567     """
1568     node = ninfo.name
1569     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1570
1571     ntime = nresult.get(constants.NV_TIME, None)
1572     try:
1573       ntime_merged = utils.MergeTime(ntime)
1574     except (ValueError, TypeError):
1575       _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
1576       return
1577
1578     if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1579       ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1580     elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1581       ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1582     else:
1583       ntime_diff = None
1584
1585     _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
1586              "Node time diverges by at least %s from master node time",
1587              ntime_diff)
1588
1589   def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1590     """Check the node LVM results and update info for cross-node checks.
1591
1592     @type ninfo: L{objects.Node}
1593     @param ninfo: the node to check
1594     @param nresult: the remote results for the node
1595     @param vg_name: the configured VG name
1596     @type nimg: L{NodeImage}
1597     @param nimg: node image
1598
1599     """
1600     if vg_name is None:
1601       return
1602
1603     node = ninfo.name
1604     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1605
1606     # checks vg existence and size > 20G
1607     vglist = nresult.get(constants.NV_VGLIST, None)
1608     test = not vglist
1609     _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
1610     if not test:
1611       vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1612                                             constants.MIN_VG_SIZE)
1613       _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)
1614
1615     # Check PVs
1616     (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1617     for em in errmsgs:
1618       self._Error(constants.CV_ENODELVM, node, em)
1619     if pvminmax is not None:
1620       (nimg.pv_min, nimg.pv_max) = pvminmax
1621
1622   def _VerifyGroupLVM(self, node_image, vg_name):
1623     """Check cross-node consistency in LVM.
1624
1625     @type node_image: dict
1626     @param node_image: info about nodes, mapping from node to names to
1627       L{NodeImage} objects
1628     @param vg_name: the configured VG name
1629
1630     """
1631     if vg_name is None:
1632       return
1633
1634     # Only exlcusive storage needs this kind of checks
1635     if not self._exclusive_storage:
1636       return
1637
1638     # exclusive_storage wants all PVs to have the same size (approximately),
1639     # if the smallest and the biggest ones are okay, everything is fine.
1640     # pv_min is None iff pv_max is None
1641     vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1642     if not vals:
1643       return
1644     (pvmin, minnode) = min((ni.pv_min, ni.name) for ni in vals)
1645     (pvmax, maxnode) = max((ni.pv_max, ni.name) for ni in vals)
1646     bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1647     self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1648                   "PV sizes differ too much in the group; smallest (%s MB) is"
1649                   " on %s, biggest (%s MB) is on %s",
1650                   pvmin, minnode, pvmax, maxnode)
1651
1652   def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1653     """Check the node bridges.
1654
1655     @type ninfo: L{objects.Node}
1656     @param ninfo: the node to check
1657     @param nresult: the remote results for the node
1658     @param bridges: the expected list of bridges
1659
1660     """
1661     if not bridges:
1662       return
1663
1664     node = ninfo.name
1665     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1666
1667     missing = nresult.get(constants.NV_BRIDGES, None)
1668     test = not isinstance(missing, list)
1669     _ErrorIf(test, constants.CV_ENODENET, node,
1670              "did not return valid bridge information")
1671     if not test:
1672       _ErrorIf(bool(missing), constants.CV_ENODENET, node,
1673                "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1674
1675   def _VerifyNodeUserScripts(self, ninfo, nresult):
1676     """Check the results of user scripts presence and executability on the node
1677
1678     @type ninfo: L{objects.Node}
1679     @param ninfo: the node to check
1680     @param nresult: the remote results for the node
1681
1682     """
1683     node = ninfo.name
1684
1685     test = not constants.NV_USERSCRIPTS in nresult
1686     self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
1687                   "did not return user scripts information")
1688
1689     broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1690     if not test:
1691       self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
1692                     "user scripts not present or not executable: %s" %
1693                     utils.CommaJoin(sorted(broken_scripts)))
1694
1695   def _VerifyNodeNetwork(self, ninfo, nresult):
1696     """Check the node network connectivity results.
1697
1698     @type ninfo: L{objects.Node}
1699     @param ninfo: the node to check
1700     @param nresult: the remote results for the node
1701
1702     """
1703     node = ninfo.name
1704     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1705
1706     test = constants.NV_NODELIST not in nresult
1707     _ErrorIf(test, constants.CV_ENODESSH, node,
1708              "node hasn't returned node ssh connectivity data")
1709     if not test:
1710       if nresult[constants.NV_NODELIST]:
1711         for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1712           _ErrorIf(True, constants.CV_ENODESSH, node,
1713                    "ssh communication with node '%s': %s", a_node, a_msg)
1714
1715     test = constants.NV_NODENETTEST not in nresult
1716     _ErrorIf(test, constants.CV_ENODENET, node,
1717              "node hasn't returned node tcp connectivity data")
1718     if not test:
1719       if nresult[constants.NV_NODENETTEST]:
1720         nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1721         for anode in nlist:
1722           _ErrorIf(True, constants.CV_ENODENET, node,
1723                    "tcp communication with node '%s': %s",
1724                    anode, nresult[constants.NV_NODENETTEST][anode])
1725
1726     test = constants.NV_MASTERIP not in nresult
1727     _ErrorIf(test, constants.CV_ENODENET, node,
1728              "node hasn't returned node master IP reachability data")
1729     if not test:
1730       if not nresult[constants.NV_MASTERIP]:
1731         if node == self.master_node:
1732           msg = "the master node cannot reach the master IP (not configured?)"
1733         else:
1734           msg = "cannot reach the master IP"
1735         _ErrorIf(True, constants.CV_ENODENET, node, msg)
1736
1737   def _VerifyInstance(self, instance, inst_config, node_image,
1738                       diskstatus):
1739     """Verify an instance.
1740
1741     This function checks to see if the required block devices are
1742     available on the instance's node, and that the nodes are in the correct
1743     state.
1744
1745     """
1746     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1747     pnode = inst_config.primary_node
1748     pnode_img = node_image[pnode]
1749     groupinfo = self.cfg.GetAllNodeGroupsInfo()
1750
1751     node_vol_should = {}
1752     inst_config.MapLVsByNode(node_vol_should)
1753
1754     cluster = self.cfg.GetClusterInfo()
1755     ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1756                                                             self.group_info)
1757     err = ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
1758     _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err),
1759              code=self.ETYPE_WARNING)
1760
1761     for node in node_vol_should:
1762       n_img = node_image[node]
1763       if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1764         # ignore missing volumes on offline or broken nodes
1765         continue
1766       for volume in node_vol_should[node]:
1767         test = volume not in n_img.volumes
1768         _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
1769                  "volume %s missing on node %s", volume, node)
1770
1771     if inst_config.admin_state == constants.ADMINST_UP:
1772       test = instance not in pnode_img.instances and not pnode_img.offline
1773       _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
1774                "instance not running on its primary node %s",
1775                pnode)
1776       _ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
1777                "instance is marked as running and lives on offline node %s",
1778                pnode)
1779
1780     diskdata = [(nname, success, status, idx)
1781                 for (nname, disks) in diskstatus.items()
1782                 for idx, (success, status) in enumerate(disks)]
1783
1784     for nname, success, bdev_status, idx in diskdata:
1785       # the 'ghost node' construction in Exec() ensures that we have a
1786       # node here
1787       snode = node_image[nname]
1788       bad_snode = snode.ghost or snode.offline
1789       _ErrorIf(inst_config.disks_active and
1790                not success and not bad_snode,
1791                constants.CV_EINSTANCEFAULTYDISK, instance,
1792                "couldn't retrieve status for disk/%s on %s: %s",
1793                idx, nname, bdev_status)
1794       _ErrorIf((inst_config.disks_active and
1795                 success and bdev_status.ldisk_status == constants.LDS_FAULTY),
1796                constants.CV_EINSTANCEFAULTYDISK, instance,
1797                "disk/%s on %s is faulty", idx, nname)
1798
1799     _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1800              constants.CV_ENODERPC, pnode, "instance %s, connection to"
1801              " primary node failed", instance)
1802
1803     _ErrorIf(len(inst_config.secondary_nodes) > 1,
1804              constants.CV_EINSTANCELAYOUT,
1805              instance, "instance has multiple secondary nodes: %s",
1806              utils.CommaJoin(inst_config.secondary_nodes),
1807              code=self.ETYPE_WARNING)
1808
1809     if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
1810       # Disk template not compatible with exclusive_storage: no instance
1811       # node should have the flag set
1812       es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
1813                                                      inst_config.all_nodes)
1814       es_nodes = [n for (n, es) in es_flags.items()
1815                   if es]
1816       _ErrorIf(es_nodes, constants.CV_EINSTANCEUNSUITABLENODE, instance,
1817                "instance has template %s, which is not supported on nodes"
1818                " that have exclusive storage set: %s",
1819                inst_config.disk_template, utils.CommaJoin(es_nodes))
1820
1821     if inst_config.disk_template in constants.DTS_INT_MIRROR:
1822       instance_nodes = utils.NiceSort(inst_config.all_nodes)
1823       instance_groups = {}
1824
1825       for node in instance_nodes:
1826         instance_groups.setdefault(self.all_node_info[node].group,
1827                                    []).append(node)
1828
1829       pretty_list = [
1830         "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
1831         # Sort so that we always list the primary node first.
1832         for group, nodes in sorted(instance_groups.items(),
1833                                    key=lambda (_, nodes): pnode in nodes,
1834                                    reverse=True)]
1835
1836       self._ErrorIf(len(instance_groups) > 1,
1837                     constants.CV_EINSTANCESPLITGROUPS,
1838                     instance, "instance has primary and secondary nodes in"
1839                     " different groups: %s", utils.CommaJoin(pretty_list),
1840                     code=self.ETYPE_WARNING)
1841
1842     inst_nodes_offline = []
1843     for snode in inst_config.secondary_nodes:
1844       s_img = node_image[snode]
1845       _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
1846                snode, "instance %s, connection to secondary node failed",
1847                instance)
1848
1849       if s_img.offline:
1850         inst_nodes_offline.append(snode)
1851
1852     # warn that the instance lives on offline nodes
1853     _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
1854              "instance has offline secondary node(s) %s",
1855              utils.CommaJoin(inst_nodes_offline))
1856     # ... or ghost/non-vm_capable nodes
1857     for node in inst_config.all_nodes:
1858       _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
1859                instance, "instance lives on ghost node %s", node)
1860       _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
1861                instance, "instance lives on non-vm_capable node %s", node)
1862
1863   def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1864     """Verify if there are any unknown volumes in the cluster.
1865
1866     The .os, .swap and backup volumes are ignored. All other volumes are
1867     reported as unknown.
1868
1869     @type reserved: L{ganeti.utils.FieldSet}
1870     @param reserved: a FieldSet of reserved volume names
1871
1872     """
1873     for node, n_img in node_image.items():
1874       if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
1875           self.all_node_info[node].group != self.group_uuid):
1876         # skip non-healthy nodes
1877         continue
1878       for volume in n_img.volumes:
1879         test = ((node not in node_vol_should or
1880                 volume not in node_vol_should[node]) and
1881                 not reserved.Matches(volume))
1882         self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
1883                       "volume %s is unknown", volume)
1884
1885   def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1886     """Verify N+1 Memory Resilience.
1887
1888     Check that if one single node dies we can still start all the
1889     instances it was primary for.
1890
1891     """
1892     cluster_info = self.cfg.GetClusterInfo()
1893     for node, n_img in node_image.items():
1894       # This code checks that every node which is now listed as
1895       # secondary has enough memory to host all instances it is
1896       # supposed to should a single other node in the cluster fail.
1897       # FIXME: not ready for failover to an arbitrary node
1898       # FIXME: does not support file-backed instances
1899       # WARNING: we currently take into account down instances as well
1900       # as up ones, considering that even if they're down someone
1901       # might want to start them even in the event of a node failure.
1902       if n_img.offline or self.all_node_info[node].group != self.group_uuid:
1903         # we're skipping nodes marked offline and nodes in other groups from
1904         # the N+1 warning, since most likely we don't have good memory
1905         # infromation from them; we already list instances living on such
1906         # nodes, and that's enough warning
1907         continue
1908       #TODO(dynmem): also consider ballooning out other instances
1909       for prinode, instances in n_img.sbp.items():
1910         needed_mem = 0
1911         for instance in instances:
1912           bep = cluster_info.FillBE(instance_cfg[instance])
1913           if bep[constants.BE_AUTO_BALANCE]:
1914             needed_mem += bep[constants.BE_MINMEM]
1915         test = n_img.mfree < needed_mem
1916         self._ErrorIf(test, constants.CV_ENODEN1, node,
1917                       "not enough memory to accomodate instance failovers"
1918                       " should node %s fail (%dMiB needed, %dMiB available)",
1919                       prinode, needed_mem, n_img.mfree)
1920
1921   @classmethod
1922   def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
1923                    (files_all, files_opt, files_mc, files_vm)):
1924     """Verifies file checksums collected from all nodes.
1925
1926     @param errorif: Callback for reporting errors
1927     @param nodeinfo: List of L{objects.Node} objects
1928     @param master_node: Name of master node
1929     @param all_nvinfo: RPC results
1930
1931     """
1932     # Define functions determining which nodes to consider for a file
1933     files2nodefn = [
1934       (files_all, None),
1935       (files_mc, lambda node: (node.master_candidate or
1936                                node.name == master_node)),
1937       (files_vm, lambda node: node.vm_capable),
1938       ]
1939
1940     # Build mapping from filename to list of nodes which should have the file
1941     nodefiles = {}
1942     for (files, fn) in files2nodefn:
1943       if fn is None:
1944         filenodes = nodeinfo
1945       else:
1946         filenodes = filter(fn, nodeinfo)
1947       nodefiles.update((filename,
1948                         frozenset(map(operator.attrgetter("name"), filenodes)))
1949                        for filename in files)
1950
1951     assert set(nodefiles) == (files_all | files_mc | files_vm)
1952
1953     fileinfo = dict((filename, {}) for filename in nodefiles)
1954     ignore_nodes = set()
1955
1956     for node in nodeinfo:
1957       if node.offline:
1958         ignore_nodes.add(node.name)
1959         continue
1960
1961       nresult = all_nvinfo[node.name]
1962
1963       if nresult.fail_msg or not nresult.payload:
1964         node_files = None
1965       else:
1966         fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
1967         node_files = dict((vcluster.LocalizeVirtualPath(key), value)
1968                           for (key, value) in fingerprints.items())
1969         del fingerprints
1970
1971       test = not (node_files and isinstance(node_files, dict))
1972       errorif(test, constants.CV_ENODEFILECHECK, node.name,
1973               "Node did not return file checksum data")
1974       if test:
1975         ignore_nodes.add(node.name)
1976         continue
1977
1978       # Build per-checksum mapping from filename to nodes having it
1979       for (filename, checksum) in node_files.items():
1980         assert filename in nodefiles
1981         fileinfo[filename].setdefault(checksum, set()).add(node.name)
1982
1983     for (filename, checksums) in fileinfo.items():
1984       assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
1985
1986       # Nodes having the file
1987       with_file = frozenset(node_name
1988                             for nodes in fileinfo[filename].values()
1989                             for node_name in nodes) - ignore_nodes
1990
1991       expected_nodes = nodefiles[filename] - ignore_nodes
1992
1993       # Nodes missing file
1994       missing_file = expected_nodes - with_file
1995
1996       if filename in files_opt:
1997         # All or no nodes
1998         errorif(missing_file and missing_file != expected_nodes,
1999                 constants.CV_ECLUSTERFILECHECK, None,
2000                 "File %s is optional, but it must exist on all or no"
2001                 " nodes (not found on %s)",
2002                 filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2003       else:
2004         errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2005                 "File %s is missing from node(s) %s", filename,
2006                 utils.CommaJoin(utils.NiceSort(missing_file)))
2007
2008         # Warn if a node has a file it shouldn't
2009         unexpected = with_file - expected_nodes
2010         errorif(unexpected,
2011                 constants.CV_ECLUSTERFILECHECK, None,
2012                 "File %s should not exist on node(s) %s",
2013                 filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2014
2015       # See if there are multiple versions of the file
2016       test = len(checksums) > 1
2017       if test:
2018         variants = ["variant %s on %s" %
2019                     (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2020                     for (idx, (checksum, nodes)) in
2021                       enumerate(sorted(checksums.items()))]
2022       else:
2023         variants = []
2024
2025       errorif(test, constants.CV_ECLUSTERFILECHECK, None,
2026               "File %s found with %s different checksums (%s)",
2027               filename, len(checksums), "; ".join(variants))
2028
2029   def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2030                       drbd_map):
2031     """Verifies and the node DRBD status.
2032
2033     @type ninfo: L{objects.Node}
2034     @param ninfo: the node to check
2035     @param nresult: the remote results for the node
2036     @param instanceinfo: the dict of instances
2037     @param drbd_helper: the configured DRBD usermode helper
2038     @param drbd_map: the DRBD map as returned by
2039         L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2040
2041     """
2042     node = ninfo.name
2043     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2044
2045     if drbd_helper:
2046       helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2047       test = (helper_result is None)
2048       _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2049                "no drbd usermode helper returned")
2050       if helper_result:
2051         status, payload = helper_result
2052         test = not status
2053         _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2054                  "drbd usermode helper check unsuccessful: %s", payload)
2055         test = status and (payload != drbd_helper)
2056         _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2057                  "wrong drbd usermode helper: %s", payload)
2058
2059     # compute the DRBD minors
2060     node_drbd = {}
2061     for minor, instance in drbd_map[node].items():
2062       test = instance not in instanceinfo
2063       _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2064                "ghost instance '%s' in temporary DRBD map", instance)
2065         # ghost instance should not be running, but otherwise we
2066         # don't give double warnings (both ghost instance and
2067         # unallocated minor in use)
2068       if test:
2069         node_drbd[minor] = (instance, False)
2070       else:
2071         instance = instanceinfo[instance]
2072         node_drbd[minor] = (instance.name, instance.disks_active)
2073
2074     # and now check them
2075     used_minors = nresult.get(constants.NV_DRBDLIST, [])
2076     test = not isinstance(used_minors, (tuple, list))
2077     _ErrorIf(test, constants.CV_ENODEDRBD, node,
2078              "cannot parse drbd status file: %s", str(used_minors))
2079     if test:
2080       # we cannot check drbd status
2081       return
2082
2083     for minor, (iname, must_exist) in node_drbd.items():
2084       test = minor not in used_minors and must_exist
2085       _ErrorIf(test, constants.CV_ENODEDRBD, node,
2086                "drbd minor %d of instance %s is not active", minor, iname)
2087     for minor in used_minors:
2088       test = minor not in node_drbd
2089       _ErrorIf(test, constants.CV_ENODEDRBD, node,
2090                "unallocated drbd minor %d is in use", minor)
2091
2092   def _UpdateNodeOS(self, ninfo, nresult, nimg):
2093     """Builds the node OS structures.
2094
2095     @type ninfo: L{objects.Node}
2096     @param ninfo: the node to check
2097     @param nresult: the remote results for the node
2098     @param nimg: the node image object
2099
2100     """
2101     node = ninfo.name
2102     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2103
2104     remote_os = nresult.get(constants.NV_OSLIST, None)
2105     test = (not isinstance(remote_os, list) or
2106             not compat.all(isinstance(v, list) and len(v) == 7
2107                            for v in remote_os))
2108
2109     _ErrorIf(test, constants.CV_ENODEOS, node,
2110              "node hasn't returned valid OS data")
2111
2112     nimg.os_fail = test
2113
2114     if test:
2115       return
2116
2117     os_dict = {}
2118
2119     for (name, os_path, status, diagnose,
2120          variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2121
2122       if name not in os_dict:
2123         os_dict[name] = []
2124
2125       # parameters is a list of lists instead of list of tuples due to
2126       # JSON lacking a real tuple type, fix it:
2127       parameters = [tuple(v) for v in parameters]
2128       os_dict[name].append((os_path, status, diagnose,
2129                             set(variants), set(parameters), set(api_ver)))
2130
2131     nimg.oslist = os_dict
2132
2133   def _VerifyNodeOS(self, ninfo, nimg, base):
2134     """Verifies the node OS list.
2135
2136     @type ninfo: L{objects.Node}
2137     @param ninfo: the node to check
2138     @param nimg: the node image object
2139     @param base: the 'template' node we match against (e.g. from the master)
2140
2141     """
2142     node = ninfo.name
2143     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2144
2145     assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2146
2147     beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2148     for os_name, os_data in nimg.oslist.items():
2149       assert os_data, "Empty OS status for OS %s?!" % os_name
2150       f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2151       _ErrorIf(not f_status, constants.CV_ENODEOS, node,
2152                "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2153       _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
2154                "OS '%s' has multiple entries (first one shadows the rest): %s",
2155                os_name, utils.CommaJoin([v[0] for v in os_data]))
2156       # comparisons with the 'base' image
2157       test = os_name not in base.oslist
2158       _ErrorIf(test, constants.CV_ENODEOS, node,
2159                "Extra OS %s not present on reference node (%s)",
2160                os_name, base.name)
2161       if test:
2162         continue
2163       assert base.oslist[os_name], "Base node has empty OS status?"
2164       _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2165       if not b_status:
2166         # base OS is invalid, skipping
2167         continue
2168       for kind, a, b in [("API version", f_api, b_api),
2169                          ("variants list", f_var, b_var),
2170                          ("parameters", beautify_params(f_param),
2171                           beautify_params(b_param))]:
2172         _ErrorIf(a != b, constants.CV_ENODEOS, node,
2173                  "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2174                  kind, os_name, base.name,
2175                  utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2176
2177     # check any missing OSes
2178     missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2179     _ErrorIf(missing, constants.CV_ENODEOS, node,
2180              "OSes present on reference node %s but missing on this node: %s",
2181              base.name, utils.CommaJoin(missing))
2182
2183   def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
2184     """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2185
2186     @type ninfo: L{objects.Node}
2187     @param ninfo: the node to check
2188     @param nresult: the remote results for the node
2189     @type is_master: bool
2190     @param is_master: Whether node is the master node
2191
2192     """
2193     node = ninfo.name
2194
2195     if (is_master and
2196         (constants.ENABLE_FILE_STORAGE or
2197          constants.ENABLE_SHARED_FILE_STORAGE)):
2198       try:
2199         fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
2200       except KeyError:
2201         # This should never happen
2202         self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
2203                       "Node did not return forbidden file storage paths")
2204       else:
2205         self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
2206                       "Found forbidden file storage paths: %s",
2207                       utils.CommaJoin(fspaths))
2208     else:
2209       self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
2210                     constants.CV_ENODEFILESTORAGEPATHS, node,
2211                     "Node should not have returned forbidden file storage"
2212                     " paths")
2213
2214   def _VerifyOob(self, ninfo, nresult):
2215     """Verifies out of band functionality of a node.
2216
2217     @type ninfo: L{objects.Node}
2218     @param ninfo: the node to check
2219     @param nresult: the remote results for the node
2220
2221     """
2222     node = ninfo.name
2223     # We just have to verify the paths on master and/or master candidates
2224     # as the oob helper is invoked on the master
2225     if ((ninfo.master_candidate or ninfo.master_capable) and
2226         constants.NV_OOB_PATHS in nresult):
2227       for path_result in nresult[constants.NV_OOB_PATHS]:
2228         self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2229
2230   def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2231     """Verifies and updates the node volume data.
2232
2233     This function will update a L{NodeImage}'s internal structures
2234     with data from the remote call.
2235
2236     @type ninfo: L{objects.Node}
2237     @param ninfo: the node to check
2238     @param nresult: the remote results for the node
2239     @param nimg: the node image object
2240     @param vg_name: the configured VG name
2241
2242     """
2243     node = ninfo.name
2244     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2245
2246     nimg.lvm_fail = True
2247     lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2248     if vg_name is None:
2249       pass
2250     elif isinstance(lvdata, basestring):
2251       _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
2252                utils.SafeEncode(lvdata))
2253     elif not isinstance(lvdata, dict):
2254       _ErrorIf(True, constants.CV_ENODELVM, node,
2255                "rpc call to node failed (lvlist)")
2256     else:
2257       nimg.volumes = lvdata
2258       nimg.lvm_fail = False
2259
2260   def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2261     """Verifies and updates the node instance list.
2262
2263     If the listing was successful, then updates this node's instance
2264     list. Otherwise, it marks the RPC call as failed for the instance
2265     list key.
2266
2267     @type ninfo: L{objects.Node}
2268     @param ninfo: the node to check
2269     @param nresult: the remote results for the node
2270     @param nimg: the node image object
2271
2272     """
2273     idata = nresult.get(constants.NV_INSTANCELIST, None)
2274     test = not isinstance(idata, list)
2275     self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2276                   "rpc call to node failed (instancelist): %s",
2277                   utils.SafeEncode(str(idata)))
2278     if test:
2279       nimg.hyp_fail = True
2280     else:
2281       nimg.instances = idata
2282
2283   def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2284     """Verifies and computes a node information map
2285
2286     @type ninfo: L{objects.Node}
2287     @param ninfo: the node to check
2288     @param nresult: the remote results for the node
2289     @param nimg: the node image object
2290     @param vg_name: the configured VG name
2291
2292     """
2293     node = ninfo.name
2294     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2295
2296     # try to read free memory (from the hypervisor)
2297     hv_info = nresult.get(constants.NV_HVINFO, None)
2298     test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2299     _ErrorIf(test, constants.CV_ENODEHV, node,
2300              "rpc call to node failed (hvinfo)")
2301     if not test:
2302       try:
2303         nimg.mfree = int(hv_info["memory_free"])
2304       except (ValueError, TypeError):
2305         _ErrorIf(True, constants.CV_ENODERPC, node,
2306                  "node returned invalid nodeinfo, check hypervisor")
2307
2308     # FIXME: devise a free space model for file based instances as well
2309     if vg_name is not None:
2310       test = (constants.NV_VGLIST not in nresult or
2311               vg_name not in nresult[constants.NV_VGLIST])
2312       _ErrorIf(test, constants.CV_ENODELVM, node,
2313                "node didn't return data for the volume group '%s'"
2314                " - it is either missing or broken", vg_name)
2315       if not test:
2316         try:
2317           nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2318         except (ValueError, TypeError):
2319           _ErrorIf(True, constants.CV_ENODERPC, node,
2320                    "node returned invalid LVM info, check LVM status")
2321
2322   def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2323     """Gets per-disk status information for all instances.
2324
2325     @type nodelist: list of strings
2326     @param nodelist: Node names
2327     @type node_image: dict of (name, L{objects.Node})
2328     @param node_image: Node objects
2329     @type instanceinfo: dict of (name, L{objects.Instance})
2330     @param instanceinfo: Instance objects
2331     @rtype: {instance: {node: [(succes, payload)]}}
2332     @return: a dictionary of per-instance dictionaries with nodes as
2333         keys and disk information as values; the disk information is a
2334         list of tuples (success, payload)
2335
2336     """
2337     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2338
2339     node_disks = {}
2340     node_disks_devonly = {}
2341     diskless_instances = set()
2342     diskless = constants.DT_DISKLESS
2343
2344     for nname in nodelist:
2345       node_instances = list(itertools.chain(node_image[nname].pinst,
2346                                             node_image[nname].sinst))
2347       diskless_instances.update(inst for inst in node_instances
2348                                 if instanceinfo[inst].disk_template == diskless)
2349       disks = [(inst, disk)
2350                for inst in node_instances
2351                for disk in instanceinfo[inst].disks]
2352
2353       if not disks:
2354         # No need to collect data
2355         continue
2356
2357       node_disks[nname] = disks
2358
2359       # _AnnotateDiskParams makes already copies of the disks
2360       devonly = []
2361       for (inst, dev) in disks:
2362         (anno_disk,) = AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
2363         self.cfg.SetDiskID(anno_disk, nname)
2364         devonly.append(anno_disk)
2365
2366       node_disks_devonly[nname] = devonly
2367
2368     assert len(node_disks) == len(node_disks_devonly)
2369
2370     # Collect data from all nodes with disks
2371     result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2372                                                           node_disks_devonly)
2373
2374     assert len(result) == len(node_disks)
2375
2376     instdisk = {}
2377
2378     for (nname, nres) in result.items():
2379       disks = node_disks[nname]
2380
2381       if nres.offline:
2382         # No data from this node
2383         data = len(disks) * [(False, "node offline")]
2384       else:
2385         msg = nres.fail_msg
2386         _ErrorIf(msg, constants.CV_ENODERPC, nname,
2387                  "while getting disk information: %s", msg)
2388         if msg:
2389           # No data from this node
2390           data = len(disks) * [(False, msg)]
2391         else:
2392           data = []
2393           for idx, i in enumerate(nres.payload):
2394             if isinstance(i, (tuple, list)) and len(i) == 2:
2395               data.append(i)
2396             else:
2397               logging.warning("Invalid result from node %s, entry %d: %s",
2398                               nname, idx, i)
2399               data.append((False, "Invalid result from the remote node"))
2400
2401       for ((inst, _), status) in zip(disks, data):
2402         instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2403
2404     # Add empty entries for diskless instances.
2405     for inst in diskless_instances:
2406       assert inst not in instdisk
2407       instdisk[inst] = {}
2408
2409     assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2410                       len(nnames) <= len(instanceinfo[inst].all_nodes) and
2411                       compat.all(isinstance(s, (tuple, list)) and
2412                                  len(s) == 2 for s in statuses)
2413                       for inst, nnames in instdisk.items()
2414                       for nname, statuses in nnames.items())
2415     if __debug__:
2416       instdisk_keys = set(instdisk)
2417       instanceinfo_keys = set(instanceinfo)
2418       assert instdisk_keys == instanceinfo_keys, \
2419         ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2420          (instdisk_keys, instanceinfo_keys))
2421
2422     return instdisk
2423
2424   @staticmethod
2425   def _SshNodeSelector(group_uuid, all_nodes):
2426     """Create endless iterators for all potential SSH check hosts.
2427
2428     """
2429     nodes = [node for node in all_nodes
2430              if (node.group != group_uuid and
2431                  not node.offline)]
2432     keyfunc = operator.attrgetter("group")
2433
2434     return map(itertools.cycle,
2435                [sorted(map(operator.attrgetter("name"), names))
2436                 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2437                                                   keyfunc)])
2438
2439   @classmethod
2440   def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2441     """Choose which nodes should talk to which other nodes.
2442
2443     We will make nodes contact all nodes in their group, and one node from
2444     every other group.
2445
2446     @warning: This algorithm has a known issue if one node group is much
2447       smaller than others (e.g. just one node). In such a case all other
2448       nodes will talk to the single node.
2449
2450     """
2451     online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2452     sel = cls._SshNodeSelector(group_uuid, all_nodes)
2453
2454     return (online_nodes,
2455             dict((name, sorted([i.next() for i in sel]))
2456                  for name in online_nodes))
2457
2458   def BuildHooksEnv(self):
2459     """Build hooks env.
2460
2461     Cluster-Verify hooks just ran in the post phase and their failure makes
2462     the output be logged in the verify output and the verification to fail.
2463
2464     """
2465     env = {
2466       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2467       }
2468
2469     env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2470                for node in self.my_node_info.values())
2471
2472     return env
2473
2474   def BuildHooksNodes(self):
2475     """Build hooks nodes.
2476
2477     """
2478     return ([], self.my_node_names)
2479
2480   def Exec(self, feedback_fn):
2481     """Verify integrity of the node group, performing various test on nodes.
2482
2483     """
2484     # This method has too many local variables. pylint: disable=R0914
2485     feedback_fn("* Verifying group '%s'" % self.group_info.name)
2486
2487     if not self.my_node_names:
2488       # empty node group
2489       feedback_fn("* Empty node group, skipping verification")
2490       return True
2491
2492     self.bad = False
2493     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2494     verbose = self.op.verbose
2495     self._feedback_fn = feedback_fn
2496
2497     vg_name = self.cfg.GetVGName()
2498     drbd_helper = self.cfg.GetDRBDHelper()
2499     cluster = self.cfg.GetClusterInfo()
2500     hypervisors = cluster.enabled_hypervisors
2501     node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2502
2503     i_non_redundant = [] # Non redundant instances
2504     i_non_a_balanced = [] # Non auto-balanced instances
2505     i_offline = 0 # Count of offline instances
2506     n_offline = 0 # Count of offline nodes
2507     n_drained = 0 # Count of nodes being drained
2508     node_vol_should = {}
2509
2510     # FIXME: verify OS list
2511
2512     # File verification
2513     filemap = ComputeAncillaryFiles(cluster, False)
2514
2515     # do local checksums
2516     master_node = self.master_node = self.cfg.GetMasterNode()
2517     master_ip = self.cfg.GetMasterIP()
2518
2519     feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2520
2521     user_scripts = []
2522     if self.cfg.GetUseExternalMipScript():
2523       user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2524
2525     node_verify_param = {
2526       constants.NV_FILELIST:
2527         map(vcluster.MakeVirtualPath,
2528             utils.UniqueSequence(filename
2529                                  for files in filemap
2530                                  for filename in files)),
2531       constants.NV_NODELIST:
2532         self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2533                                   self.all_node_info.values()),
2534       constants.NV_HYPERVISOR: hypervisors,
2535       constants.NV_HVPARAMS:
2536         _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2537       constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2538                                  for node in node_data_list
2539                                  if not node.offline],
2540       constants.NV_INSTANCELIST: hypervisors,
2541       constants.NV_VERSION: None,
2542       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2543       constants.NV_NODESETUP: None,
2544       constants.NV_TIME: None,
2545       constants.NV_MASTERIP: (master_node, master_ip),
2546       constants.NV_OSLIST: None,
2547       constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2548       constants.NV_USERSCRIPTS: user_scripts,
2549       }
2550
2551     if vg_name is not None:
2552       node_verify_param[constants.NV_VGLIST] = None
2553       node_verify_param[constants.NV_LVLIST] = vg_name
2554       node_verify_param[constants.NV_PVLIST] = [vg_name]
2555
2556     if drbd_helper:
2557       node_verify_param[constants.NV_DRBDLIST] = None
2558       node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2559
2560     if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
2561       # Load file storage paths only from master node
2562       node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node
2563
2564     # bridge checks
2565     # FIXME: this needs to be changed per node-group, not cluster-wide
2566     bridges = set()
2567     default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2568     if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2569       bridges.add(default_nicpp[constants.NIC_LINK])
2570     for instance in self.my_inst_info.values():
2571       for nic in instance.nics:
2572         full_nic = cluster.SimpleFillNIC(nic.nicparams)
2573         if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2574           bridges.add(full_nic[constants.NIC_LINK])
2575
2576     if bridges:
2577       node_verify_param[constants.NV_BRIDGES] = list(bridges)
2578
2579     # Build our expected cluster state
2580     node_image = dict((node.name, self.NodeImage(offline=node.offline,
2581                                                  name=node.name,
2582                                                  vm_capable=node.vm_capable))
2583                       for node in node_data_list)
2584
2585     # Gather OOB paths
2586     oob_paths = []
2587     for node in self.all_node_info.values():
2588       path = SupportsOob(self.cfg, node)
2589       if path and path not in oob_paths:
2590         oob_paths.append(path)
2591
2592     if oob_paths:
2593       node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2594
2595     for instance in self.my_inst_names:
2596       inst_config = self.my_inst_info[instance]
2597       if inst_config.admin_state == constants.ADMINST_OFFLINE:
2598         i_offline += 1
2599
2600       for nname in inst_config.all_nodes:
2601         if nname not in node_image:
2602           gnode = self.NodeImage(name=nname)
2603           gnode.ghost = (nname not in self.all_node_info)
2604           node_image[nname] = gnode
2605
2606       inst_config.MapLVsByNode(node_vol_should)
2607
2608       pnode = inst_config.primary_node
2609       node_image[pnode].pinst.append(instance)
2610
2611       for snode in inst_config.secondary_nodes:
2612         nimg = node_image[snode]
2613         nimg.sinst.append(instance)
2614         if pnode not in nimg.sbp:
2615           nimg.sbp[pnode] = []
2616         nimg.sbp[pnode].append(instance)
2617
2618     es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg, self.my_node_names)
2619     # The value of exclusive_storage should be the same across the group, so if
2620     # it's True for at least a node, we act as if it were set for all the nodes
2621     self._exclusive_storage = compat.any(es_flags.values())
2622     if self._exclusive_storage:
2623       node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2624
2625     # At this point, we have the in-memory data structures complete,
2626     # except for the runtime information, which we'll gather next
2627
2628     # Due to the way our RPC system works, exact response times cannot be
2629     # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2630     # time before and after executing the request, we can at least have a time
2631     # window.
2632     nvinfo_starttime = time.time()
2633     all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2634                                            node_verify_param,
2635                                            self.cfg.GetClusterName())
2636     nvinfo_endtime = time.time()
2637
2638     if self.extra_lv_nodes and vg_name is not None:
2639       extra_lv_nvinfo = \
2640           self.rpc.call_node_verify(self.extra_lv_nodes,
2641                                     {constants.NV_LVLIST: vg_name},
2642                                     self.cfg.GetClusterName())
2643     else:
2644       extra_lv_nvinfo = {}
2645
2646     all_drbd_map = self.cfg.ComputeDRBDMap()
2647
2648     feedback_fn("* Gathering disk information (%s nodes)" %
2649                 len(self.my_node_names))
2650     instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2651                                      self.my_inst_info)
2652
2653     feedback_fn("* Verifying configuration file consistency")
2654
2655     # If not all nodes are being checked, we need to make sure the master node
2656     # and a non-checked vm_capable node are in the list.
2657     absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2658     if absent_nodes:
2659       vf_nvinfo = all_nvinfo.copy()
2660       vf_node_info = list(self.my_node_info.values())
2661       additional_nodes = []
2662       if master_node not in self.my_node_info:
2663         additional_nodes.append(master_node)
2664         vf_node_info.append(self.all_node_info[master_node])
2665       # Add the first vm_capable node we find which is not included,
2666       # excluding the master node (which we already have)
2667       for node in absent_nodes:
2668         nodeinfo = self.all_node_info[node]
2669         if (nodeinfo.vm_capable and not nodeinfo.offline and
2670             node != master_node):
2671           additional_nodes.append(node)
2672           vf_node_info.append(self.all_node_info[node])
2673           break
2674       key = constants.NV_FILELIST
2675       vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2676                                                  {key: node_verify_param[key]},
2677                                                  self.cfg.GetClusterName()))
2678     else:
2679       vf_nvinfo = all_nvinfo
2680       vf_node_info = self.my_node_info.values()
2681
2682     self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2683
2684     feedback_fn("* Verifying node status")
2685
2686     refos_img = None
2687
2688     for node_i in node_data_list:
2689       node = node_i.name
2690       nimg = node_image[node]
2691
2692       if node_i.offline:
2693         if verbose:
2694           feedback_fn("* Skipping offline node %s" % (node,))
2695         n_offline += 1
2696         continue
2697
2698       if node == master_node:
2699         ntype = "master"
2700       elif node_i.master_candidate:
2701         ntype = "master candidate"
2702       elif node_i.drained:
2703         ntype = "drained"
2704         n_drained += 1
2705       else:
2706         ntype = "regular"
2707       if verbose:
2708         feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2709
2710       msg = all_nvinfo[node].fail_msg
2711       _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
2712                msg)
2713       if msg:
2714         nimg.rpc_fail = True
2715         continue
2716
2717       nresult = all_nvinfo[node].payload
2718
2719       nimg.call_ok = self._VerifyNode(node_i, nresult)
2720       self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2721       self._VerifyNodeNetwork(node_i, nresult)
2722       self._VerifyNodeUserScripts(node_i, nresult)
2723       self._VerifyOob(node_i, nresult)
2724       self._VerifyFileStoragePaths(node_i, nresult,
2725                                    node == master_node)
2726
2727       if nimg.vm_capable:
2728         self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
2729         self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2730                              all_drbd_map)
2731
2732         self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2733         self._UpdateNodeInstances(node_i, nresult, nimg)
2734         self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2735         self._UpdateNodeOS(node_i, nresult, nimg)
2736
2737         if not nimg.os_fail:
2738           if refos_img is None:
2739             refos_img = nimg
2740           self._VerifyNodeOS(node_i, nimg, refos_img)
2741         self._VerifyNodeBridges(node_i, nresult, bridges)
2742
2743         # Check whether all running instancies are primary for the node. (This
2744         # can no longer be done from _VerifyInstance below, since some of the
2745         # wrong instances could be from other node groups.)
2746         non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2747
2748         for inst in non_primary_inst:
2749           test = inst in self.all_inst_info
2750           _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
2751                    "instance should not run on node %s", node_i.name)
2752           _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
2753                    "node is running unknown instance %s", inst)
2754
2755     self._VerifyGroupLVM(node_image, vg_name)
2756
2757     for node, result in extra_lv_nvinfo.items():
2758       self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
2759                               node_image[node], vg_name)
2760
2761     feedback_fn("* Verifying instance status")
2762     for instance in self.my_inst_names:
2763       if verbose:
2764         feedback_fn("* Verifying instance %s" % instance)
2765       inst_config = self.my_inst_info[instance]
2766       self._VerifyInstance(instance, inst_config, node_image,
2767                            instdisk[instance])
2768
2769       # If the instance is non-redundant we cannot survive losing its primary
2770       # node, so we are not N+1 compliant.
2771       if inst_config.disk_template not in constants.DTS_MIRRORED:
2772         i_non_redundant.append(instance)
2773
2774       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2775         i_non_a_balanced.append(instance)
2776
2777     feedback_fn("* Verifying orphan volumes")
2778     reserved = utils.FieldSet(*cluster.reserved_lvs)
2779
2780     # We will get spurious "unknown volume" warnings if any node of this group
2781     # is secondary for an instance whose primary is in another group. To avoid
2782     # them, we find these instances and add their volumes to node_vol_should.
2783     for inst in self.all_inst_info.values():
2784       for secondary in inst.secondary_nodes:
2785         if (secondary in self.my_node_info
2786             and inst.name not in self.my_inst_info):
2787           inst.MapLVsByNode(node_vol_should)
2788           break
2789
2790     self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2791
2792     if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2793       feedback_fn("* Verifying N+1 Memory redundancy")
2794       self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2795
2796     feedback_fn("* Other Notes")
2797     if i_non_redundant:
2798       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2799                   % len(i_non_redundant))
2800
2801     if i_non_a_balanced:
2802       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2803                   % len(i_non_a_balanced))
2804
2805     if i_offline:
2806       feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
2807
2808     if n_offline:
2809       feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2810
2811     if n_drained:
2812       feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2813
2814     return not self.bad
2815
2816   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2817     """Analyze the post-hooks' result
2818
2819     This method analyses the hook result, handles it, and sends some
2820     nicely-formatted feedback back to the user.
2821
2822     @param phase: one of L{constants.HOOKS_PHASE_POST} or
2823         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2824     @param hooks_results: the results of the multi-node hooks rpc call
2825     @param feedback_fn: function used send feedback back to the caller
2826     @param lu_result: previous Exec result
2827     @return: the new Exec result, based on the previous result
2828         and hook results
2829
2830     """
2831     # We only really run POST phase hooks, only for non-empty groups,
2832     # and are only interested in their results
2833     if not self.my_node_names:
2834       # empty node group
2835       pass
2836     elif phase == constants.HOOKS_PHASE_POST:
2837       # Used to change hooks' output to proper indentation
2838       feedback_fn("* Hooks Results")
2839       assert hooks_results, "invalid result from hooks"
2840
2841       for node_name in hooks_results:
2842         res = hooks_results[node_name]
2843         msg = res.fail_msg
2844         test = msg and not res.offline
2845         self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2846                       "Communication failure in hooks execution: %s", msg)
2847         if res.offline or msg:
2848           # No need to investigate payload if node is offline or gave
2849           # an error.
2850           continue
2851         for script, hkr, output in res.payload:
2852           test = hkr == constants.HKR_FAIL
2853           self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2854                         "Script %s failed, output:", script)
2855           if test:
2856             output = self._HOOKS_INDENT_RE.sub("      ", output)
2857             feedback_fn("%s" % output)
2858             lu_result = False
2859
2860     return lu_result
2861
2862
2863 class LUClusterVerifyDisks(NoHooksLU):
2864   """Verifies the cluster disks status.
2865
2866   """
2867   REQ_BGL = False
2868
2869   def ExpandNames(self):
2870     self.share_locks = ShareAll()
2871     self.needed_locks = {
2872       locking.LEVEL_NODEGROUP: locking.ALL_SET,
2873       }
2874
2875   def Exec(self, feedback_fn):
2876     group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
2877
2878     # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
2879     return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
2880                            for group in group_names])