Fix reference to NodeInfo.name
[ganeti-local] / lib / cmdlib / cluster.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with the cluster."""
23
24 import OpenSSL
25
26 import copy
27 import itertools
28 import logging
29 import operator
30 import os
31 import re
32 import time
33
34 from ganeti import compat
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import masterd
40 from ganeti import netutils
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 from ganeti import query
45 from ganeti import rpc
46 from ganeti import runtime
47 from ganeti import ssh
48 from ganeti import uidpool
49 from ganeti import utils
50 from ganeti import vcluster
51
52 from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53   ResultWithJobs
54 from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55   ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56   GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57   GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58   CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59   ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob
60
61 import ganeti.masterd.instance
62
63
64 class LUClusterActivateMasterIp(NoHooksLU):
65   """Activate the master IP on the master node.
66
67   """
68   def Exec(self, feedback_fn):
69     """Activate the master IP.
70
71     """
72     master_params = self.cfg.GetMasterNetworkParameters()
73     ems = self.cfg.GetUseExternalMipScript()
74     result = self.rpc.call_node_activate_master_ip(master_params.uuid,
75                                                    master_params, ems)
76     result.Raise("Could not activate the master IP")
77
78
79 class LUClusterDeactivateMasterIp(NoHooksLU):
80   """Deactivate the master IP on the master node.
81
82   """
83   def Exec(self, feedback_fn):
84     """Deactivate the master IP.
85
86     """
87     master_params = self.cfg.GetMasterNetworkParameters()
88     ems = self.cfg.GetUseExternalMipScript()
89     result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
90                                                      master_params, ems)
91     result.Raise("Could not deactivate the master IP")
92
93
94 class LUClusterConfigQuery(NoHooksLU):
95   """Return configuration values.
96
97   """
98   REQ_BGL = False
99
100   def CheckArguments(self):
101     self.cq = ClusterQuery(None, self.op.output_fields, False)
102
103   def ExpandNames(self):
104     self.cq.ExpandNames(self)
105
106   def DeclareLocks(self, level):
107     self.cq.DeclareLocks(self, level)
108
109   def Exec(self, feedback_fn):
110     result = self.cq.OldStyleQuery(self)
111
112     assert len(result) == 1
113
114     return result[0]
115
116
117 class LUClusterDestroy(LogicalUnit):
118   """Logical unit for destroying the cluster.
119
120   """
121   HPATH = "cluster-destroy"
122   HTYPE = constants.HTYPE_CLUSTER
123
124   def BuildHooksEnv(self):
125     """Build hooks env.
126
127     """
128     return {
129       "OP_TARGET": self.cfg.GetClusterName(),
130       }
131
132   def BuildHooksNodes(self):
133     """Build hooks nodes.
134
135     """
136     return ([], [])
137
138   def CheckPrereq(self):
139     """Check prerequisites.
140
141     This checks whether the cluster is empty.
142
143     Any errors are signaled by raising errors.OpPrereqError.
144
145     """
146     master = self.cfg.GetMasterNode()
147
148     nodelist = self.cfg.GetNodeList()
149     if len(nodelist) != 1 or nodelist[0] != master:
150       raise errors.OpPrereqError("There are still %d node(s) in"
151                                  " this cluster." % (len(nodelist) - 1),
152                                  errors.ECODE_INVAL)
153     instancelist = self.cfg.GetInstanceList()
154     if instancelist:
155       raise errors.OpPrereqError("There are still %d instance(s) in"
156                                  " this cluster." % len(instancelist),
157                                  errors.ECODE_INVAL)
158
159   def Exec(self, feedback_fn):
160     """Destroys the cluster.
161
162     """
163     master_params = self.cfg.GetMasterNetworkParameters()
164
165     # Run post hooks on master node before it's removed
166     RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
167
168     ems = self.cfg.GetUseExternalMipScript()
169     result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
170                                                      master_params, ems)
171     result.Warn("Error disabling the master IP address", self.LogWarning)
172     return master_params.uuid
173
174
175 class LUClusterPostInit(LogicalUnit):
176   """Logical unit for running hooks after cluster initialization.
177
178   """
179   HPATH = "cluster-init"
180   HTYPE = constants.HTYPE_CLUSTER
181
182   def BuildHooksEnv(self):
183     """Build hooks env.
184
185     """
186     return {
187       "OP_TARGET": self.cfg.GetClusterName(),
188       }
189
190   def BuildHooksNodes(self):
191     """Build hooks nodes.
192
193     """
194     return ([], [self.cfg.GetMasterNode()])
195
196   def Exec(self, feedback_fn):
197     """Nothing to do.
198
199     """
200     return True
201
202
203 class ClusterQuery(QueryBase):
204   FIELDS = query.CLUSTER_FIELDS
205
206   #: Do not sort (there is only one item)
207   SORT_FIELD = None
208
209   def ExpandNames(self, lu):
210     lu.needed_locks = {}
211
212     # The following variables interact with _QueryBase._GetNames
213     self.wanted = locking.ALL_SET
214     self.do_locking = self.use_locking
215
216     if self.do_locking:
217       raise errors.OpPrereqError("Can not use locking for cluster queries",
218                                  errors.ECODE_INVAL)
219
220   def DeclareLocks(self, lu, level):
221     pass
222
223   def _GetQueryData(self, lu):
224     """Computes the list of nodes and their attributes.
225
226     """
227     # Locking is not used
228     assert not (compat.any(lu.glm.is_owned(level)
229                            for level in locking.LEVELS
230                            if level != locking.LEVEL_CLUSTER) or
231                 self.do_locking or self.use_locking)
232
233     if query.CQ_CONFIG in self.requested_data:
234       cluster = lu.cfg.GetClusterInfo()
235       nodes = lu.cfg.GetAllNodesInfo()
236     else:
237       cluster = NotImplemented
238       nodes = NotImplemented
239
240     if query.CQ_QUEUE_DRAINED in self.requested_data:
241       drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
242     else:
243       drain_flag = NotImplemented
244
245     if query.CQ_WATCHER_PAUSE in self.requested_data:
246       master_node_uuid = lu.cfg.GetMasterNode()
247
248       result = lu.rpc.call_get_watcher_pause(master_node_uuid)
249       result.Raise("Can't retrieve watcher pause from master node '%s'" %
250                    lu.cfg.GetMasterNodeName())
251
252       watcher_pause = result.payload
253     else:
254       watcher_pause = NotImplemented
255
256     return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
257
258
259 class LUClusterQuery(NoHooksLU):
260   """Query cluster configuration.
261
262   """
263   REQ_BGL = False
264
265   def ExpandNames(self):
266     self.needed_locks = {}
267
268   def Exec(self, feedback_fn):
269     """Return cluster config.
270
271     """
272     cluster = self.cfg.GetClusterInfo()
273     os_hvp = {}
274
275     # Filter just for enabled hypervisors
276     for os_name, hv_dict in cluster.os_hvp.items():
277       os_hvp[os_name] = {}
278       for hv_name, hv_params in hv_dict.items():
279         if hv_name in cluster.enabled_hypervisors:
280           os_hvp[os_name][hv_name] = hv_params
281
282     # Convert ip_family to ip_version
283     primary_ip_version = constants.IP4_VERSION
284     if cluster.primary_ip_family == netutils.IP6Address.family:
285       primary_ip_version = constants.IP6_VERSION
286
287     result = {
288       "software_version": constants.RELEASE_VERSION,
289       "protocol_version": constants.PROTOCOL_VERSION,
290       "config_version": constants.CONFIG_VERSION,
291       "os_api_version": max(constants.OS_API_VERSIONS),
292       "export_version": constants.EXPORT_VERSION,
293       "architecture": runtime.GetArchInfo(),
294       "name": cluster.cluster_name,
295       "master": self.cfg.GetMasterNodeName(),
296       "default_hypervisor": cluster.primary_hypervisor,
297       "enabled_hypervisors": cluster.enabled_hypervisors,
298       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
299                         for hypervisor_name in cluster.enabled_hypervisors]),
300       "os_hvp": os_hvp,
301       "beparams": cluster.beparams,
302       "osparams": cluster.osparams,
303       "ipolicy": cluster.ipolicy,
304       "nicparams": cluster.nicparams,
305       "ndparams": cluster.ndparams,
306       "diskparams": cluster.diskparams,
307       "candidate_pool_size": cluster.candidate_pool_size,
308       "master_netdev": cluster.master_netdev,
309       "master_netmask": cluster.master_netmask,
310       "use_external_mip_script": cluster.use_external_mip_script,
311       "volume_group_name": cluster.volume_group_name,
312       "drbd_usermode_helper": cluster.drbd_usermode_helper,
313       "file_storage_dir": cluster.file_storage_dir,
314       "shared_file_storage_dir": cluster.shared_file_storage_dir,
315       "maintain_node_health": cluster.maintain_node_health,
316       "ctime": cluster.ctime,
317       "mtime": cluster.mtime,
318       "uuid": cluster.uuid,
319       "tags": list(cluster.GetTags()),
320       "uid_pool": cluster.uid_pool,
321       "default_iallocator": cluster.default_iallocator,
322       "reserved_lvs": cluster.reserved_lvs,
323       "primary_ip_version": primary_ip_version,
324       "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
325       "hidden_os": cluster.hidden_os,
326       "blacklisted_os": cluster.blacklisted_os,
327       }
328
329     return result
330
331
332 class LUClusterRedistConf(NoHooksLU):
333   """Force the redistribution of cluster configuration.
334
335   This is a very simple LU.
336
337   """
338   REQ_BGL = False
339
340   def ExpandNames(self):
341     self.needed_locks = {
342       locking.LEVEL_NODE: locking.ALL_SET,
343       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
344     }
345     self.share_locks = ShareAll()
346
347   def Exec(self, feedback_fn):
348     """Redistribute the configuration.
349
350     """
351     self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
352     RedistributeAncillaryFiles(self)
353
354
355 class LUClusterRename(LogicalUnit):
356   """Rename the cluster.
357
358   """
359   HPATH = "cluster-rename"
360   HTYPE = constants.HTYPE_CLUSTER
361
362   def BuildHooksEnv(self):
363     """Build hooks env.
364
365     """
366     return {
367       "OP_TARGET": self.cfg.GetClusterName(),
368       "NEW_NAME": self.op.name,
369       }
370
371   def BuildHooksNodes(self):
372     """Build hooks nodes.
373
374     """
375     return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
376
377   def CheckPrereq(self):
378     """Verify that the passed name is a valid one.
379
380     """
381     hostname = netutils.GetHostname(name=self.op.name,
382                                     family=self.cfg.GetPrimaryIPFamily())
383
384     new_name = hostname.name
385     self.ip = new_ip = hostname.ip
386     old_name = self.cfg.GetClusterName()
387     old_ip = self.cfg.GetMasterIP()
388     if new_name == old_name and new_ip == old_ip:
389       raise errors.OpPrereqError("Neither the name nor the IP address of the"
390                                  " cluster has changed",
391                                  errors.ECODE_INVAL)
392     if new_ip != old_ip:
393       if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
394         raise errors.OpPrereqError("The given cluster IP address (%s) is"
395                                    " reachable on the network" %
396                                    new_ip, errors.ECODE_NOTUNIQUE)
397
398     self.op.name = new_name
399
400   def Exec(self, feedback_fn):
401     """Rename the cluster.
402
403     """
404     clustername = self.op.name
405     new_ip = self.ip
406
407     # shutdown the master IP
408     master_params = self.cfg.GetMasterNetworkParameters()
409     ems = self.cfg.GetUseExternalMipScript()
410     result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
411                                                      master_params, ems)
412     result.Raise("Could not disable the master role")
413
414     try:
415       cluster = self.cfg.GetClusterInfo()
416       cluster.cluster_name = clustername
417       cluster.master_ip = new_ip
418       self.cfg.Update(cluster, feedback_fn)
419
420       # update the known hosts file
421       ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
422       node_list = self.cfg.GetOnlineNodeList()
423       try:
424         node_list.remove(master_params.uuid)
425       except ValueError:
426         pass
427       UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
428     finally:
429       master_params.ip = new_ip
430       result = self.rpc.call_node_activate_master_ip(master_params.uuid,
431                                                      master_params, ems)
432       result.Warn("Could not re-enable the master role on the master,"
433                   " please restart manually", self.LogWarning)
434
435     return clustername
436
437
438 class LUClusterRepairDiskSizes(NoHooksLU):
439   """Verifies the cluster disks sizes.
440
441   """
442   REQ_BGL = False
443
444   def ExpandNames(self):
445     if self.op.instances:
446       self.wanted_names = GetWantedInstances(self, self.op.instances)
447       # Not getting the node allocation lock as only a specific set of
448       # instances (and their nodes) is going to be acquired
449       self.needed_locks = {
450         locking.LEVEL_NODE_RES: [],
451         locking.LEVEL_INSTANCE: self.wanted_names,
452         }
453       self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
454     else:
455       self.wanted_names = None
456       self.needed_locks = {
457         locking.LEVEL_NODE_RES: locking.ALL_SET,
458         locking.LEVEL_INSTANCE: locking.ALL_SET,
459
460         # This opcode is acquires the node locks for all instances
461         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
462         }
463
464     self.share_locks = {
465       locking.LEVEL_NODE_RES: 1,
466       locking.LEVEL_INSTANCE: 0,
467       locking.LEVEL_NODE_ALLOC: 1,
468       }
469
470   def DeclareLocks(self, level):
471     if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
472       self._LockInstancesNodes(primary_only=True, level=level)
473
474   def CheckPrereq(self):
475     """Check prerequisites.
476
477     This only checks the optional instance list against the existing names.
478
479     """
480     if self.wanted_names is None:
481       self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
482
483     self.wanted_instances = \
484         map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
485
486   def _EnsureChildSizes(self, disk):
487     """Ensure children of the disk have the needed disk size.
488
489     This is valid mainly for DRBD8 and fixes an issue where the
490     children have smaller disk size.
491
492     @param disk: an L{ganeti.objects.Disk} object
493
494     """
495     if disk.dev_type == constants.LD_DRBD8:
496       assert disk.children, "Empty children for DRBD8?"
497       fchild = disk.children[0]
498       mismatch = fchild.size < disk.size
499       if mismatch:
500         self.LogInfo("Child disk has size %d, parent %d, fixing",
501                      fchild.size, disk.size)
502         fchild.size = disk.size
503
504       # and we recurse on this child only, not on the metadev
505       return self._EnsureChildSizes(fchild) or mismatch
506     else:
507       return False
508
509   def Exec(self, feedback_fn):
510     """Verify the size of cluster disks.
511
512     """
513     # TODO: check child disks too
514     # TODO: check differences in size between primary/secondary nodes
515     per_node_disks = {}
516     for instance in self.wanted_instances:
517       pnode = instance.primary_node
518       if pnode not in per_node_disks:
519         per_node_disks[pnode] = []
520       for idx, disk in enumerate(instance.disks):
521         per_node_disks[pnode].append((instance, idx, disk))
522
523     assert not (frozenset(per_node_disks.keys()) -
524                 self.owned_locks(locking.LEVEL_NODE_RES)), \
525       "Not owning correct locks"
526     assert not self.owned_locks(locking.LEVEL_NODE)
527
528     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
529                                                per_node_disks.keys())
530
531     changed = []
532     for node_uuid, dskl in per_node_disks.items():
533       newl = [v[2].Copy() for v in dskl]
534       for dsk in newl:
535         self.cfg.SetDiskID(dsk, node_uuid)
536       node_name = self.cfg.GetNodeName(node_uuid)
537       result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
538       if result.fail_msg:
539         self.LogWarning("Failure in blockdev_getdimensions call to node"
540                         " %s, ignoring", node_name)
541         continue
542       if len(result.payload) != len(dskl):
543         logging.warning("Invalid result from node %s: len(dksl)=%d,"
544                         " result.payload=%s", node_name, len(dskl),
545                         result.payload)
546         self.LogWarning("Invalid result from node %s, ignoring node results",
547                         node_name)
548         continue
549       for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
550         if dimensions is None:
551           self.LogWarning("Disk %d of instance %s did not return size"
552                           " information, ignoring", idx, instance.name)
553           continue
554         if not isinstance(dimensions, (tuple, list)):
555           self.LogWarning("Disk %d of instance %s did not return valid"
556                           " dimension information, ignoring", idx,
557                           instance.name)
558           continue
559         (size, spindles) = dimensions
560         if not isinstance(size, (int, long)):
561           self.LogWarning("Disk %d of instance %s did not return valid"
562                           " size information, ignoring", idx, instance.name)
563           continue
564         size = size >> 20
565         if size != disk.size:
566           self.LogInfo("Disk %d of instance %s has mismatched size,"
567                        " correcting: recorded %d, actual %d", idx,
568                        instance.name, disk.size, size)
569           disk.size = size
570           self.cfg.Update(instance, feedback_fn)
571           changed.append((instance.name, idx, "size", size))
572         if es_flags[node_uuid]:
573           if spindles is None:
574             self.LogWarning("Disk %d of instance %s did not return valid"
575                             " spindles information, ignoring", idx,
576                             instance.name)
577           elif disk.spindles is None or disk.spindles != spindles:
578             self.LogInfo("Disk %d of instance %s has mismatched spindles,"
579                          " correcting: recorded %s, actual %s",
580                          idx, instance.name, disk.spindles, spindles)
581             disk.spindles = spindles
582             self.cfg.Update(instance, feedback_fn)
583             changed.append((instance.name, idx, "spindles", disk.spindles))
584         if self._EnsureChildSizes(disk):
585           self.cfg.Update(instance, feedback_fn)
586           changed.append((instance.name, idx, "size", disk.size))
587     return changed
588
589
590 def _ValidateNetmask(cfg, netmask):
591   """Checks if a netmask is valid.
592
593   @type cfg: L{config.ConfigWriter}
594   @param cfg: The cluster configuration
595   @type netmask: int
596   @param netmask: the netmask to be verified
597   @raise errors.OpPrereqError: if the validation fails
598
599   """
600   ip_family = cfg.GetPrimaryIPFamily()
601   try:
602     ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
603   except errors.ProgrammerError:
604     raise errors.OpPrereqError("Invalid primary ip family: %s." %
605                                ip_family, errors.ECODE_INVAL)
606   if not ipcls.ValidateNetmask(netmask):
607     raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
608                                (netmask), errors.ECODE_INVAL)
609
610
611 class LUClusterSetParams(LogicalUnit):
612   """Change the parameters of the cluster.
613
614   """
615   HPATH = "cluster-modify"
616   HTYPE = constants.HTYPE_CLUSTER
617   REQ_BGL = False
618
619   def CheckArguments(self):
620     """Check parameters
621
622     """
623     if self.op.uid_pool:
624       uidpool.CheckUidPool(self.op.uid_pool)
625
626     if self.op.add_uids:
627       uidpool.CheckUidPool(self.op.add_uids)
628
629     if self.op.remove_uids:
630       uidpool.CheckUidPool(self.op.remove_uids)
631
632     if self.op.master_netmask is not None:
633       _ValidateNetmask(self.cfg, self.op.master_netmask)
634
635     if self.op.diskparams:
636       for dt_params in self.op.diskparams.values():
637         utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
638       try:
639         utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
640       except errors.OpPrereqError, err:
641         raise errors.OpPrereqError("While verify diskparams options: %s" % err,
642                                    errors.ECODE_INVAL)
643
644   def ExpandNames(self):
645     # FIXME: in the future maybe other cluster params won't require checking on
646     # all nodes to be modified.
647     # FIXME: This opcode changes cluster-wide settings. Is acquiring all
648     # resource locks the right thing, shouldn't it be the BGL instead?
649     self.needed_locks = {
650       locking.LEVEL_NODE: locking.ALL_SET,
651       locking.LEVEL_INSTANCE: locking.ALL_SET,
652       locking.LEVEL_NODEGROUP: locking.ALL_SET,
653       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
654     }
655     self.share_locks = ShareAll()
656
657   def BuildHooksEnv(self):
658     """Build hooks env.
659
660     """
661     return {
662       "OP_TARGET": self.cfg.GetClusterName(),
663       "NEW_VG_NAME": self.op.vg_name,
664       }
665
666   def BuildHooksNodes(self):
667     """Build hooks nodes.
668
669     """
670     mn = self.cfg.GetMasterNode()
671     return ([mn], [mn])
672
673   def _CheckVgName(self, node_uuids, enabled_disk_templates,
674                    new_enabled_disk_templates):
675     """Check the consistency of the vg name on all nodes and in case it gets
676        unset whether there are instances still using it.
677
678     """
679     if self.op.vg_name is not None and not self.op.vg_name:
680       if self.cfg.HasAnyDiskOfType(constants.LD_LV):
681         raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
682                                    " instances exist", errors.ECODE_INVAL)
683
684     if (self.op.vg_name is not None and
685         utils.IsLvmEnabled(enabled_disk_templates)) or \
686            (self.cfg.GetVGName() is not None and
687             utils.LvmGetsEnabled(enabled_disk_templates,
688                                  new_enabled_disk_templates)):
689       self._CheckVgNameOnNodes(node_uuids)
690
691   def _CheckVgNameOnNodes(self, node_uuids):
692     """Check the status of the volume group on each node.
693
694     """
695     vglist = self.rpc.call_vg_list(node_uuids)
696     for node_uuid in node_uuids:
697       msg = vglist[node_uuid].fail_msg
698       if msg:
699         # ignoring down node
700         self.LogWarning("Error while gathering data on node %s"
701                         " (ignoring node): %s",
702                         self.cfg.GetNodeName(node_uuid), msg)
703         continue
704       vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
705                                             self.op.vg_name,
706                                             constants.MIN_VG_SIZE)
707       if vgstatus:
708         raise errors.OpPrereqError("Error on node '%s': %s" %
709                                    (self.cfg.GetNodeName(node_uuid), vgstatus),
710                                    errors.ECODE_ENVIRON)
711
712   def _GetEnabledDiskTemplates(self, cluster):
713     """Determines the enabled disk templates and the subset of disk templates
714        that are newly enabled by this operation.
715
716     """
717     enabled_disk_templates = None
718     new_enabled_disk_templates = []
719     if self.op.enabled_disk_templates:
720       enabled_disk_templates = self.op.enabled_disk_templates
721       new_enabled_disk_templates = \
722         list(set(enabled_disk_templates)
723              - set(cluster.enabled_disk_templates))
724     else:
725       enabled_disk_templates = cluster.enabled_disk_templates
726     return (enabled_disk_templates, new_enabled_disk_templates)
727
728   def CheckPrereq(self):
729     """Check prerequisites.
730
731     This checks whether the given params don't conflict and
732     if the given volume group is valid.
733
734     """
735     if self.op.drbd_helper is not None and not self.op.drbd_helper:
736       if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
737         raise errors.OpPrereqError("Cannot disable drbd helper while"
738                                    " drbd-based instances exist",
739                                    errors.ECODE_INVAL)
740
741     node_uuids = self.owned_locks(locking.LEVEL_NODE)
742     self.cluster = cluster = self.cfg.GetClusterInfo()
743
744     vm_capable_node_uuids = [node.uuid
745                              for node in self.cfg.GetAllNodesInfo().values()
746                              if node.uuid in node_uuids and node.vm_capable]
747
748     (enabled_disk_templates, new_enabled_disk_templates) = \
749       self._GetEnabledDiskTemplates(cluster)
750
751     self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
752                       new_enabled_disk_templates)
753
754     if self.op.drbd_helper:
755       # checks given drbd helper on all nodes
756       helpers = self.rpc.call_drbd_helper(node_uuids)
757       for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
758         if ninfo.offline:
759           self.LogInfo("Not checking drbd helper on offline node %s",
760                        ninfo.name)
761           continue
762         msg = helpers[ninfo.uuid].fail_msg
763         if msg:
764           raise errors.OpPrereqError("Error checking drbd helper on node"
765                                      " '%s': %s" % (ninfo.name, msg),
766                                      errors.ECODE_ENVIRON)
767         node_helper = helpers[ninfo.uuid].payload
768         if node_helper != self.op.drbd_helper:
769           raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
770                                      (ninfo.name, node_helper),
771                                      errors.ECODE_ENVIRON)
772
773     # validate params changes
774     if self.op.beparams:
775       objects.UpgradeBeParams(self.op.beparams)
776       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
777       self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
778
779     if self.op.ndparams:
780       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
781       self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
782
783       # TODO: we need a more general way to handle resetting
784       # cluster-level parameters to default values
785       if self.new_ndparams["oob_program"] == "":
786         self.new_ndparams["oob_program"] = \
787             constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
788
789     if self.op.hv_state:
790       new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
791                                            self.cluster.hv_state_static)
792       self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
793                                for hv, values in new_hv_state.items())
794
795     if self.op.disk_state:
796       new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
797                                                self.cluster.disk_state_static)
798       self.new_disk_state = \
799         dict((storage, dict((name, cluster.SimpleFillDiskState(values))
800                             for name, values in svalues.items()))
801              for storage, svalues in new_disk_state.items())
802
803     if self.op.ipolicy:
804       self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
805                                            group_policy=False)
806
807       all_instances = self.cfg.GetAllInstancesInfo().values()
808       violations = set()
809       for group in self.cfg.GetAllNodeGroupsInfo().values():
810         instances = frozenset([inst for inst in all_instances
811                                if compat.any(nuuid in group.members
812                                              for nuuid in inst.all_nodes)])
813         new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
814         ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
815         new = ComputeNewInstanceViolations(ipol,
816                                            new_ipolicy, instances, self.cfg)
817         if new:
818           violations.update(new)
819
820       if violations:
821         self.LogWarning("After the ipolicy change the following instances"
822                         " violate them: %s",
823                         utils.CommaJoin(utils.NiceSort(violations)))
824
825     if self.op.nicparams:
826       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
827       self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
828       objects.NIC.CheckParameterSyntax(self.new_nicparams)
829       nic_errors = []
830
831       # check all instances for consistency
832       for instance in self.cfg.GetAllInstancesInfo().values():
833         for nic_idx, nic in enumerate(instance.nics):
834           params_copy = copy.deepcopy(nic.nicparams)
835           params_filled = objects.FillDict(self.new_nicparams, params_copy)
836
837           # check parameter syntax
838           try:
839             objects.NIC.CheckParameterSyntax(params_filled)
840           except errors.ConfigurationError, err:
841             nic_errors.append("Instance %s, nic/%d: %s" %
842                               (instance.name, nic_idx, err))
843
844           # if we're moving instances to routed, check that they have an ip
845           target_mode = params_filled[constants.NIC_MODE]
846           if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
847             nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
848                               " address" % (instance.name, nic_idx))
849       if nic_errors:
850         raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
851                                    "\n".join(nic_errors), errors.ECODE_INVAL)
852
853     # hypervisor list/parameters
854     self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
855     if self.op.hvparams:
856       for hv_name, hv_dict in self.op.hvparams.items():
857         if hv_name not in self.new_hvparams:
858           self.new_hvparams[hv_name] = hv_dict
859         else:
860           self.new_hvparams[hv_name].update(hv_dict)
861
862     # disk template parameters
863     self.new_diskparams = objects.FillDict(cluster.diskparams, {})
864     if self.op.diskparams:
865       for dt_name, dt_params in self.op.diskparams.items():
866         if dt_name not in self.op.diskparams:
867           self.new_diskparams[dt_name] = dt_params
868         else:
869           self.new_diskparams[dt_name].update(dt_params)
870
871     # os hypervisor parameters
872     self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
873     if self.op.os_hvp:
874       for os_name, hvs in self.op.os_hvp.items():
875         if os_name not in self.new_os_hvp:
876           self.new_os_hvp[os_name] = hvs
877         else:
878           for hv_name, hv_dict in hvs.items():
879             if hv_dict is None:
880               # Delete if it exists
881               self.new_os_hvp[os_name].pop(hv_name, None)
882             elif hv_name not in self.new_os_hvp[os_name]:
883               self.new_os_hvp[os_name][hv_name] = hv_dict
884             else:
885               self.new_os_hvp[os_name][hv_name].update(hv_dict)
886
887     # os parameters
888     self.new_osp = objects.FillDict(cluster.osparams, {})
889     if self.op.osparams:
890       for os_name, osp in self.op.osparams.items():
891         if os_name not in self.new_osp:
892           self.new_osp[os_name] = {}
893
894         self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
895                                                  use_none=True)
896
897         if not self.new_osp[os_name]:
898           # we removed all parameters
899           del self.new_osp[os_name]
900         else:
901           # check the parameter validity (remote check)
902           CheckOSParams(self, False, [self.cfg.GetMasterNode()],
903                         os_name, self.new_osp[os_name])
904
905     # changes to the hypervisor list
906     if self.op.enabled_hypervisors is not None:
907       self.hv_list = self.op.enabled_hypervisors
908       for hv in self.hv_list:
909         # if the hypervisor doesn't already exist in the cluster
910         # hvparams, we initialize it to empty, and then (in both
911         # cases) we make sure to fill the defaults, as we might not
912         # have a complete defaults list if the hypervisor wasn't
913         # enabled before
914         if hv not in new_hvp:
915           new_hvp[hv] = {}
916         new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
917         utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
918     else:
919       self.hv_list = cluster.enabled_hypervisors
920
921     if self.op.hvparams or self.op.enabled_hypervisors is not None:
922       # either the enabled list has changed, or the parameters have, validate
923       for hv_name, hv_params in self.new_hvparams.items():
924         if ((self.op.hvparams and hv_name in self.op.hvparams) or
925             (self.op.enabled_hypervisors and
926              hv_name in self.op.enabled_hypervisors)):
927           # either this is a new hypervisor, or its parameters have changed
928           hv_class = hypervisor.GetHypervisorClass(hv_name)
929           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
930           hv_class.CheckParameterSyntax(hv_params)
931           CheckHVParams(self, node_uuids, hv_name, hv_params)
932
933     self._CheckDiskTemplateConsistency()
934
935     if self.op.os_hvp:
936       # no need to check any newly-enabled hypervisors, since the
937       # defaults have already been checked in the above code-block
938       for os_name, os_hvp in self.new_os_hvp.items():
939         for hv_name, hv_params in os_hvp.items():
940           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
941           # we need to fill in the new os_hvp on top of the actual hv_p
942           cluster_defaults = self.new_hvparams.get(hv_name, {})
943           new_osp = objects.FillDict(cluster_defaults, hv_params)
944           hv_class = hypervisor.GetHypervisorClass(hv_name)
945           hv_class.CheckParameterSyntax(new_osp)
946           CheckHVParams(self, node_uuids, hv_name, new_osp)
947
948     if self.op.default_iallocator:
949       alloc_script = utils.FindFile(self.op.default_iallocator,
950                                     constants.IALLOCATOR_SEARCH_PATH,
951                                     os.path.isfile)
952       if alloc_script is None:
953         raise errors.OpPrereqError("Invalid default iallocator script '%s'"
954                                    " specified" % self.op.default_iallocator,
955                                    errors.ECODE_INVAL)
956
957   def _CheckDiskTemplateConsistency(self):
958     """Check whether the disk templates that are going to be disabled
959        are still in use by some instances.
960
961     """
962     if self.op.enabled_disk_templates:
963       cluster = self.cfg.GetClusterInfo()
964       instances = self.cfg.GetAllInstancesInfo()
965
966       disk_templates_to_remove = set(cluster.enabled_disk_templates) \
967         - set(self.op.enabled_disk_templates)
968       for instance in instances.itervalues():
969         if instance.disk_template in disk_templates_to_remove:
970           raise errors.OpPrereqError("Cannot disable disk template '%s',"
971                                      " because instance '%s' is using it." %
972                                      (instance.disk_template, instance.name))
973
974   def _SetVgName(self, feedback_fn):
975     """Determines and sets the new volume group name.
976
977     """
978     if self.op.vg_name is not None:
979       if self.op.vg_name and not \
980            utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
981         feedback_fn("Note that you specified a volume group, but did not"
982                     " enable any lvm disk template.")
983       new_volume = self.op.vg_name
984       if not new_volume:
985         if utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
986           raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
987                                      " disk templates are enabled.")
988         new_volume = None
989       if new_volume != self.cfg.GetVGName():
990         self.cfg.SetVGName(new_volume)
991       else:
992         feedback_fn("Cluster LVM configuration already in desired"
993                     " state, not changing")
994     else:
995       if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
996           not self.cfg.GetVGName():
997         raise errors.OpPrereqError("Please specify a volume group when"
998                                    " enabling lvm-based disk-templates.")
999
1000   def Exec(self, feedback_fn):
1001     """Change the parameters of the cluster.
1002
1003     """
1004     if self.op.enabled_disk_templates:
1005       self.cluster.enabled_disk_templates = \
1006         list(set(self.op.enabled_disk_templates))
1007
1008     self._SetVgName(feedback_fn)
1009
1010     if self.op.drbd_helper is not None:
1011       if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1012         feedback_fn("Note that you specified a drbd user helper, but did"
1013                     " enabled the drbd disk template.")
1014       new_helper = self.op.drbd_helper
1015       if not new_helper:
1016         new_helper = None
1017       if new_helper != self.cfg.GetDRBDHelper():
1018         self.cfg.SetDRBDHelper(new_helper)
1019       else:
1020         feedback_fn("Cluster DRBD helper already in desired state,"
1021                     " not changing")
1022     if self.op.hvparams:
1023       self.cluster.hvparams = self.new_hvparams
1024     if self.op.os_hvp:
1025       self.cluster.os_hvp = self.new_os_hvp
1026     if self.op.enabled_hypervisors is not None:
1027       self.cluster.hvparams = self.new_hvparams
1028       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1029     if self.op.beparams:
1030       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1031     if self.op.nicparams:
1032       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1033     if self.op.ipolicy:
1034       self.cluster.ipolicy = self.new_ipolicy
1035     if self.op.osparams:
1036       self.cluster.osparams = self.new_osp
1037     if self.op.ndparams:
1038       self.cluster.ndparams = self.new_ndparams
1039     if self.op.diskparams:
1040       self.cluster.diskparams = self.new_diskparams
1041     if self.op.hv_state:
1042       self.cluster.hv_state_static = self.new_hv_state
1043     if self.op.disk_state:
1044       self.cluster.disk_state_static = self.new_disk_state
1045
1046     if self.op.candidate_pool_size is not None:
1047       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1048       # we need to update the pool size here, otherwise the save will fail
1049       AdjustCandidatePool(self, [])
1050
1051     if self.op.maintain_node_health is not None:
1052       if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1053         feedback_fn("Note: CONFD was disabled at build time, node health"
1054                     " maintenance is not useful (still enabling it)")
1055       self.cluster.maintain_node_health = self.op.maintain_node_health
1056
1057     if self.op.prealloc_wipe_disks is not None:
1058       self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1059
1060     if self.op.add_uids is not None:
1061       uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1062
1063     if self.op.remove_uids is not None:
1064       uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1065
1066     if self.op.uid_pool is not None:
1067       self.cluster.uid_pool = self.op.uid_pool
1068
1069     if self.op.default_iallocator is not None:
1070       self.cluster.default_iallocator = self.op.default_iallocator
1071
1072     if self.op.reserved_lvs is not None:
1073       self.cluster.reserved_lvs = self.op.reserved_lvs
1074
1075     if self.op.use_external_mip_script is not None:
1076       self.cluster.use_external_mip_script = self.op.use_external_mip_script
1077
1078     def helper_os(aname, mods, desc):
1079       desc += " OS list"
1080       lst = getattr(self.cluster, aname)
1081       for key, val in mods:
1082         if key == constants.DDM_ADD:
1083           if val in lst:
1084             feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1085           else:
1086             lst.append(val)
1087         elif key == constants.DDM_REMOVE:
1088           if val in lst:
1089             lst.remove(val)
1090           else:
1091             feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1092         else:
1093           raise errors.ProgrammerError("Invalid modification '%s'" % key)
1094
1095     if self.op.hidden_os:
1096       helper_os("hidden_os", self.op.hidden_os, "hidden")
1097
1098     if self.op.blacklisted_os:
1099       helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1100
1101     if self.op.master_netdev:
1102       master_params = self.cfg.GetMasterNetworkParameters()
1103       ems = self.cfg.GetUseExternalMipScript()
1104       feedback_fn("Shutting down master ip on the current netdev (%s)" %
1105                   self.cluster.master_netdev)
1106       result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1107                                                        master_params, ems)
1108       if not self.op.force:
1109         result.Raise("Could not disable the master ip")
1110       else:
1111         if result.fail_msg:
1112           msg = ("Could not disable the master ip (continuing anyway): %s" %
1113                  result.fail_msg)
1114           feedback_fn(msg)
1115       feedback_fn("Changing master_netdev from %s to %s" %
1116                   (master_params.netdev, self.op.master_netdev))
1117       self.cluster.master_netdev = self.op.master_netdev
1118
1119     if self.op.master_netmask:
1120       master_params = self.cfg.GetMasterNetworkParameters()
1121       feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1122       result = self.rpc.call_node_change_master_netmask(
1123                  master_params.uuid, master_params.netmask,
1124                  self.op.master_netmask, master_params.ip,
1125                  master_params.netdev)
1126       result.Warn("Could not change the master IP netmask", feedback_fn)
1127       self.cluster.master_netmask = self.op.master_netmask
1128
1129     self.cfg.Update(self.cluster, feedback_fn)
1130
1131     if self.op.master_netdev:
1132       master_params = self.cfg.GetMasterNetworkParameters()
1133       feedback_fn("Starting the master ip on the new master netdev (%s)" %
1134                   self.op.master_netdev)
1135       ems = self.cfg.GetUseExternalMipScript()
1136       result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1137                                                      master_params, ems)
1138       result.Warn("Could not re-enable the master ip on the master,"
1139                   " please restart manually", self.LogWarning)
1140
1141
1142 class LUClusterVerify(NoHooksLU):
1143   """Submits all jobs necessary to verify the cluster.
1144
1145   """
1146   REQ_BGL = False
1147
1148   def ExpandNames(self):
1149     self.needed_locks = {}
1150
1151   def Exec(self, feedback_fn):
1152     jobs = []
1153
1154     if self.op.group_name:
1155       groups = [self.op.group_name]
1156       depends_fn = lambda: None
1157     else:
1158       groups = self.cfg.GetNodeGroupList()
1159
1160       # Verify global configuration
1161       jobs.append([
1162         opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1163         ])
1164
1165       # Always depend on global verification
1166       depends_fn = lambda: [(-len(jobs), [])]
1167
1168     jobs.extend(
1169       [opcodes.OpClusterVerifyGroup(group_name=group,
1170                                     ignore_errors=self.op.ignore_errors,
1171                                     depends=depends_fn())]
1172       for group in groups)
1173
1174     # Fix up all parameters
1175     for op in itertools.chain(*jobs): # pylint: disable=W0142
1176       op.debug_simulate_errors = self.op.debug_simulate_errors
1177       op.verbose = self.op.verbose
1178       op.error_codes = self.op.error_codes
1179       try:
1180         op.skip_checks = self.op.skip_checks
1181       except AttributeError:
1182         assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1183
1184     return ResultWithJobs(jobs)
1185
1186
1187 class _VerifyErrors(object):
1188   """Mix-in for cluster/group verify LUs.
1189
1190   It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1191   self.op and self._feedback_fn to be available.)
1192
1193   """
1194
1195   ETYPE_FIELD = "code"
1196   ETYPE_ERROR = "ERROR"
1197   ETYPE_WARNING = "WARNING"
1198
1199   def _Error(self, ecode, item, msg, *args, **kwargs):
1200     """Format an error message.
1201
1202     Based on the opcode's error_codes parameter, either format a
1203     parseable error code, or a simpler error string.
1204
1205     This must be called only from Exec and functions called from Exec.
1206
1207     """
1208     ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1209     itype, etxt, _ = ecode
1210     # If the error code is in the list of ignored errors, demote the error to a
1211     # warning
1212     if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1213       ltype = self.ETYPE_WARNING
1214     # first complete the msg
1215     if args:
1216       msg = msg % args
1217     # then format the whole message
1218     if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1219       msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1220     else:
1221       if item:
1222         item = " " + item
1223       else:
1224         item = ""
1225       msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1226     # and finally report it via the feedback_fn
1227     self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1228     # do not mark the operation as failed for WARN cases only
1229     if ltype == self.ETYPE_ERROR:
1230       self.bad = True
1231
1232   def _ErrorIf(self, cond, *args, **kwargs):
1233     """Log an error message if the passed condition is True.
1234
1235     """
1236     if (bool(cond)
1237         or self.op.debug_simulate_errors): # pylint: disable=E1101
1238       self._Error(*args, **kwargs)
1239
1240
1241 def _VerifyCertificate(filename):
1242   """Verifies a certificate for L{LUClusterVerifyConfig}.
1243
1244   @type filename: string
1245   @param filename: Path to PEM file
1246
1247   """
1248   try:
1249     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1250                                            utils.ReadFile(filename))
1251   except Exception, err: # pylint: disable=W0703
1252     return (LUClusterVerifyConfig.ETYPE_ERROR,
1253             "Failed to load X509 certificate %s: %s" % (filename, err))
1254
1255   (errcode, msg) = \
1256     utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1257                                 constants.SSL_CERT_EXPIRATION_ERROR)
1258
1259   if msg:
1260     fnamemsg = "While verifying %s: %s" % (filename, msg)
1261   else:
1262     fnamemsg = None
1263
1264   if errcode is None:
1265     return (None, fnamemsg)
1266   elif errcode == utils.CERT_WARNING:
1267     return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1268   elif errcode == utils.CERT_ERROR:
1269     return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1270
1271   raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1272
1273
1274 def _GetAllHypervisorParameters(cluster, instances):
1275   """Compute the set of all hypervisor parameters.
1276
1277   @type cluster: L{objects.Cluster}
1278   @param cluster: the cluster object
1279   @param instances: list of L{objects.Instance}
1280   @param instances: additional instances from which to obtain parameters
1281   @rtype: list of (origin, hypervisor, parameters)
1282   @return: a list with all parameters found, indicating the hypervisor they
1283        apply to, and the origin (can be "cluster", "os X", or "instance Y")
1284
1285   """
1286   hvp_data = []
1287
1288   for hv_name in cluster.enabled_hypervisors:
1289     hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1290
1291   for os_name, os_hvp in cluster.os_hvp.items():
1292     for hv_name, hv_params in os_hvp.items():
1293       if hv_params:
1294         full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1295         hvp_data.append(("os %s" % os_name, hv_name, full_params))
1296
1297   # TODO: collapse identical parameter values in a single one
1298   for instance in instances:
1299     if instance.hvparams:
1300       hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1301                        cluster.FillHV(instance)))
1302
1303   return hvp_data
1304
1305
1306 class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1307   """Verifies the cluster config.
1308
1309   """
1310   REQ_BGL = False
1311
1312   def _VerifyHVP(self, hvp_data):
1313     """Verifies locally the syntax of the hypervisor parameters.
1314
1315     """
1316     for item, hv_name, hv_params in hvp_data:
1317       msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1318              (item, hv_name))
1319       try:
1320         hv_class = hypervisor.GetHypervisorClass(hv_name)
1321         utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1322         hv_class.CheckParameterSyntax(hv_params)
1323       except errors.GenericError, err:
1324         self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1325
1326   def ExpandNames(self):
1327     self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1328     self.share_locks = ShareAll()
1329
1330   def CheckPrereq(self):
1331     """Check prerequisites.
1332
1333     """
1334     # Retrieve all information
1335     self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1336     self.all_node_info = self.cfg.GetAllNodesInfo()
1337     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1338
1339   def Exec(self, feedback_fn):
1340     """Verify integrity of cluster, performing various test on nodes.
1341
1342     """
1343     self.bad = False
1344     self._feedback_fn = feedback_fn
1345
1346     feedback_fn("* Verifying cluster config")
1347
1348     for msg in self.cfg.VerifyConfig():
1349       self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1350
1351     feedback_fn("* Verifying cluster certificate files")
1352
1353     for cert_filename in pathutils.ALL_CERT_FILES:
1354       (errcode, msg) = _VerifyCertificate(cert_filename)
1355       self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1356
1357     feedback_fn("* Verifying hypervisor parameters")
1358
1359     self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1360                                                 self.all_inst_info.values()))
1361
1362     feedback_fn("* Verifying all nodes belong to an existing group")
1363
1364     # We do this verification here because, should this bogus circumstance
1365     # occur, it would never be caught by VerifyGroup, which only acts on
1366     # nodes/instances reachable from existing node groups.
1367
1368     dangling_nodes = set(node for node in self.all_node_info.values()
1369                          if node.group not in self.all_group_info)
1370
1371     dangling_instances = {}
1372     no_node_instances = []
1373
1374     for inst in self.all_inst_info.values():
1375       if inst.primary_node in [node.uuid for node in dangling_nodes]:
1376         dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1377       elif inst.primary_node not in self.all_node_info:
1378         no_node_instances.append(inst.name)
1379
1380     pretty_dangling = [
1381         "%s (%s)" %
1382         (node.name,
1383          utils.CommaJoin(dangling_instances.get(node.uuid,
1384                                                 ["no instances"])))
1385         for node in dangling_nodes]
1386
1387     self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1388                   None,
1389                   "the following nodes (and their instances) belong to a non"
1390                   " existing group: %s", utils.CommaJoin(pretty_dangling))
1391
1392     self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1393                   None,
1394                   "the following instances have a non-existing primary-node:"
1395                   " %s", utils.CommaJoin(no_node_instances))
1396
1397     return not self.bad
1398
1399
1400 class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1401   """Verifies the status of a node group.
1402
1403   """
1404   HPATH = "cluster-verify"
1405   HTYPE = constants.HTYPE_CLUSTER
1406   REQ_BGL = False
1407
1408   _HOOKS_INDENT_RE = re.compile("^", re.M)
1409
1410   class NodeImage(object):
1411     """A class representing the logical and physical status of a node.
1412
1413     @type uuid: string
1414     @ivar uuid: the node UUID to which this object refers
1415     @ivar volumes: a structure as returned from
1416         L{ganeti.backend.GetVolumeList} (runtime)
1417     @ivar instances: a list of running instances (runtime)
1418     @ivar pinst: list of configured primary instances (config)
1419     @ivar sinst: list of configured secondary instances (config)
1420     @ivar sbp: dictionary of {primary-node: list of instances} for all
1421         instances for which this node is secondary (config)
1422     @ivar mfree: free memory, as reported by hypervisor (runtime)
1423     @ivar dfree: free disk, as reported by the node (runtime)
1424     @ivar offline: the offline status (config)
1425     @type rpc_fail: boolean
1426     @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1427         not whether the individual keys were correct) (runtime)
1428     @type lvm_fail: boolean
1429     @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1430     @type hyp_fail: boolean
1431     @ivar hyp_fail: whether the RPC call didn't return the instance list
1432     @type ghost: boolean
1433     @ivar ghost: whether this is a known node or not (config)
1434     @type os_fail: boolean
1435     @ivar os_fail: whether the RPC call didn't return valid OS data
1436     @type oslist: list
1437     @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1438     @type vm_capable: boolean
1439     @ivar vm_capable: whether the node can host instances
1440     @type pv_min: float
1441     @ivar pv_min: size in MiB of the smallest PVs
1442     @type pv_max: float
1443     @ivar pv_max: size in MiB of the biggest PVs
1444
1445     """
1446     def __init__(self, offline=False, uuid=None, vm_capable=True):
1447       self.uuid = uuid
1448       self.volumes = {}
1449       self.instances = []
1450       self.pinst = []
1451       self.sinst = []
1452       self.sbp = {}
1453       self.mfree = 0
1454       self.dfree = 0
1455       self.offline = offline
1456       self.vm_capable = vm_capable
1457       self.rpc_fail = False
1458       self.lvm_fail = False
1459       self.hyp_fail = False
1460       self.ghost = False
1461       self.os_fail = False
1462       self.oslist = {}
1463       self.pv_min = None
1464       self.pv_max = None
1465
1466   def ExpandNames(self):
1467     # This raises errors.OpPrereqError on its own:
1468     self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1469
1470     # Get instances in node group; this is unsafe and needs verification later
1471     inst_names = \
1472       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1473
1474     self.needed_locks = {
1475       locking.LEVEL_INSTANCE: inst_names,
1476       locking.LEVEL_NODEGROUP: [self.group_uuid],
1477       locking.LEVEL_NODE: [],
1478
1479       # This opcode is run by watcher every five minutes and acquires all nodes
1480       # for a group. It doesn't run for a long time, so it's better to acquire
1481       # the node allocation lock as well.
1482       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1483       }
1484
1485     self.share_locks = ShareAll()
1486
1487   def DeclareLocks(self, level):
1488     if level == locking.LEVEL_NODE:
1489       # Get members of node group; this is unsafe and needs verification later
1490       nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1491
1492       all_inst_info = self.cfg.GetAllInstancesInfo()
1493
1494       # In Exec(), we warn about mirrored instances that have primary and
1495       # secondary living in separate node groups. To fully verify that
1496       # volumes for these instances are healthy, we will need to do an
1497       # extra call to their secondaries. We ensure here those nodes will
1498       # be locked.
1499       for inst in self.owned_locks(locking.LEVEL_INSTANCE):
1500         # Important: access only the instances whose lock is owned
1501         if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
1502           nodes.update(all_inst_info[inst].secondary_nodes)
1503
1504       self.needed_locks[locking.LEVEL_NODE] = nodes
1505
1506   def CheckPrereq(self):
1507     assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1508     self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1509
1510     group_node_uuids = set(self.group_info.members)
1511     group_instances = \
1512       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1513
1514     unlocked_node_uuids = \
1515         group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1516
1517     unlocked_instances = \
1518         group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))
1519
1520     if unlocked_node_uuids:
1521       raise errors.OpPrereqError(
1522         "Missing lock for nodes: %s" %
1523         utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1524         errors.ECODE_STATE)
1525
1526     if unlocked_instances:
1527       raise errors.OpPrereqError("Missing lock for instances: %s" %
1528                                  utils.CommaJoin(unlocked_instances),
1529                                  errors.ECODE_STATE)
1530
1531     self.all_node_info = self.cfg.GetAllNodesInfo()
1532     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1533
1534     self.my_node_uuids = group_node_uuids
1535     self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1536                              for node_uuid in group_node_uuids)
1537
1538     self.my_inst_names = utils.NiceSort(group_instances)
1539     self.my_inst_info = dict((name, self.all_inst_info[name])
1540                              for name in self.my_inst_names)
1541
1542     # We detect here the nodes that will need the extra RPC calls for verifying
1543     # split LV volumes; they should be locked.
1544     extra_lv_nodes = set()
1545
1546     for inst in self.my_inst_info.values():
1547       if inst.disk_template in constants.DTS_INT_MIRROR:
1548         for nuuid in inst.all_nodes:
1549           if self.all_node_info[nuuid].group != self.group_uuid:
1550             extra_lv_nodes.add(nuuid)
1551
1552     unlocked_lv_nodes = \
1553         extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1554
1555     if unlocked_lv_nodes:
1556       raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1557                                  utils.CommaJoin(unlocked_lv_nodes),
1558                                  errors.ECODE_STATE)
1559     self.extra_lv_nodes = list(extra_lv_nodes)
1560
1561   def _VerifyNode(self, ninfo, nresult):
1562     """Perform some basic validation on data returned from a node.
1563
1564       - check the result data structure is well formed and has all the
1565         mandatory fields
1566       - check ganeti version
1567
1568     @type ninfo: L{objects.Node}
1569     @param ninfo: the node to check
1570     @param nresult: the results from the node
1571     @rtype: boolean
1572     @return: whether overall this call was successful (and we can expect
1573          reasonable values in the respose)
1574
1575     """
1576     # main result, nresult should be a non-empty dict
1577     test = not nresult or not isinstance(nresult, dict)
1578     self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1579                   "unable to verify node: no data returned")
1580     if test:
1581       return False
1582
1583     # compares ganeti version
1584     local_version = constants.PROTOCOL_VERSION
1585     remote_version = nresult.get("version", None)
1586     test = not (remote_version and
1587                 isinstance(remote_version, (list, tuple)) and
1588                 len(remote_version) == 2)
1589     self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1590                   "connection to node returned invalid data")
1591     if test:
1592       return False
1593
1594     test = local_version != remote_version[0]
1595     self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1596                   "incompatible protocol versions: master %s,"
1597                   " node %s", local_version, remote_version[0])
1598     if test:
1599       return False
1600
1601     # node seems compatible, we can actually try to look into its results
1602
1603     # full package version
1604     self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1605                   constants.CV_ENODEVERSION, ninfo.name,
1606                   "software version mismatch: master %s, node %s",
1607                   constants.RELEASE_VERSION, remote_version[1],
1608                   code=self.ETYPE_WARNING)
1609
1610     hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1611     if ninfo.vm_capable and isinstance(hyp_result, dict):
1612       for hv_name, hv_result in hyp_result.iteritems():
1613         test = hv_result is not None
1614         self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1615                       "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1616
1617     hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1618     if ninfo.vm_capable and isinstance(hvp_result, list):
1619       for item, hv_name, hv_result in hvp_result:
1620         self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1621                       "hypervisor %s parameter verify failure (source %s): %s",
1622                       hv_name, item, hv_result)
1623
1624     test = nresult.get(constants.NV_NODESETUP,
1625                        ["Missing NODESETUP results"])
1626     self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1627                   "node setup error: %s", "; ".join(test))
1628
1629     return True
1630
1631   def _VerifyNodeTime(self, ninfo, nresult,
1632                       nvinfo_starttime, nvinfo_endtime):
1633     """Check the node time.
1634
1635     @type ninfo: L{objects.Node}
1636     @param ninfo: the node to check
1637     @param nresult: the remote results for the node
1638     @param nvinfo_starttime: the start time of the RPC call
1639     @param nvinfo_endtime: the end time of the RPC call
1640
1641     """
1642     ntime = nresult.get(constants.NV_TIME, None)
1643     try:
1644       ntime_merged = utils.MergeTime(ntime)
1645     except (ValueError, TypeError):
1646       self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1647                     "Node returned invalid time")
1648       return
1649
1650     if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1651       ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1652     elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1653       ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1654     else:
1655       ntime_diff = None
1656
1657     self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1658                   "Node time diverges by at least %s from master node time",
1659                   ntime_diff)
1660
1661   def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1662     """Check the node LVM results and update info for cross-node checks.
1663
1664     @type ninfo: L{objects.Node}
1665     @param ninfo: the node to check
1666     @param nresult: the remote results for the node
1667     @param vg_name: the configured VG name
1668     @type nimg: L{NodeImage}
1669     @param nimg: node image
1670
1671     """
1672     if vg_name is None:
1673       return
1674
1675     # checks vg existence and size > 20G
1676     vglist = nresult.get(constants.NV_VGLIST, None)
1677     test = not vglist
1678     self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1679                   "unable to check volume groups")
1680     if not test:
1681       vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1682                                             constants.MIN_VG_SIZE)
1683       self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1684
1685     # Check PVs
1686     (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1687     for em in errmsgs:
1688       self._Error(constants.CV_ENODELVM, ninfo.name, em)
1689     if pvminmax is not None:
1690       (nimg.pv_min, nimg.pv_max) = pvminmax
1691
1692   def _VerifyGroupDRBDVersion(self, node_verify_infos):
1693     """Check cross-node DRBD version consistency.
1694
1695     @type node_verify_infos: dict
1696     @param node_verify_infos: infos about nodes as returned from the
1697       node_verify call.
1698
1699     """
1700     node_versions = {}
1701     for node_uuid, ndata in node_verify_infos.items():
1702       nresult = ndata.payload
1703       version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1704       node_versions[node_uuid] = version
1705
1706     if len(set(node_versions.values())) > 1:
1707       for node_uuid, version in sorted(node_versions.items()):
1708         msg = "DRBD version mismatch: %s" % version
1709         self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1710                     code=self.ETYPE_WARNING)
1711
1712   def _VerifyGroupLVM(self, node_image, vg_name):
1713     """Check cross-node consistency in LVM.
1714
1715     @type node_image: dict
1716     @param node_image: info about nodes, mapping from node to names to
1717       L{NodeImage} objects
1718     @param vg_name: the configured VG name
1719
1720     """
1721     if vg_name is None:
1722       return
1723
1724     # Only exclusive storage needs this kind of checks
1725     if not self._exclusive_storage:
1726       return
1727
1728     # exclusive_storage wants all PVs to have the same size (approximately),
1729     # if the smallest and the biggest ones are okay, everything is fine.
1730     # pv_min is None iff pv_max is None
1731     vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1732     if not vals:
1733       return
1734     (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1735     (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1736     bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1737     self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1738                   "PV sizes differ too much in the group; smallest (%s MB) is"
1739                   " on %s, biggest (%s MB) is on %s",
1740                   pvmin, self.cfg.GetNodeName(minnode_uuid),
1741                   pvmax, self.cfg.GetNodeName(maxnode_uuid))
1742
1743   def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1744     """Check the node bridges.
1745
1746     @type ninfo: L{objects.Node}
1747     @param ninfo: the node to check
1748     @param nresult: the remote results for the node
1749     @param bridges: the expected list of bridges
1750
1751     """
1752     if not bridges:
1753       return
1754
1755     missing = nresult.get(constants.NV_BRIDGES, None)
1756     test = not isinstance(missing, list)
1757     self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1758                   "did not return valid bridge information")
1759     if not test:
1760       self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1761                     "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1762
1763   def _VerifyNodeUserScripts(self, ninfo, nresult):
1764     """Check the results of user scripts presence and executability on the node
1765
1766     @type ninfo: L{objects.Node}
1767     @param ninfo: the node to check
1768     @param nresult: the remote results for the node
1769
1770     """
1771     test = not constants.NV_USERSCRIPTS in nresult
1772     self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1773                   "did not return user scripts information")
1774
1775     broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1776     if not test:
1777       self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1778                     "user scripts not present or not executable: %s" %
1779                     utils.CommaJoin(sorted(broken_scripts)))
1780
1781   def _VerifyNodeNetwork(self, ninfo, nresult):
1782     """Check the node network connectivity results.
1783
1784     @type ninfo: L{objects.Node}
1785     @param ninfo: the node to check
1786     @param nresult: the remote results for the node
1787
1788     """
1789     test = constants.NV_NODELIST not in nresult
1790     self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
1791                   "node hasn't returned node ssh connectivity data")
1792     if not test:
1793       if nresult[constants.NV_NODELIST]:
1794         for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1795           self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
1796                         "ssh communication with node '%s': %s", a_node, a_msg)
1797
1798     test = constants.NV_NODENETTEST not in nresult
1799     self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1800                   "node hasn't returned node tcp connectivity data")
1801     if not test:
1802       if nresult[constants.NV_NODENETTEST]:
1803         nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1804         for anode in nlist:
1805           self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
1806                         "tcp communication with node '%s': %s",
1807                         anode, nresult[constants.NV_NODENETTEST][anode])
1808
1809     test = constants.NV_MASTERIP not in nresult
1810     self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1811                   "node hasn't returned node master IP reachability data")
1812     if not test:
1813       if not nresult[constants.NV_MASTERIP]:
1814         if ninfo.uuid == self.master_node:
1815           msg = "the master node cannot reach the master IP (not configured?)"
1816         else:
1817           msg = "cannot reach the master IP"
1818         self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
1819
1820   def _VerifyInstance(self, instance, inst_config, node_image,
1821                       diskstatus):
1822     """Verify an instance.
1823
1824     This function checks to see if the required block devices are
1825     available on the instance's node, and that the nodes are in the correct
1826     state.
1827
1828     """
1829     pnode = inst_config.primary_node
1830     pnode_img = node_image[pnode]
1831     groupinfo = self.cfg.GetAllNodeGroupsInfo()
1832
1833     node_vol_should = {}
1834     inst_config.MapLVsByNode(node_vol_should)
1835
1836     cluster = self.cfg.GetClusterInfo()
1837     ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1838                                                             self.group_info)
1839     err = ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
1840     self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance,
1841                   utils.CommaJoin(err), code=self.ETYPE_WARNING)
1842
1843     for node in node_vol_should:
1844       n_img = node_image[node]
1845       if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1846         # ignore missing volumes on offline or broken nodes
1847         continue
1848       for volume in node_vol_should[node]:
1849         test = volume not in n_img.volumes
1850         self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
1851                       "volume %s missing on node %s", volume,
1852                       self.cfg.GetNodeName(node))
1853
1854     if inst_config.admin_state == constants.ADMINST_UP:
1855       test = instance not in pnode_img.instances and not pnode_img.offline
1856       self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
1857                     "instance not running on its primary node %s",
1858                      self.cfg.GetNodeName(pnode))
1859       self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
1860                     "instance is marked as running and lives on"
1861                     " offline node %s", self.cfg.GetNodeName(pnode))
1862
1863     diskdata = [(nname, success, status, idx)
1864                 for (nname, disks) in diskstatus.items()
1865                 for idx, (success, status) in enumerate(disks)]
1866
1867     for nname, success, bdev_status, idx in diskdata:
1868       # the 'ghost node' construction in Exec() ensures that we have a
1869       # node here
1870       snode = node_image[nname]
1871       bad_snode = snode.ghost or snode.offline
1872       self._ErrorIf(inst_config.disks_active and
1873                     not success and not bad_snode,
1874                     constants.CV_EINSTANCEFAULTYDISK, instance,
1875                     "couldn't retrieve status for disk/%s on %s: %s",
1876                     idx, self.cfg.GetNodeName(nname), bdev_status)
1877       self._ErrorIf((inst_config.disks_active and
1878                      success and
1879                      bdev_status.ldisk_status == constants.LDS_FAULTY),
1880                     constants.CV_EINSTANCEFAULTYDISK, instance,
1881                     "disk/%s on %s is faulty", idx, self.cfg.GetNodeName(nname))
1882
1883     self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1884                   constants.CV_ENODERPC, pnode, "instance %s, connection to"
1885                   " primary node failed", instance)
1886
1887     self._ErrorIf(len(inst_config.secondary_nodes) > 1,
1888                   constants.CV_EINSTANCELAYOUT,
1889                   instance, "instance has multiple secondary nodes: %s",
1890                   utils.CommaJoin(inst_config.secondary_nodes),
1891                   code=self.ETYPE_WARNING)
1892
1893     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
1894                                                inst_config.all_nodes)
1895     if any(es_flags.values()):
1896       if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
1897         # Disk template not compatible with exclusive_storage: no instance
1898         # node should have the flag set
1899         es_nodes = [n
1900                     for (n, es) in es_flags.items()
1901                     if es]
1902         self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance,
1903                     "instance has template %s, which is not supported on nodes"
1904                     " that have exclusive storage set: %s",
1905                     inst_config.disk_template,
1906                     utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
1907       for (idx, disk) in enumerate(inst_config.disks):
1908         self._ErrorIf(disk.spindles is None,
1909                       constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance,
1910                       "number of spindles not configured for disk %s while"
1911                       " exclusive storage is enabled, try running"
1912                       " gnt-cluster repair-disk-sizes", idx)
1913
1914     if inst_config.disk_template in constants.DTS_INT_MIRROR:
1915       instance_nodes = utils.NiceSort(inst_config.all_nodes)
1916       instance_groups = {}
1917
1918       for node in instance_nodes:
1919         instance_groups.setdefault(self.all_node_info[node].group,
1920                                    []).append(node)
1921
1922       pretty_list = [
1923         "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
1924                            groupinfo[group].name)
1925         # Sort so that we always list the primary node first.
1926         for group, nodes in sorted(instance_groups.items(),
1927                                    key=lambda (_, nodes): pnode in nodes,
1928                                    reverse=True)]
1929
1930       self._ErrorIf(len(instance_groups) > 1,
1931                     constants.CV_EINSTANCESPLITGROUPS,
1932                     instance, "instance has primary and secondary nodes in"
1933                     " different groups: %s", utils.CommaJoin(pretty_list),
1934                     code=self.ETYPE_WARNING)
1935
1936     inst_nodes_offline = []
1937     for snode in inst_config.secondary_nodes:
1938       s_img = node_image[snode]
1939       self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
1940                     snode, "instance %s, connection to secondary node failed",
1941                     instance)
1942
1943       if s_img.offline:
1944         inst_nodes_offline.append(snode)
1945
1946     # warn that the instance lives on offline nodes
1947     self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
1948                   "instance has offline secondary node(s) %s",
1949                   utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
1950     # ... or ghost/non-vm_capable nodes
1951     for node in inst_config.all_nodes:
1952       self._ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
1953                     instance, "instance lives on ghost node %s",
1954                     self.cfg.GetNodeName(node))
1955       self._ErrorIf(not node_image[node].vm_capable,
1956                     constants.CV_EINSTANCEBADNODE, instance,
1957                     "instance lives on non-vm_capable node %s",
1958                     self.cfg.GetNodeName(node))
1959
1960   def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1961     """Verify if there are any unknown volumes in the cluster.
1962
1963     The .os, .swap and backup volumes are ignored. All other volumes are
1964     reported as unknown.
1965
1966     @type reserved: L{ganeti.utils.FieldSet}
1967     @param reserved: a FieldSet of reserved volume names
1968
1969     """
1970     for node_uuid, n_img in node_image.items():
1971       if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
1972           self.all_node_info[node_uuid].group != self.group_uuid):
1973         # skip non-healthy nodes
1974         continue
1975       for volume in n_img.volumes:
1976         test = ((node_uuid not in node_vol_should or
1977                 volume not in node_vol_should[node_uuid]) and
1978                 not reserved.Matches(volume))
1979         self._ErrorIf(test, constants.CV_ENODEORPHANLV,
1980                       self.cfg.GetNodeName(node_uuid),
1981                       "volume %s is unknown", volume)
1982
1983   def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1984     """Verify N+1 Memory Resilience.
1985
1986     Check that if one single node dies we can still start all the
1987     instances it was primary for.
1988
1989     """
1990     cluster_info = self.cfg.GetClusterInfo()
1991     for node_uuid, n_img in node_image.items():
1992       # This code checks that every node which is now listed as
1993       # secondary has enough memory to host all instances it is
1994       # supposed to should a single other node in the cluster fail.
1995       # FIXME: not ready for failover to an arbitrary node
1996       # FIXME: does not support file-backed instances
1997       # WARNING: we currently take into account down instances as well
1998       # as up ones, considering that even if they're down someone
1999       # might want to start them even in the event of a node failure.
2000       if n_img.offline or \
2001          self.all_node_info[node_uuid].group != self.group_uuid:
2002         # we're skipping nodes marked offline and nodes in other groups from
2003         # the N+1 warning, since most likely we don't have good memory
2004         # infromation from them; we already list instances living on such
2005         # nodes, and that's enough warning
2006         continue
2007       #TODO(dynmem): also consider ballooning out other instances
2008       for prinode, instances in n_img.sbp.items():
2009         needed_mem = 0
2010         for instance in instances:
2011           bep = cluster_info.FillBE(instance_cfg[instance])
2012           if bep[constants.BE_AUTO_BALANCE]:
2013             needed_mem += bep[constants.BE_MINMEM]
2014         test = n_img.mfree < needed_mem
2015         self._ErrorIf(test, constants.CV_ENODEN1,
2016                       self.cfg.GetNodeName(node_uuid),
2017                       "not enough memory to accomodate instance failovers"
2018                       " should node %s fail (%dMiB needed, %dMiB available)",
2019                       self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2020
2021   def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2022                    (files_all, files_opt, files_mc, files_vm)):
2023     """Verifies file checksums collected from all nodes.
2024
2025     @param nodes: List of L{objects.Node} objects
2026     @param master_node_uuid: UUID of master node
2027     @param all_nvinfo: RPC results
2028
2029     """
2030     # Define functions determining which nodes to consider for a file
2031     files2nodefn = [
2032       (files_all, None),
2033       (files_mc, lambda node: (node.master_candidate or
2034                                node.uuid == master_node_uuid)),
2035       (files_vm, lambda node: node.vm_capable),
2036       ]
2037
2038     # Build mapping from filename to list of nodes which should have the file
2039     nodefiles = {}
2040     for (files, fn) in files2nodefn:
2041       if fn is None:
2042         filenodes = nodes
2043       else:
2044         filenodes = filter(fn, nodes)
2045       nodefiles.update((filename,
2046                         frozenset(map(operator.attrgetter("uuid"), filenodes)))
2047                        for filename in files)
2048
2049     assert set(nodefiles) == (files_all | files_mc | files_vm)
2050
2051     fileinfo = dict((filename, {}) for filename in nodefiles)
2052     ignore_nodes = set()
2053
2054     for node in nodes:
2055       if node.offline:
2056         ignore_nodes.add(node.uuid)
2057         continue
2058
2059       nresult = all_nvinfo[node.uuid]
2060
2061       if nresult.fail_msg or not nresult.payload:
2062         node_files = None
2063       else:
2064         fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2065         node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2066                           for (key, value) in fingerprints.items())
2067         del fingerprints
2068
2069       test = not (node_files and isinstance(node_files, dict))
2070       self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2071                     "Node did not return file checksum data")
2072       if test:
2073         ignore_nodes.add(node.uuid)
2074         continue
2075
2076       # Build per-checksum mapping from filename to nodes having it
2077       for (filename, checksum) in node_files.items():
2078         assert filename in nodefiles
2079         fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2080
2081     for (filename, checksums) in fileinfo.items():
2082       assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2083
2084       # Nodes having the file
2085       with_file = frozenset(node_uuid
2086                             for node_uuids in fileinfo[filename].values()
2087                             for node_uuid in node_uuids) - ignore_nodes
2088
2089       expected_nodes = nodefiles[filename] - ignore_nodes
2090
2091       # Nodes missing file
2092       missing_file = expected_nodes - with_file
2093
2094       if filename in files_opt:
2095         # All or no nodes
2096         self._ErrorIf(missing_file and missing_file != expected_nodes,
2097                       constants.CV_ECLUSTERFILECHECK, None,
2098                       "File %s is optional, but it must exist on all or no"
2099                       " nodes (not found on %s)",
2100                       filename,
2101                       utils.CommaJoin(
2102                         utils.NiceSort(
2103                           map(self.cfg.GetNodeName, missing_file))))
2104       else:
2105         self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2106                       "File %s is missing from node(s) %s", filename,
2107                       utils.CommaJoin(
2108                         utils.NiceSort(
2109                           map(self.cfg.GetNodeName, missing_file))))
2110
2111         # Warn if a node has a file it shouldn't
2112         unexpected = with_file - expected_nodes
2113         self._ErrorIf(unexpected,
2114                       constants.CV_ECLUSTERFILECHECK, None,
2115                       "File %s should not exist on node(s) %s",
2116                       filename, utils.CommaJoin(
2117                         utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2118
2119       # See if there are multiple versions of the file
2120       test = len(checksums) > 1
2121       if test:
2122         variants = ["variant %s on %s" %
2123                     (idx + 1,
2124                      utils.CommaJoin(utils.NiceSort(
2125                        map(self.cfg.GetNodeName, node_uuids))))
2126                     for (idx, (checksum, node_uuids)) in
2127                       enumerate(sorted(checksums.items()))]
2128       else:
2129         variants = []
2130
2131       self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2132                     "File %s found with %s different checksums (%s)",
2133                     filename, len(checksums), "; ".join(variants))
2134
2135   def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2136                       drbd_map):
2137     """Verifies and the node DRBD status.
2138
2139     @type ninfo: L{objects.Node}
2140     @param ninfo: the node to check
2141     @param nresult: the remote results for the node
2142     @param instanceinfo: the dict of instances
2143     @param drbd_helper: the configured DRBD usermode helper
2144     @param drbd_map: the DRBD map as returned by
2145         L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2146
2147     """
2148     if drbd_helper:
2149       helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2150       test = (helper_result is None)
2151       self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2152                     "no drbd usermode helper returned")
2153       if helper_result:
2154         status, payload = helper_result
2155         test = not status
2156         self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2157                       "drbd usermode helper check unsuccessful: %s", payload)
2158         test = status and (payload != drbd_helper)
2159         self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2160                       "wrong drbd usermode helper: %s", payload)
2161
2162     # compute the DRBD minors
2163     node_drbd = {}
2164     for minor, instance in drbd_map[ninfo.uuid].items():
2165       test = instance not in instanceinfo
2166       self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2167                     "ghost instance '%s' in temporary DRBD map", instance)
2168         # ghost instance should not be running, but otherwise we
2169         # don't give double warnings (both ghost instance and
2170         # unallocated minor in use)
2171       if test:
2172         node_drbd[minor] = (instance, False)
2173       else:
2174         instance = instanceinfo[instance]
2175         node_drbd[minor] = (instance.name, instance.disks_active)
2176
2177     # and now check them
2178     used_minors = nresult.get(constants.NV_DRBDLIST, [])
2179     test = not isinstance(used_minors, (tuple, list))
2180     self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2181                   "cannot parse drbd status file: %s", str(used_minors))
2182     if test:
2183       # we cannot check drbd status
2184       return
2185
2186     for minor, (iname, must_exist) in node_drbd.items():
2187       test = minor not in used_minors and must_exist
2188       self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2189                     "drbd minor %d of instance %s is not active", minor, iname)
2190     for minor in used_minors:
2191       test = minor not in node_drbd
2192       self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2193                     "unallocated drbd minor %d is in use", minor)
2194
2195   def _UpdateNodeOS(self, ninfo, nresult, nimg):
2196     """Builds the node OS structures.
2197
2198     @type ninfo: L{objects.Node}
2199     @param ninfo: the node to check
2200     @param nresult: the remote results for the node
2201     @param nimg: the node image object
2202
2203     """
2204     remote_os = nresult.get(constants.NV_OSLIST, None)
2205     test = (not isinstance(remote_os, list) or
2206             not compat.all(isinstance(v, list) and len(v) == 7
2207                            for v in remote_os))
2208
2209     self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2210                   "node hasn't returned valid OS data")
2211
2212     nimg.os_fail = test
2213
2214     if test:
2215       return
2216
2217     os_dict = {}
2218
2219     for (name, os_path, status, diagnose,
2220          variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2221
2222       if name not in os_dict:
2223         os_dict[name] = []
2224
2225       # parameters is a list of lists instead of list of tuples due to
2226       # JSON lacking a real tuple type, fix it:
2227       parameters = [tuple(v) for v in parameters]
2228       os_dict[name].append((os_path, status, diagnose,
2229                             set(variants), set(parameters), set(api_ver)))
2230
2231     nimg.oslist = os_dict
2232
2233   def _VerifyNodeOS(self, ninfo, nimg, base):
2234     """Verifies the node OS list.
2235
2236     @type ninfo: L{objects.Node}
2237     @param ninfo: the node to check
2238     @param nimg: the node image object
2239     @param base: the 'template' node we match against (e.g. from the master)
2240
2241     """
2242     assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2243
2244     beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2245     for os_name, os_data in nimg.oslist.items():
2246       assert os_data, "Empty OS status for OS %s?!" % os_name
2247       f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2248       self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2249                     "Invalid OS %s (located at %s): %s",
2250                     os_name, f_path, f_diag)
2251       self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2252                     "OS '%s' has multiple entries"
2253                     " (first one shadows the rest): %s",
2254                     os_name, utils.CommaJoin([v[0] for v in os_data]))
2255       # comparisons with the 'base' image
2256       test = os_name not in base.oslist
2257       self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2258                     "Extra OS %s not present on reference node (%s)",
2259                     os_name, self.cfg.GetNodeName(base.uuid))
2260       if test:
2261         continue
2262       assert base.oslist[os_name], "Base node has empty OS status?"
2263       _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2264       if not b_status:
2265         # base OS is invalid, skipping
2266         continue
2267       for kind, a, b in [("API version", f_api, b_api),
2268                          ("variants list", f_var, b_var),
2269                          ("parameters", beautify_params(f_param),
2270                           beautify_params(b_param))]:
2271         self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2272                       "OS %s for %s differs from reference node %s:"
2273                       " [%s] vs. [%s]", kind, os_name,
2274                       self.cfg.GetNodeName(base.uuid),
2275                       utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2276
2277     # check any missing OSes
2278     missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2279     self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2280                   "OSes present on reference node %s"
2281                   " but missing on this node: %s",
2282                   self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2283
2284   def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
2285     """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2286
2287     @type ninfo: L{objects.Node}
2288     @param ninfo: the node to check
2289     @param nresult: the remote results for the node
2290     @type is_master: bool
2291     @param is_master: Whether node is the master node
2292
2293     """
2294     if (is_master and
2295         (constants.ENABLE_FILE_STORAGE or
2296          constants.ENABLE_SHARED_FILE_STORAGE)):
2297       try:
2298         fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
2299       except KeyError:
2300         # This should never happen
2301         self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2302                       "Node did not return forbidden file storage paths")
2303       else:
2304         self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2305                       "Found forbidden file storage paths: %s",
2306                       utils.CommaJoin(fspaths))
2307     else:
2308       self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
2309                     constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2310                     "Node should not have returned forbidden file storage"
2311                     " paths")
2312
2313   def _VerifyOob(self, ninfo, nresult):
2314     """Verifies out of band functionality of a node.
2315
2316     @type ninfo: L{objects.Node}
2317     @param ninfo: the node to check
2318     @param nresult: the remote results for the node
2319
2320     """
2321     # We just have to verify the paths on master and/or master candidates
2322     # as the oob helper is invoked on the master
2323     if ((ninfo.master_candidate or ninfo.master_capable) and
2324         constants.NV_OOB_PATHS in nresult):
2325       for path_result in nresult[constants.NV_OOB_PATHS]:
2326         self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2327                       ninfo.name, path_result)
2328
2329   def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2330     """Verifies and updates the node volume data.
2331
2332     This function will update a L{NodeImage}'s internal structures
2333     with data from the remote call.
2334
2335     @type ninfo: L{objects.Node}
2336     @param ninfo: the node to check
2337     @param nresult: the remote results for the node
2338     @param nimg: the node image object
2339     @param vg_name: the configured VG name
2340
2341     """
2342     nimg.lvm_fail = True
2343     lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2344     if vg_name is None:
2345       pass
2346     elif isinstance(lvdata, basestring):
2347       self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2348                     "LVM problem on node: %s", utils.SafeEncode(lvdata))
2349     elif not isinstance(lvdata, dict):
2350       self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2351                     "rpc call to node failed (lvlist)")
2352     else:
2353       nimg.volumes = lvdata
2354       nimg.lvm_fail = False
2355
2356   def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2357     """Verifies and updates the node instance list.
2358
2359     If the listing was successful, then updates this node's instance
2360     list. Otherwise, it marks the RPC call as failed for the instance
2361     list key.
2362
2363     @type ninfo: L{objects.Node}
2364     @param ninfo: the node to check
2365     @param nresult: the remote results for the node
2366     @param nimg: the node image object
2367
2368     """
2369     idata = nresult.get(constants.NV_INSTANCELIST, None)
2370     test = not isinstance(idata, list)
2371     self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2372                   "rpc call to node failed (instancelist): %s",
2373                   utils.SafeEncode(str(idata)))
2374     if test:
2375       nimg.hyp_fail = True
2376     else:
2377       nimg.instances = idata
2378
2379   def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2380     """Verifies and computes a node information map
2381
2382     @type ninfo: L{objects.Node}
2383     @param ninfo: the node to check
2384     @param nresult: the remote results for the node
2385     @param nimg: the node image object
2386     @param vg_name: the configured VG name
2387
2388     """
2389     # try to read free memory (from the hypervisor)
2390     hv_info = nresult.get(constants.NV_HVINFO, None)
2391     test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2392     self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2393                   "rpc call to node failed (hvinfo)")
2394     if not test:
2395       try:
2396         nimg.mfree = int(hv_info["memory_free"])
2397       except (ValueError, TypeError):
2398         self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2399                       "node returned invalid nodeinfo, check hypervisor")
2400
2401     # FIXME: devise a free space model for file based instances as well
2402     if vg_name is not None:
2403       test = (constants.NV_VGLIST not in nresult or
2404               vg_name not in nresult[constants.NV_VGLIST])
2405       self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2406                     "node didn't return data for the volume group '%s'"
2407                     " - it is either missing or broken", vg_name)
2408       if not test:
2409         try:
2410           nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2411         except (ValueError, TypeError):
2412           self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2413                         "node returned invalid LVM info, check LVM status")
2414
2415   def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2416     """Gets per-disk status information for all instances.
2417
2418     @type node_uuids: list of strings
2419     @param node_uuids: Node UUIDs
2420     @type node_image: dict of (name, L{objects.Node})
2421     @param node_image: Node objects
2422     @type instanceinfo: dict of (name, L{objects.Instance})
2423     @param instanceinfo: Instance objects
2424     @rtype: {instance: {node: [(succes, payload)]}}
2425     @return: a dictionary of per-instance dictionaries with nodes as
2426         keys and disk information as values; the disk information is a
2427         list of tuples (success, payload)
2428
2429     """
2430     node_disks = {}
2431     node_disks_devonly = {}
2432     diskless_instances = set()
2433     diskless = constants.DT_DISKLESS
2434
2435     for nuuid in node_uuids:
2436       node_instances = list(itertools.chain(node_image[nuuid].pinst,
2437                                             node_image[nuuid].sinst))
2438       diskless_instances.update(inst for inst in node_instances
2439                                 if instanceinfo[inst].disk_template == diskless)
2440       disks = [(inst, disk)
2441                for inst in node_instances
2442                for disk in instanceinfo[inst].disks]
2443
2444       if not disks:
2445         # No need to collect data
2446         continue
2447
2448       node_disks[nuuid] = disks
2449
2450       # _AnnotateDiskParams makes already copies of the disks
2451       devonly = []
2452       for (inst, dev) in disks:
2453         (anno_disk,) = AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
2454         self.cfg.SetDiskID(anno_disk, nuuid)
2455         devonly.append(anno_disk)
2456
2457       node_disks_devonly[nuuid] = devonly
2458
2459     assert len(node_disks) == len(node_disks_devonly)
2460
2461     # Collect data from all nodes with disks
2462     result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2463                                                           node_disks_devonly)
2464
2465     assert len(result) == len(node_disks)
2466
2467     instdisk = {}
2468
2469     for (nuuid, nres) in result.items():
2470       node = self.cfg.GetNodeInfo(nuuid)
2471       disks = node_disks[node.uuid]
2472
2473       if nres.offline:
2474         # No data from this node
2475         data = len(disks) * [(False, "node offline")]
2476       else:
2477         msg = nres.fail_msg
2478         self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
2479                       "while getting disk information: %s", msg)
2480         if msg:
2481           # No data from this node
2482           data = len(disks) * [(False, msg)]
2483         else:
2484           data = []
2485           for idx, i in enumerate(nres.payload):
2486             if isinstance(i, (tuple, list)) and len(i) == 2:
2487               data.append(i)
2488             else:
2489               logging.warning("Invalid result from node %s, entry %d: %s",
2490                               node.name, idx, i)
2491               data.append((False, "Invalid result from the remote node"))
2492
2493       for ((inst, _), status) in zip(disks, data):
2494         instdisk.setdefault(inst, {}).setdefault(node.uuid, []).append(status)
2495
2496     # Add empty entries for diskless instances.
2497     for inst in diskless_instances:
2498       assert inst not in instdisk
2499       instdisk[inst] = {}
2500
2501     assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2502                       len(nuuids) <= len(instanceinfo[inst].all_nodes) and
2503                       compat.all(isinstance(s, (tuple, list)) and
2504                                  len(s) == 2 for s in statuses)
2505                       for inst, nuuids in instdisk.items()
2506                       for nuuid, statuses in nuuids.items())
2507     if __debug__:
2508       instdisk_keys = set(instdisk)
2509       instanceinfo_keys = set(instanceinfo)
2510       assert instdisk_keys == instanceinfo_keys, \
2511         ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2512          (instdisk_keys, instanceinfo_keys))
2513
2514     return instdisk
2515
2516   @staticmethod
2517   def _SshNodeSelector(group_uuid, all_nodes):
2518     """Create endless iterators for all potential SSH check hosts.
2519
2520     """
2521     nodes = [node for node in all_nodes
2522              if (node.group != group_uuid and
2523                  not node.offline)]
2524     keyfunc = operator.attrgetter("group")
2525
2526     return map(itertools.cycle,
2527                [sorted(map(operator.attrgetter("name"), names))
2528                 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2529                                                   keyfunc)])
2530
2531   @classmethod
2532   def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2533     """Choose which nodes should talk to which other nodes.
2534
2535     We will make nodes contact all nodes in their group, and one node from
2536     every other group.
2537
2538     @warning: This algorithm has a known issue if one node group is much
2539       smaller than others (e.g. just one node). In such a case all other
2540       nodes will talk to the single node.
2541
2542     """
2543     online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2544     sel = cls._SshNodeSelector(group_uuid, all_nodes)
2545
2546     return (online_nodes,
2547             dict((name, sorted([i.next() for i in sel]))
2548                  for name in online_nodes))
2549
2550   def BuildHooksEnv(self):
2551     """Build hooks env.
2552
2553     Cluster-Verify hooks just ran in the post phase and their failure makes
2554     the output be logged in the verify output and the verification to fail.
2555
2556     """
2557     env = {
2558       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2559       }
2560
2561     env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2562                for node in self.my_node_info.values())
2563
2564     return env
2565
2566   def BuildHooksNodes(self):
2567     """Build hooks nodes.
2568
2569     """
2570     return ([], list(self.my_node_info.keys()))
2571
2572   def Exec(self, feedback_fn):
2573     """Verify integrity of the node group, performing various test on nodes.
2574
2575     """
2576     # This method has too many local variables. pylint: disable=R0914
2577     feedback_fn("* Verifying group '%s'" % self.group_info.name)
2578
2579     if not self.my_node_uuids:
2580       # empty node group
2581       feedback_fn("* Empty node group, skipping verification")
2582       return True
2583
2584     self.bad = False
2585     verbose = self.op.verbose
2586     self._feedback_fn = feedback_fn
2587
2588     vg_name = self.cfg.GetVGName()
2589     drbd_helper = self.cfg.GetDRBDHelper()
2590     cluster = self.cfg.GetClusterInfo()
2591     hypervisors = cluster.enabled_hypervisors
2592     node_data_list = self.my_node_info.values()
2593
2594     i_non_redundant = [] # Non redundant instances
2595     i_non_a_balanced = [] # Non auto-balanced instances
2596     i_offline = 0 # Count of offline instances
2597     n_offline = 0 # Count of offline nodes
2598     n_drained = 0 # Count of nodes being drained
2599     node_vol_should = {}
2600
2601     # FIXME: verify OS list
2602
2603     # File verification
2604     filemap = ComputeAncillaryFiles(cluster, False)
2605
2606     # do local checksums
2607     master_node_uuid = self.master_node = self.cfg.GetMasterNode()
2608     master_ip = self.cfg.GetMasterIP()
2609
2610     feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))
2611
2612     user_scripts = []
2613     if self.cfg.GetUseExternalMipScript():
2614       user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2615
2616     node_verify_param = {
2617       constants.NV_FILELIST:
2618         map(vcluster.MakeVirtualPath,
2619             utils.UniqueSequence(filename
2620                                  for files in filemap
2621                                  for filename in files)),
2622       constants.NV_NODELIST:
2623         self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2624                                   self.all_node_info.values()),
2625       constants.NV_HYPERVISOR: hypervisors,
2626       constants.NV_HVPARAMS:
2627         _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2628       constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2629                                  for node in node_data_list
2630                                  if not node.offline],
2631       constants.NV_INSTANCELIST: hypervisors,
2632       constants.NV_VERSION: None,
2633       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2634       constants.NV_NODESETUP: None,
2635       constants.NV_TIME: None,
2636       constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
2637       constants.NV_OSLIST: None,
2638       constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2639       constants.NV_USERSCRIPTS: user_scripts,
2640       }
2641
2642     if vg_name is not None:
2643       node_verify_param[constants.NV_VGLIST] = None
2644       node_verify_param[constants.NV_LVLIST] = vg_name
2645       node_verify_param[constants.NV_PVLIST] = [vg_name]
2646
2647     if drbd_helper:
2648       node_verify_param[constants.NV_DRBDVERSION] = None
2649       node_verify_param[constants.NV_DRBDLIST] = None
2650       node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2651
2652     if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
2653       # Load file storage paths only from master node
2654       node_verify_param[constants.NV_FILE_STORAGE_PATHS] = \
2655         self.cfg.GetMasterNodeName()
2656
2657     # bridge checks
2658     # FIXME: this needs to be changed per node-group, not cluster-wide
2659     bridges = set()
2660     default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2661     if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2662       bridges.add(default_nicpp[constants.NIC_LINK])
2663     for instance in self.my_inst_info.values():
2664       for nic in instance.nics:
2665         full_nic = cluster.SimpleFillNIC(nic.nicparams)
2666         if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2667           bridges.add(full_nic[constants.NIC_LINK])
2668
2669     if bridges:
2670       node_verify_param[constants.NV_BRIDGES] = list(bridges)
2671
2672     # Build our expected cluster state
2673     node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
2674                                                  uuid=node.uuid,
2675                                                  vm_capable=node.vm_capable))
2676                       for node in node_data_list)
2677
2678     # Gather OOB paths
2679     oob_paths = []
2680     for node in self.all_node_info.values():
2681       path = SupportsOob(self.cfg, node)
2682       if path and path not in oob_paths:
2683         oob_paths.append(path)
2684
2685     if oob_paths:
2686       node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2687
2688     for instance in self.my_inst_names:
2689       inst_config = self.my_inst_info[instance]
2690       if inst_config.admin_state == constants.ADMINST_OFFLINE:
2691         i_offline += 1
2692
2693       for nuuid in inst_config.all_nodes:
2694         if nuuid not in node_image:
2695           gnode = self.NodeImage(uuid=nuuid)
2696           gnode.ghost = (nuuid not in self.all_node_info)
2697           node_image[nuuid] = gnode
2698
2699       inst_config.MapLVsByNode(node_vol_should)
2700
2701       pnode = inst_config.primary_node
2702       node_image[pnode].pinst.append(instance)
2703
2704       for snode in inst_config.secondary_nodes:
2705         nimg = node_image[snode]
2706         nimg.sinst.append(instance)
2707         if pnode not in nimg.sbp:
2708           nimg.sbp[pnode] = []
2709         nimg.sbp[pnode].append(instance)
2710
2711     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
2712                                                self.my_node_info.keys())
2713     # The value of exclusive_storage should be the same across the group, so if
2714     # it's True for at least a node, we act as if it were set for all the nodes
2715     self._exclusive_storage = compat.any(es_flags.values())
2716     if self._exclusive_storage:
2717       node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2718
2719     # At this point, we have the in-memory data structures complete,
2720     # except for the runtime information, which we'll gather next
2721
2722     # Due to the way our RPC system works, exact response times cannot be
2723     # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2724     # time before and after executing the request, we can at least have a time
2725     # window.
2726     nvinfo_starttime = time.time()
2727     all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
2728                                            node_verify_param,
2729                                            self.cfg.GetClusterName(),
2730                                            self.cfg.GetClusterInfo().hvparams)
2731     nvinfo_endtime = time.time()
2732
2733     if self.extra_lv_nodes and vg_name is not None:
2734       extra_lv_nvinfo = \
2735           self.rpc.call_node_verify(self.extra_lv_nodes,
2736                                     {constants.NV_LVLIST: vg_name},
2737                                     self.cfg.GetClusterName(),
2738                                     self.cfg.GetClusterInfo().hvparams)
2739     else:
2740       extra_lv_nvinfo = {}
2741
2742     all_drbd_map = self.cfg.ComputeDRBDMap()
2743
2744     feedback_fn("* Gathering disk information (%s nodes)" %
2745                 len(self.my_node_uuids))
2746     instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
2747                                      self.my_inst_info)
2748
2749     feedback_fn("* Verifying configuration file consistency")
2750
2751     # If not all nodes are being checked, we need to make sure the master node
2752     # and a non-checked vm_capable node are in the list.
2753     absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
2754     if absent_node_uuids:
2755       vf_nvinfo = all_nvinfo.copy()
2756       vf_node_info = list(self.my_node_info.values())
2757       additional_node_uuids = []
2758       if master_node_uuid not in self.my_node_info:
2759         additional_node_uuids.append(master_node_uuid)
2760         vf_node_info.append(self.all_node_info[master_node_uuid])
2761       # Add the first vm_capable node we find which is not included,
2762       # excluding the master node (which we already have)
2763       for node_uuid in absent_node_uuids:
2764         nodeinfo = self.all_node_info[node_uuid]
2765         if (nodeinfo.vm_capable and not nodeinfo.offline and
2766             node_uuid != master_node_uuid):
2767           additional_node_uuids.append(node_uuid)
2768           vf_node_info.append(self.all_node_info[node_uuid])
2769           break
2770       key = constants.NV_FILELIST
2771       vf_nvinfo.update(self.rpc.call_node_verify(
2772          additional_node_uuids, {key: node_verify_param[key]},
2773          self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
2774     else:
2775       vf_nvinfo = all_nvinfo
2776       vf_node_info = self.my_node_info.values()
2777
2778     self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)
2779
2780     feedback_fn("* Verifying node status")
2781
2782     refos_img = None
2783
2784     for node_i in node_data_list:
2785       nimg = node_image[node_i.uuid]
2786
2787       if node_i.offline:
2788         if verbose:
2789           feedback_fn("* Skipping offline node %s" % (node_i.name,))
2790         n_offline += 1
2791         continue
2792
2793       if node_i.uuid == master_node_uuid:
2794         ntype = "master"
2795       elif node_i.master_candidate:
2796         ntype = "master candidate"
2797       elif node_i.drained:
2798         ntype = "drained"
2799         n_drained += 1
2800       else:
2801         ntype = "regular"
2802       if verbose:
2803         feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))
2804
2805       msg = all_nvinfo[node_i.uuid].fail_msg
2806       self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
2807                     "while contacting node: %s", msg)
2808       if msg:
2809         nimg.rpc_fail = True
2810         continue
2811
2812       nresult = all_nvinfo[node_i.uuid].payload
2813
2814       nimg.call_ok = self._VerifyNode(node_i, nresult)
2815       self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2816       self._VerifyNodeNetwork(node_i, nresult)
2817       self._VerifyNodeUserScripts(node_i, nresult)
2818       self._VerifyOob(node_i, nresult)
2819       self._VerifyFileStoragePaths(node_i, nresult,
2820                                    node_i.uuid == master_node_uuid)
2821
2822       if nimg.vm_capable:
2823         self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
2824         self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2825                              all_drbd_map)
2826
2827         self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2828         self._UpdateNodeInstances(node_i, nresult, nimg)
2829         self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2830         self._UpdateNodeOS(node_i, nresult, nimg)
2831
2832         if not nimg.os_fail:
2833           if refos_img is None:
2834             refos_img = nimg
2835           self._VerifyNodeOS(node_i, nimg, refos_img)
2836         self._VerifyNodeBridges(node_i, nresult, bridges)
2837
2838         # Check whether all running instancies are primary for the node. (This
2839         # can no longer be done from _VerifyInstance below, since some of the
2840         # wrong instances could be from other node groups.)
2841         non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2842
2843         for inst in non_primary_inst:
2844           test = inst in self.all_inst_info
2845           self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
2846                         "instance should not run on node %s", node_i.name)
2847           self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
2848                         "node is running unknown instance %s", inst)
2849
2850     self._VerifyGroupDRBDVersion(all_nvinfo)
2851     self._VerifyGroupLVM(node_image, vg_name)
2852
2853     for node_uuid, result in extra_lv_nvinfo.items():
2854       self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
2855                               node_image[node_uuid], vg_name)
2856
2857     feedback_fn("* Verifying instance status")
2858     for instance in self.my_inst_names:
2859       if verbose:
2860         feedback_fn("* Verifying instance %s" % instance)
2861       inst_config = self.my_inst_info[instance]
2862       self._VerifyInstance(instance, inst_config, node_image,
2863                            instdisk[instance])
2864
2865       # If the instance is non-redundant we cannot survive losing its primary
2866       # node, so we are not N+1 compliant.
2867       if inst_config.disk_template not in constants.DTS_MIRRORED:
2868         i_non_redundant.append(instance)
2869
2870       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2871         i_non_a_balanced.append(instance)
2872
2873     feedback_fn("* Verifying orphan volumes")
2874     reserved = utils.FieldSet(*cluster.reserved_lvs)
2875
2876     # We will get spurious "unknown volume" warnings if any node of this group
2877     # is secondary for an instance whose primary is in another group. To avoid
2878     # them, we find these instances and add their volumes to node_vol_should.
2879     for inst in self.all_inst_info.values():
2880       for secondary in inst.secondary_nodes:
2881         if (secondary in self.my_node_info
2882             and inst.name not in self.my_inst_info):
2883           inst.MapLVsByNode(node_vol_should)
2884           break
2885
2886     self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2887
2888     if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2889       feedback_fn("* Verifying N+1 Memory redundancy")
2890       self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2891
2892     feedback_fn("* Other Notes")
2893     if i_non_redundant:
2894       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2895                   % len(i_non_redundant))
2896
2897     if i_non_a_balanced:
2898       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2899                   % len(i_non_a_balanced))
2900
2901     if i_offline:
2902       feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
2903
2904     if n_offline:
2905       feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2906
2907     if n_drained:
2908       feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2909
2910     return not self.bad
2911
2912   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2913     """Analyze the post-hooks' result
2914
2915     This method analyses the hook result, handles it, and sends some
2916     nicely-formatted feedback back to the user.
2917
2918     @param phase: one of L{constants.HOOKS_PHASE_POST} or
2919         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2920     @param hooks_results: the results of the multi-node hooks rpc call
2921     @param feedback_fn: function used send feedback back to the caller
2922     @param lu_result: previous Exec result
2923     @return: the new Exec result, based on the previous result
2924         and hook results
2925
2926     """
2927     # We only really run POST phase hooks, only for non-empty groups,
2928     # and are only interested in their results
2929     if not self.my_node_uuids:
2930       # empty node group
2931       pass
2932     elif phase == constants.HOOKS_PHASE_POST:
2933       # Used to change hooks' output to proper indentation
2934       feedback_fn("* Hooks Results")
2935       assert hooks_results, "invalid result from hooks"
2936
2937       for node_name in hooks_results:
2938         res = hooks_results[node_name]
2939         msg = res.fail_msg
2940         test = msg and not res.offline
2941         self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2942                       "Communication failure in hooks execution: %s", msg)
2943         if res.offline or msg:
2944           # No need to investigate payload if node is offline or gave
2945           # an error.
2946           continue
2947         for script, hkr, output in res.payload:
2948           test = hkr == constants.HKR_FAIL
2949           self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2950                         "Script %s failed, output:", script)
2951           if test:
2952             output = self._HOOKS_INDENT_RE.sub("      ", output)
2953             feedback_fn("%s" % output)
2954             lu_result = False
2955
2956     return lu_result
2957
2958
2959 class LUClusterVerifyDisks(NoHooksLU):
2960   """Verifies the cluster disks status.
2961
2962   """
2963   REQ_BGL = False
2964
2965   def ExpandNames(self):
2966     self.share_locks = ShareAll()
2967     self.needed_locks = {
2968       locking.LEVEL_NODEGROUP: locking.ALL_SET,
2969       }
2970
2971   def Exec(self, feedback_fn):
2972     group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
2973
2974     # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
2975     return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
2976                            for group in group_names])