Use RpcResult.Warn where appropriate the cmdlib
[ganeti-local] / lib / cmdlib / cluster.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with the cluster."""
23
24 import OpenSSL
25
26 import copy
27 import itertools
28 import logging
29 import operator
30 import os
31 import re
32 import time
33
34 from ganeti import compat
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import masterd
40 from ganeti import netutils
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 from ganeti import query
45 from ganeti import rpc
46 from ganeti import runtime
47 from ganeti import ssh
48 from ganeti import uidpool
49 from ganeti import utils
50 from ganeti import vcluster
51
52 from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53   ResultWithJobs
54 from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55   ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56   GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57   GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58   CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59   ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob
60
61 import ganeti.masterd.instance
62
63
64 class LUClusterActivateMasterIp(NoHooksLU):
65   """Activate the master IP on the master node.
66
67   """
68   def Exec(self, feedback_fn):
69     """Activate the master IP.
70
71     """
72     master_params = self.cfg.GetMasterNetworkParameters()
73     ems = self.cfg.GetUseExternalMipScript()
74     result = self.rpc.call_node_activate_master_ip(master_params.name,
75                                                    master_params, ems)
76     result.Raise("Could not activate the master IP")
77
78
79 class LUClusterDeactivateMasterIp(NoHooksLU):
80   """Deactivate the master IP on the master node.
81
82   """
83   def Exec(self, feedback_fn):
84     """Deactivate the master IP.
85
86     """
87     master_params = self.cfg.GetMasterNetworkParameters()
88     ems = self.cfg.GetUseExternalMipScript()
89     result = self.rpc.call_node_deactivate_master_ip(master_params.name,
90                                                      master_params, ems)
91     result.Raise("Could not deactivate the master IP")
92
93
94 class LUClusterConfigQuery(NoHooksLU):
95   """Return configuration values.
96
97   """
98   REQ_BGL = False
99
100   def CheckArguments(self):
101     self.cq = ClusterQuery(None, self.op.output_fields, False)
102
103   def ExpandNames(self):
104     self.cq.ExpandNames(self)
105
106   def DeclareLocks(self, level):
107     self.cq.DeclareLocks(self, level)
108
109   def Exec(self, feedback_fn):
110     result = self.cq.OldStyleQuery(self)
111
112     assert len(result) == 1
113
114     return result[0]
115
116
117 class LUClusterDestroy(LogicalUnit):
118   """Logical unit for destroying the cluster.
119
120   """
121   HPATH = "cluster-destroy"
122   HTYPE = constants.HTYPE_CLUSTER
123
124   def BuildHooksEnv(self):
125     """Build hooks env.
126
127     """
128     return {
129       "OP_TARGET": self.cfg.GetClusterName(),
130       }
131
132   def BuildHooksNodes(self):
133     """Build hooks nodes.
134
135     """
136     return ([], [])
137
138   def CheckPrereq(self):
139     """Check prerequisites.
140
141     This checks whether the cluster is empty.
142
143     Any errors are signaled by raising errors.OpPrereqError.
144
145     """
146     master = self.cfg.GetMasterNode()
147
148     nodelist = self.cfg.GetNodeList()
149     if len(nodelist) != 1 or nodelist[0] != master:
150       raise errors.OpPrereqError("There are still %d node(s) in"
151                                  " this cluster." % (len(nodelist) - 1),
152                                  errors.ECODE_INVAL)
153     instancelist = self.cfg.GetInstanceList()
154     if instancelist:
155       raise errors.OpPrereqError("There are still %d instance(s) in"
156                                  " this cluster." % len(instancelist),
157                                  errors.ECODE_INVAL)
158
159   def Exec(self, feedback_fn):
160     """Destroys the cluster.
161
162     """
163     master_params = self.cfg.GetMasterNetworkParameters()
164
165     # Run post hooks on master node before it's removed
166     RunPostHook(self, master_params.name)
167
168     ems = self.cfg.GetUseExternalMipScript()
169     result = self.rpc.call_node_deactivate_master_ip(master_params.name,
170                                                      master_params, ems)
171     result.Warn("Error disabling the master IP address", self.LogWarning)
172     return master_params.name
173
174
175 class LUClusterPostInit(LogicalUnit):
176   """Logical unit for running hooks after cluster initialization.
177
178   """
179   HPATH = "cluster-init"
180   HTYPE = constants.HTYPE_CLUSTER
181
182   def BuildHooksEnv(self):
183     """Build hooks env.
184
185     """
186     return {
187       "OP_TARGET": self.cfg.GetClusterName(),
188       }
189
190   def BuildHooksNodes(self):
191     """Build hooks nodes.
192
193     """
194     return ([], [self.cfg.GetMasterNode()])
195
196   def Exec(self, feedback_fn):
197     """Nothing to do.
198
199     """
200     return True
201
202
203 class ClusterQuery(QueryBase):
204   FIELDS = query.CLUSTER_FIELDS
205
206   #: Do not sort (there is only one item)
207   SORT_FIELD = None
208
209   def ExpandNames(self, lu):
210     lu.needed_locks = {}
211
212     # The following variables interact with _QueryBase._GetNames
213     self.wanted = locking.ALL_SET
214     self.do_locking = self.use_locking
215
216     if self.do_locking:
217       raise errors.OpPrereqError("Can not use locking for cluster queries",
218                                  errors.ECODE_INVAL)
219
220   def DeclareLocks(self, lu, level):
221     pass
222
223   def _GetQueryData(self, lu):
224     """Computes the list of nodes and their attributes.
225
226     """
227     # Locking is not used
228     assert not (compat.any(lu.glm.is_owned(level)
229                            for level in locking.LEVELS
230                            if level != locking.LEVEL_CLUSTER) or
231                 self.do_locking or self.use_locking)
232
233     if query.CQ_CONFIG in self.requested_data:
234       cluster = lu.cfg.GetClusterInfo()
235     else:
236       cluster = NotImplemented
237
238     if query.CQ_QUEUE_DRAINED in self.requested_data:
239       drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
240     else:
241       drain_flag = NotImplemented
242
243     if query.CQ_WATCHER_PAUSE in self.requested_data:
244       master_name = lu.cfg.GetMasterNode()
245
246       result = lu.rpc.call_get_watcher_pause(master_name)
247       result.Raise("Can't retrieve watcher pause from master node '%s'" %
248                    master_name)
249
250       watcher_pause = result.payload
251     else:
252       watcher_pause = NotImplemented
253
254     return query.ClusterQueryData(cluster, drain_flag, watcher_pause)
255
256
257 class LUClusterQuery(NoHooksLU):
258   """Query cluster configuration.
259
260   """
261   REQ_BGL = False
262
263   def ExpandNames(self):
264     self.needed_locks = {}
265
266   def Exec(self, feedback_fn):
267     """Return cluster config.
268
269     """
270     cluster = self.cfg.GetClusterInfo()
271     os_hvp = {}
272
273     # Filter just for enabled hypervisors
274     for os_name, hv_dict in cluster.os_hvp.items():
275       os_hvp[os_name] = {}
276       for hv_name, hv_params in hv_dict.items():
277         if hv_name in cluster.enabled_hypervisors:
278           os_hvp[os_name][hv_name] = hv_params
279
280     # Convert ip_family to ip_version
281     primary_ip_version = constants.IP4_VERSION
282     if cluster.primary_ip_family == netutils.IP6Address.family:
283       primary_ip_version = constants.IP6_VERSION
284
285     result = {
286       "software_version": constants.RELEASE_VERSION,
287       "protocol_version": constants.PROTOCOL_VERSION,
288       "config_version": constants.CONFIG_VERSION,
289       "os_api_version": max(constants.OS_API_VERSIONS),
290       "export_version": constants.EXPORT_VERSION,
291       "architecture": runtime.GetArchInfo(),
292       "name": cluster.cluster_name,
293       "master": cluster.master_node,
294       "default_hypervisor": cluster.primary_hypervisor,
295       "enabled_hypervisors": cluster.enabled_hypervisors,
296       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
297                         for hypervisor_name in cluster.enabled_hypervisors]),
298       "os_hvp": os_hvp,
299       "beparams": cluster.beparams,
300       "osparams": cluster.osparams,
301       "ipolicy": cluster.ipolicy,
302       "nicparams": cluster.nicparams,
303       "ndparams": cluster.ndparams,
304       "diskparams": cluster.diskparams,
305       "candidate_pool_size": cluster.candidate_pool_size,
306       "master_netdev": cluster.master_netdev,
307       "master_netmask": cluster.master_netmask,
308       "use_external_mip_script": cluster.use_external_mip_script,
309       "volume_group_name": cluster.volume_group_name,
310       "drbd_usermode_helper": cluster.drbd_usermode_helper,
311       "file_storage_dir": cluster.file_storage_dir,
312       "shared_file_storage_dir": cluster.shared_file_storage_dir,
313       "maintain_node_health": cluster.maintain_node_health,
314       "ctime": cluster.ctime,
315       "mtime": cluster.mtime,
316       "uuid": cluster.uuid,
317       "tags": list(cluster.GetTags()),
318       "uid_pool": cluster.uid_pool,
319       "default_iallocator": cluster.default_iallocator,
320       "reserved_lvs": cluster.reserved_lvs,
321       "primary_ip_version": primary_ip_version,
322       "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
323       "hidden_os": cluster.hidden_os,
324       "blacklisted_os": cluster.blacklisted_os,
325       }
326
327     return result
328
329
330 class LUClusterRedistConf(NoHooksLU):
331   """Force the redistribution of cluster configuration.
332
333   This is a very simple LU.
334
335   """
336   REQ_BGL = False
337
338   def ExpandNames(self):
339     self.needed_locks = {
340       locking.LEVEL_NODE: locking.ALL_SET,
341       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
342     }
343     self.share_locks = ShareAll()
344
345   def Exec(self, feedback_fn):
346     """Redistribute the configuration.
347
348     """
349     self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
350     RedistributeAncillaryFiles(self)
351
352
353 class LUClusterRename(LogicalUnit):
354   """Rename the cluster.
355
356   """
357   HPATH = "cluster-rename"
358   HTYPE = constants.HTYPE_CLUSTER
359
360   def BuildHooksEnv(self):
361     """Build hooks env.
362
363     """
364     return {
365       "OP_TARGET": self.cfg.GetClusterName(),
366       "NEW_NAME": self.op.name,
367       }
368
369   def BuildHooksNodes(self):
370     """Build hooks nodes.
371
372     """
373     return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
374
375   def CheckPrereq(self):
376     """Verify that the passed name is a valid one.
377
378     """
379     hostname = netutils.GetHostname(name=self.op.name,
380                                     family=self.cfg.GetPrimaryIPFamily())
381
382     new_name = hostname.name
383     self.ip = new_ip = hostname.ip
384     old_name = self.cfg.GetClusterName()
385     old_ip = self.cfg.GetMasterIP()
386     if new_name == old_name and new_ip == old_ip:
387       raise errors.OpPrereqError("Neither the name nor the IP address of the"
388                                  " cluster has changed",
389                                  errors.ECODE_INVAL)
390     if new_ip != old_ip:
391       if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
392         raise errors.OpPrereqError("The given cluster IP address (%s) is"
393                                    " reachable on the network" %
394                                    new_ip, errors.ECODE_NOTUNIQUE)
395
396     self.op.name = new_name
397
398   def Exec(self, feedback_fn):
399     """Rename the cluster.
400
401     """
402     clustername = self.op.name
403     new_ip = self.ip
404
405     # shutdown the master IP
406     master_params = self.cfg.GetMasterNetworkParameters()
407     ems = self.cfg.GetUseExternalMipScript()
408     result = self.rpc.call_node_deactivate_master_ip(master_params.name,
409                                                      master_params, ems)
410     result.Raise("Could not disable the master role")
411
412     try:
413       cluster = self.cfg.GetClusterInfo()
414       cluster.cluster_name = clustername
415       cluster.master_ip = new_ip
416       self.cfg.Update(cluster, feedback_fn)
417
418       # update the known hosts file
419       ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
420       node_list = self.cfg.GetOnlineNodeList()
421       try:
422         node_list.remove(master_params.name)
423       except ValueError:
424         pass
425       UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
426     finally:
427       master_params.ip = new_ip
428       result = self.rpc.call_node_activate_master_ip(master_params.name,
429                                                      master_params, ems)
430       result.Warn("Could not re-enable the master role on the master,"
431                   " please restart manually", self.LogWarning)
432
433     return clustername
434
435
436 class LUClusterRepairDiskSizes(NoHooksLU):
437   """Verifies the cluster disks sizes.
438
439   """
440   REQ_BGL = False
441
442   def ExpandNames(self):
443     if self.op.instances:
444       self.wanted_names = GetWantedInstances(self, self.op.instances)
445       # Not getting the node allocation lock as only a specific set of
446       # instances (and their nodes) is going to be acquired
447       self.needed_locks = {
448         locking.LEVEL_NODE_RES: [],
449         locking.LEVEL_INSTANCE: self.wanted_names,
450         }
451       self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
452     else:
453       self.wanted_names = None
454       self.needed_locks = {
455         locking.LEVEL_NODE_RES: locking.ALL_SET,
456         locking.LEVEL_INSTANCE: locking.ALL_SET,
457
458         # This opcode is acquires the node locks for all instances
459         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
460         }
461
462     self.share_locks = {
463       locking.LEVEL_NODE_RES: 1,
464       locking.LEVEL_INSTANCE: 0,
465       locking.LEVEL_NODE_ALLOC: 1,
466       }
467
468   def DeclareLocks(self, level):
469     if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
470       self._LockInstancesNodes(primary_only=True, level=level)
471
472   def CheckPrereq(self):
473     """Check prerequisites.
474
475     This only checks the optional instance list against the existing names.
476
477     """
478     if self.wanted_names is None:
479       self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
480
481     self.wanted_instances = \
482         map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
483
484   def _EnsureChildSizes(self, disk):
485     """Ensure children of the disk have the needed disk size.
486
487     This is valid mainly for DRBD8 and fixes an issue where the
488     children have smaller disk size.
489
490     @param disk: an L{ganeti.objects.Disk} object
491
492     """
493     if disk.dev_type == constants.LD_DRBD8:
494       assert disk.children, "Empty children for DRBD8?"
495       fchild = disk.children[0]
496       mismatch = fchild.size < disk.size
497       if mismatch:
498         self.LogInfo("Child disk has size %d, parent %d, fixing",
499                      fchild.size, disk.size)
500         fchild.size = disk.size
501
502       # and we recurse on this child only, not on the metadev
503       return self._EnsureChildSizes(fchild) or mismatch
504     else:
505       return False
506
507   def Exec(self, feedback_fn):
508     """Verify the size of cluster disks.
509
510     """
511     # TODO: check child disks too
512     # TODO: check differences in size between primary/secondary nodes
513     per_node_disks = {}
514     for instance in self.wanted_instances:
515       pnode = instance.primary_node
516       if pnode not in per_node_disks:
517         per_node_disks[pnode] = []
518       for idx, disk in enumerate(instance.disks):
519         per_node_disks[pnode].append((instance, idx, disk))
520
521     assert not (frozenset(per_node_disks.keys()) -
522                 self.owned_locks(locking.LEVEL_NODE_RES)), \
523       "Not owning correct locks"
524     assert not self.owned_locks(locking.LEVEL_NODE)
525
526     es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
527                                                    per_node_disks.keys())
528
529     changed = []
530     for node, dskl in per_node_disks.items():
531       newl = [v[2].Copy() for v in dskl]
532       for dsk in newl:
533         self.cfg.SetDiskID(dsk, node)
534       result = self.rpc.call_blockdev_getdimensions(node, newl)
535       if result.fail_msg:
536         self.LogWarning("Failure in blockdev_getdimensions call to node"
537                         " %s, ignoring", node)
538         continue
539       if len(result.payload) != len(dskl):
540         logging.warning("Invalid result from node %s: len(dksl)=%d,"
541                         " result.payload=%s", node, len(dskl), result.payload)
542         self.LogWarning("Invalid result from node %s, ignoring node results",
543                         node)
544         continue
545       for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
546         if dimensions is None:
547           self.LogWarning("Disk %d of instance %s did not return size"
548                           " information, ignoring", idx, instance.name)
549           continue
550         if not isinstance(dimensions, (tuple, list)):
551           self.LogWarning("Disk %d of instance %s did not return valid"
552                           " dimension information, ignoring", idx,
553                           instance.name)
554           continue
555         (size, spindles) = dimensions
556         if not isinstance(size, (int, long)):
557           self.LogWarning("Disk %d of instance %s did not return valid"
558                           " size information, ignoring", idx, instance.name)
559           continue
560         size = size >> 20
561         if size != disk.size:
562           self.LogInfo("Disk %d of instance %s has mismatched size,"
563                        " correcting: recorded %d, actual %d", idx,
564                        instance.name, disk.size, size)
565           disk.size = size
566           self.cfg.Update(instance, feedback_fn)
567           changed.append((instance.name, idx, "size", size))
568         if es_flags[node]:
569           if spindles is None:
570             self.LogWarning("Disk %d of instance %s did not return valid"
571                             " spindles information, ignoring", idx,
572                             instance.name)
573           elif disk.spindles is None or disk.spindles != spindles:
574             self.LogInfo("Disk %d of instance %s has mismatched spindles,"
575                          " correcting: recorded %s, actual %s",
576                          idx, instance.name, disk.spindles, spindles)
577             disk.spindles = spindles
578             self.cfg.Update(instance, feedback_fn)
579             changed.append((instance.name, idx, "spindles", disk.spindles))
580         if self._EnsureChildSizes(disk):
581           self.cfg.Update(instance, feedback_fn)
582           changed.append((instance.name, idx, "size", disk.size))
583     return changed
584
585
586 def _ValidateNetmask(cfg, netmask):
587   """Checks if a netmask is valid.
588
589   @type cfg: L{config.ConfigWriter}
590   @param cfg: The cluster configuration
591   @type netmask: int
592   @param netmask: the netmask to be verified
593   @raise errors.OpPrereqError: if the validation fails
594
595   """
596   ip_family = cfg.GetPrimaryIPFamily()
597   try:
598     ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
599   except errors.ProgrammerError:
600     raise errors.OpPrereqError("Invalid primary ip family: %s." %
601                                ip_family, errors.ECODE_INVAL)
602   if not ipcls.ValidateNetmask(netmask):
603     raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
604                                (netmask), errors.ECODE_INVAL)
605
606
607 class LUClusterSetParams(LogicalUnit):
608   """Change the parameters of the cluster.
609
610   """
611   HPATH = "cluster-modify"
612   HTYPE = constants.HTYPE_CLUSTER
613   REQ_BGL = False
614
615   def CheckArguments(self):
616     """Check parameters
617
618     """
619     if self.op.uid_pool:
620       uidpool.CheckUidPool(self.op.uid_pool)
621
622     if self.op.add_uids:
623       uidpool.CheckUidPool(self.op.add_uids)
624
625     if self.op.remove_uids:
626       uidpool.CheckUidPool(self.op.remove_uids)
627
628     if self.op.master_netmask is not None:
629       _ValidateNetmask(self.cfg, self.op.master_netmask)
630
631     if self.op.diskparams:
632       for dt_params in self.op.diskparams.values():
633         utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
634       try:
635         utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
636       except errors.OpPrereqError, err:
637         raise errors.OpPrereqError("While verify diskparams options: %s" % err,
638                                    errors.ECODE_INVAL)
639
640   def ExpandNames(self):
641     # FIXME: in the future maybe other cluster params won't require checking on
642     # all nodes to be modified.
643     # FIXME: This opcode changes cluster-wide settings. Is acquiring all
644     # resource locks the right thing, shouldn't it be the BGL instead?
645     self.needed_locks = {
646       locking.LEVEL_NODE: locking.ALL_SET,
647       locking.LEVEL_INSTANCE: locking.ALL_SET,
648       locking.LEVEL_NODEGROUP: locking.ALL_SET,
649       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
650     }
651     self.share_locks = ShareAll()
652
653   def BuildHooksEnv(self):
654     """Build hooks env.
655
656     """
657     return {
658       "OP_TARGET": self.cfg.GetClusterName(),
659       "NEW_VG_NAME": self.op.vg_name,
660       }
661
662   def BuildHooksNodes(self):
663     """Build hooks nodes.
664
665     """
666     mn = self.cfg.GetMasterNode()
667     return ([mn], [mn])
668
669   def _CheckVgName(self, node_list, enabled_disk_templates,
670                    new_enabled_disk_templates):
671     """Check the consistency of the vg name on all nodes and in case it gets
672        unset whether there are instances still using it.
673
674     """
675     if self.op.vg_name is not None and not self.op.vg_name:
676       if self.cfg.HasAnyDiskOfType(constants.LD_LV):
677         raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
678                                    " instances exist", errors.ECODE_INVAL)
679
680     if (self.op.vg_name is not None and
681         utils.IsLvmEnabled(enabled_disk_templates)) or \
682            (self.cfg.GetVGName() is not None and
683             utils.LvmGetsEnabled(enabled_disk_templates,
684                                  new_enabled_disk_templates)):
685       self._CheckVgNameOnNodes(node_list)
686
687   def _CheckVgNameOnNodes(self, node_list):
688     """Check the status of the volume group on each node.
689
690     """
691     vglist = self.rpc.call_vg_list(node_list)
692     for node in node_list:
693       msg = vglist[node].fail_msg
694       if msg:
695         # ignoring down node
696         self.LogWarning("Error while gathering data on node %s"
697                         " (ignoring node): %s", node, msg)
698         continue
699       vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
700                                             self.op.vg_name,
701                                             constants.MIN_VG_SIZE)
702       if vgstatus:
703         raise errors.OpPrereqError("Error on node '%s': %s" %
704                                    (node, vgstatus), errors.ECODE_ENVIRON)
705
706   def _GetEnabledDiskTemplates(self, cluster):
707     """Determines the enabled disk templates and the subset of disk templates
708        that are newly enabled by this operation.
709
710     """
711     enabled_disk_templates = None
712     new_enabled_disk_templates = []
713     if self.op.enabled_disk_templates:
714       enabled_disk_templates = self.op.enabled_disk_templates
715       new_enabled_disk_templates = \
716         list(set(enabled_disk_templates)
717              - set(cluster.enabled_disk_templates))
718     else:
719       enabled_disk_templates = cluster.enabled_disk_templates
720     return (enabled_disk_templates, new_enabled_disk_templates)
721
722   def CheckPrereq(self):
723     """Check prerequisites.
724
725     This checks whether the given params don't conflict and
726     if the given volume group is valid.
727
728     """
729     if self.op.drbd_helper is not None and not self.op.drbd_helper:
730       if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
731         raise errors.OpPrereqError("Cannot disable drbd helper while"
732                                    " drbd-based instances exist",
733                                    errors.ECODE_INVAL)
734
735     node_list = self.owned_locks(locking.LEVEL_NODE)
736     self.cluster = cluster = self.cfg.GetClusterInfo()
737
738     vm_capable_nodes = [node.name
739                         for node in self.cfg.GetAllNodesInfo().values()
740                         if node.name in node_list and node.vm_capable]
741
742     (enabled_disk_templates, new_enabled_disk_templates) = \
743       self._GetEnabledDiskTemplates(cluster)
744
745     self._CheckVgName(vm_capable_nodes, enabled_disk_templates,
746                       new_enabled_disk_templates)
747
748     if self.op.drbd_helper:
749       # checks given drbd helper on all nodes
750       helpers = self.rpc.call_drbd_helper(node_list)
751       for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
752         if ninfo.offline:
753           self.LogInfo("Not checking drbd helper on offline node %s", node)
754           continue
755         msg = helpers[node].fail_msg
756         if msg:
757           raise errors.OpPrereqError("Error checking drbd helper on node"
758                                      " '%s': %s" % (node, msg),
759                                      errors.ECODE_ENVIRON)
760         node_helper = helpers[node].payload
761         if node_helper != self.op.drbd_helper:
762           raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
763                                      (node, node_helper), errors.ECODE_ENVIRON)
764
765     # validate params changes
766     if self.op.beparams:
767       objects.UpgradeBeParams(self.op.beparams)
768       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
769       self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
770
771     if self.op.ndparams:
772       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
773       self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
774
775       # TODO: we need a more general way to handle resetting
776       # cluster-level parameters to default values
777       if self.new_ndparams["oob_program"] == "":
778         self.new_ndparams["oob_program"] = \
779             constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
780
781     if self.op.hv_state:
782       new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
783                                            self.cluster.hv_state_static)
784       self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
785                                for hv, values in new_hv_state.items())
786
787     if self.op.disk_state:
788       new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
789                                                self.cluster.disk_state_static)
790       self.new_disk_state = \
791         dict((storage, dict((name, cluster.SimpleFillDiskState(values))
792                             for name, values in svalues.items()))
793              for storage, svalues in new_disk_state.items())
794
795     if self.op.ipolicy:
796       self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
797                                            group_policy=False)
798
799       all_instances = self.cfg.GetAllInstancesInfo().values()
800       violations = set()
801       for group in self.cfg.GetAllNodeGroupsInfo().values():
802         instances = frozenset([inst for inst in all_instances
803                                if compat.any(node in group.members
804                                              for node in inst.all_nodes)])
805         new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
806         ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
807         new = ComputeNewInstanceViolations(ipol,
808                                            new_ipolicy, instances, self.cfg)
809         if new:
810           violations.update(new)
811
812       if violations:
813         self.LogWarning("After the ipolicy change the following instances"
814                         " violate them: %s",
815                         utils.CommaJoin(utils.NiceSort(violations)))
816
817     if self.op.nicparams:
818       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
819       self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
820       objects.NIC.CheckParameterSyntax(self.new_nicparams)
821       nic_errors = []
822
823       # check all instances for consistency
824       for instance in self.cfg.GetAllInstancesInfo().values():
825         for nic_idx, nic in enumerate(instance.nics):
826           params_copy = copy.deepcopy(nic.nicparams)
827           params_filled = objects.FillDict(self.new_nicparams, params_copy)
828
829           # check parameter syntax
830           try:
831             objects.NIC.CheckParameterSyntax(params_filled)
832           except errors.ConfigurationError, err:
833             nic_errors.append("Instance %s, nic/%d: %s" %
834                               (instance.name, nic_idx, err))
835
836           # if we're moving instances to routed, check that they have an ip
837           target_mode = params_filled[constants.NIC_MODE]
838           if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
839             nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
840                               " address" % (instance.name, nic_idx))
841       if nic_errors:
842         raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
843                                    "\n".join(nic_errors), errors.ECODE_INVAL)
844
845     # hypervisor list/parameters
846     self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
847     if self.op.hvparams:
848       for hv_name, hv_dict in self.op.hvparams.items():
849         if hv_name not in self.new_hvparams:
850           self.new_hvparams[hv_name] = hv_dict
851         else:
852           self.new_hvparams[hv_name].update(hv_dict)
853
854     # disk template parameters
855     self.new_diskparams = objects.FillDict(cluster.diskparams, {})
856     if self.op.diskparams:
857       for dt_name, dt_params in self.op.diskparams.items():
858         if dt_name not in self.op.diskparams:
859           self.new_diskparams[dt_name] = dt_params
860         else:
861           self.new_diskparams[dt_name].update(dt_params)
862
863     # os hypervisor parameters
864     self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
865     if self.op.os_hvp:
866       for os_name, hvs in self.op.os_hvp.items():
867         if os_name not in self.new_os_hvp:
868           self.new_os_hvp[os_name] = hvs
869         else:
870           for hv_name, hv_dict in hvs.items():
871             if hv_dict is None:
872               # Delete if it exists
873               self.new_os_hvp[os_name].pop(hv_name, None)
874             elif hv_name not in self.new_os_hvp[os_name]:
875               self.new_os_hvp[os_name][hv_name] = hv_dict
876             else:
877               self.new_os_hvp[os_name][hv_name].update(hv_dict)
878
879     # os parameters
880     self.new_osp = objects.FillDict(cluster.osparams, {})
881     if self.op.osparams:
882       for os_name, osp in self.op.osparams.items():
883         if os_name not in self.new_osp:
884           self.new_osp[os_name] = {}
885
886         self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
887                                                  use_none=True)
888
889         if not self.new_osp[os_name]:
890           # we removed all parameters
891           del self.new_osp[os_name]
892         else:
893           # check the parameter validity (remote check)
894           CheckOSParams(self, False, [self.cfg.GetMasterNode()],
895                         os_name, self.new_osp[os_name])
896
897     # changes to the hypervisor list
898     if self.op.enabled_hypervisors is not None:
899       self.hv_list = self.op.enabled_hypervisors
900       for hv in self.hv_list:
901         # if the hypervisor doesn't already exist in the cluster
902         # hvparams, we initialize it to empty, and then (in both
903         # cases) we make sure to fill the defaults, as we might not
904         # have a complete defaults list if the hypervisor wasn't
905         # enabled before
906         if hv not in new_hvp:
907           new_hvp[hv] = {}
908         new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
909         utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
910     else:
911       self.hv_list = cluster.enabled_hypervisors
912
913     if self.op.hvparams or self.op.enabled_hypervisors is not None:
914       # either the enabled list has changed, or the parameters have, validate
915       for hv_name, hv_params in self.new_hvparams.items():
916         if ((self.op.hvparams and hv_name in self.op.hvparams) or
917             (self.op.enabled_hypervisors and
918              hv_name in self.op.enabled_hypervisors)):
919           # either this is a new hypervisor, or its parameters have changed
920           hv_class = hypervisor.GetHypervisorClass(hv_name)
921           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
922           hv_class.CheckParameterSyntax(hv_params)
923           CheckHVParams(self, node_list, hv_name, hv_params)
924
925     self._CheckDiskTemplateConsistency()
926
927     if self.op.os_hvp:
928       # no need to check any newly-enabled hypervisors, since the
929       # defaults have already been checked in the above code-block
930       for os_name, os_hvp in self.new_os_hvp.items():
931         for hv_name, hv_params in os_hvp.items():
932           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
933           # we need to fill in the new os_hvp on top of the actual hv_p
934           cluster_defaults = self.new_hvparams.get(hv_name, {})
935           new_osp = objects.FillDict(cluster_defaults, hv_params)
936           hv_class = hypervisor.GetHypervisorClass(hv_name)
937           hv_class.CheckParameterSyntax(new_osp)
938           CheckHVParams(self, node_list, hv_name, new_osp)
939
940     if self.op.default_iallocator:
941       alloc_script = utils.FindFile(self.op.default_iallocator,
942                                     constants.IALLOCATOR_SEARCH_PATH,
943                                     os.path.isfile)
944       if alloc_script is None:
945         raise errors.OpPrereqError("Invalid default iallocator script '%s'"
946                                    " specified" % self.op.default_iallocator,
947                                    errors.ECODE_INVAL)
948
949   def _CheckDiskTemplateConsistency(self):
950     """Check whether the disk templates that are going to be disabled
951        are still in use by some instances.
952
953     """
954     if self.op.enabled_disk_templates:
955       cluster = self.cfg.GetClusterInfo()
956       instances = self.cfg.GetAllInstancesInfo()
957
958       disk_templates_to_remove = set(cluster.enabled_disk_templates) \
959         - set(self.op.enabled_disk_templates)
960       for instance in instances.itervalues():
961         if instance.disk_template in disk_templates_to_remove:
962           raise errors.OpPrereqError("Cannot disable disk template '%s',"
963                                      " because instance '%s' is using it." %
964                                      (instance.disk_template, instance.name))
965
966   def _SetVgName(self, feedback_fn):
967     """Determines and sets the new volume group name.
968
969     """
970     if self.op.vg_name is not None:
971       if self.op.vg_name and not \
972            utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
973         feedback_fn("Note that you specified a volume group, but did not"
974                     " enable any lvm disk template.")
975       new_volume = self.op.vg_name
976       if not new_volume:
977         if utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
978           raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
979                                      " disk templates are enabled.")
980         new_volume = None
981       if new_volume != self.cfg.GetVGName():
982         self.cfg.SetVGName(new_volume)
983       else:
984         feedback_fn("Cluster LVM configuration already in desired"
985                     " state, not changing")
986     else:
987       if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
988           not self.cfg.GetVGName():
989         raise errors.OpPrereqError("Please specify a volume group when"
990                                    " enabling lvm-based disk-templates.")
991
992   def Exec(self, feedback_fn):
993     """Change the parameters of the cluster.
994
995     """
996     if self.op.enabled_disk_templates:
997       self.cluster.enabled_disk_templates = \
998         list(set(self.op.enabled_disk_templates))
999
1000     self._SetVgName(feedback_fn)
1001
1002     if self.op.drbd_helper is not None:
1003       if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1004         feedback_fn("Note that you specified a drbd user helper, but did"
1005                     " enabled the drbd disk template.")
1006       new_helper = self.op.drbd_helper
1007       if not new_helper:
1008         new_helper = None
1009       if new_helper != self.cfg.GetDRBDHelper():
1010         self.cfg.SetDRBDHelper(new_helper)
1011       else:
1012         feedback_fn("Cluster DRBD helper already in desired state,"
1013                     " not changing")
1014     if self.op.hvparams:
1015       self.cluster.hvparams = self.new_hvparams
1016     if self.op.os_hvp:
1017       self.cluster.os_hvp = self.new_os_hvp
1018     if self.op.enabled_hypervisors is not None:
1019       self.cluster.hvparams = self.new_hvparams
1020       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1021     if self.op.beparams:
1022       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1023     if self.op.nicparams:
1024       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1025     if self.op.ipolicy:
1026       self.cluster.ipolicy = self.new_ipolicy
1027     if self.op.osparams:
1028       self.cluster.osparams = self.new_osp
1029     if self.op.ndparams:
1030       self.cluster.ndparams = self.new_ndparams
1031     if self.op.diskparams:
1032       self.cluster.diskparams = self.new_diskparams
1033     if self.op.hv_state:
1034       self.cluster.hv_state_static = self.new_hv_state
1035     if self.op.disk_state:
1036       self.cluster.disk_state_static = self.new_disk_state
1037
1038     if self.op.candidate_pool_size is not None:
1039       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1040       # we need to update the pool size here, otherwise the save will fail
1041       AdjustCandidatePool(self, [])
1042
1043     if self.op.maintain_node_health is not None:
1044       if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1045         feedback_fn("Note: CONFD was disabled at build time, node health"
1046                     " maintenance is not useful (still enabling it)")
1047       self.cluster.maintain_node_health = self.op.maintain_node_health
1048
1049     if self.op.prealloc_wipe_disks is not None:
1050       self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1051
1052     if self.op.add_uids is not None:
1053       uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1054
1055     if self.op.remove_uids is not None:
1056       uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1057
1058     if self.op.uid_pool is not None:
1059       self.cluster.uid_pool = self.op.uid_pool
1060
1061     if self.op.default_iallocator is not None:
1062       self.cluster.default_iallocator = self.op.default_iallocator
1063
1064     if self.op.reserved_lvs is not None:
1065       self.cluster.reserved_lvs = self.op.reserved_lvs
1066
1067     if self.op.use_external_mip_script is not None:
1068       self.cluster.use_external_mip_script = self.op.use_external_mip_script
1069
1070     def helper_os(aname, mods, desc):
1071       desc += " OS list"
1072       lst = getattr(self.cluster, aname)
1073       for key, val in mods:
1074         if key == constants.DDM_ADD:
1075           if val in lst:
1076             feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1077           else:
1078             lst.append(val)
1079         elif key == constants.DDM_REMOVE:
1080           if val in lst:
1081             lst.remove(val)
1082           else:
1083             feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1084         else:
1085           raise errors.ProgrammerError("Invalid modification '%s'" % key)
1086
1087     if self.op.hidden_os:
1088       helper_os("hidden_os", self.op.hidden_os, "hidden")
1089
1090     if self.op.blacklisted_os:
1091       helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1092
1093     if self.op.master_netdev:
1094       master_params = self.cfg.GetMasterNetworkParameters()
1095       ems = self.cfg.GetUseExternalMipScript()
1096       feedback_fn("Shutting down master ip on the current netdev (%s)" %
1097                   self.cluster.master_netdev)
1098       result = self.rpc.call_node_deactivate_master_ip(master_params.name,
1099                                                        master_params, ems)
1100       result.Raise("Could not disable the master ip")
1101       feedback_fn("Changing master_netdev from %s to %s" %
1102                   (master_params.netdev, self.op.master_netdev))
1103       self.cluster.master_netdev = self.op.master_netdev
1104
1105     if self.op.master_netmask:
1106       master_params = self.cfg.GetMasterNetworkParameters()
1107       feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1108       result = self.rpc.call_node_change_master_netmask(master_params.name,
1109                                                         master_params.netmask,
1110                                                         self.op.master_netmask,
1111                                                         master_params.ip,
1112                                                         master_params.netdev)
1113       result.Warn("Could not change the master IP netmask", feedback_fn)
1114       self.cluster.master_netmask = self.op.master_netmask
1115
1116     self.cfg.Update(self.cluster, feedback_fn)
1117
1118     if self.op.master_netdev:
1119       master_params = self.cfg.GetMasterNetworkParameters()
1120       feedback_fn("Starting the master ip on the new master netdev (%s)" %
1121                   self.op.master_netdev)
1122       ems = self.cfg.GetUseExternalMipScript()
1123       result = self.rpc.call_node_activate_master_ip(master_params.name,
1124                                                      master_params, ems)
1125       result.Warn("Could not re-enable the master ip on the master,"
1126                   " please restart manually", self.LogWarning)
1127
1128
1129 class LUClusterVerify(NoHooksLU):
1130   """Submits all jobs necessary to verify the cluster.
1131
1132   """
1133   REQ_BGL = False
1134
1135   def ExpandNames(self):
1136     self.needed_locks = {}
1137
1138   def Exec(self, feedback_fn):
1139     jobs = []
1140
1141     if self.op.group_name:
1142       groups = [self.op.group_name]
1143       depends_fn = lambda: None
1144     else:
1145       groups = self.cfg.GetNodeGroupList()
1146
1147       # Verify global configuration
1148       jobs.append([
1149         opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1150         ])
1151
1152       # Always depend on global verification
1153       depends_fn = lambda: [(-len(jobs), [])]
1154
1155     jobs.extend(
1156       [opcodes.OpClusterVerifyGroup(group_name=group,
1157                                     ignore_errors=self.op.ignore_errors,
1158                                     depends=depends_fn())]
1159       for group in groups)
1160
1161     # Fix up all parameters
1162     for op in itertools.chain(*jobs): # pylint: disable=W0142
1163       op.debug_simulate_errors = self.op.debug_simulate_errors
1164       op.verbose = self.op.verbose
1165       op.error_codes = self.op.error_codes
1166       try:
1167         op.skip_checks = self.op.skip_checks
1168       except AttributeError:
1169         assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1170
1171     return ResultWithJobs(jobs)
1172
1173
1174 class _VerifyErrors(object):
1175   """Mix-in for cluster/group verify LUs.
1176
1177   It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1178   self.op and self._feedback_fn to be available.)
1179
1180   """
1181
1182   ETYPE_FIELD = "code"
1183   ETYPE_ERROR = "ERROR"
1184   ETYPE_WARNING = "WARNING"
1185
1186   def _Error(self, ecode, item, msg, *args, **kwargs):
1187     """Format an error message.
1188
1189     Based on the opcode's error_codes parameter, either format a
1190     parseable error code, or a simpler error string.
1191
1192     This must be called only from Exec and functions called from Exec.
1193
1194     """
1195     ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1196     itype, etxt, _ = ecode
1197     # If the error code is in the list of ignored errors, demote the error to a
1198     # warning
1199     if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1200       ltype = self.ETYPE_WARNING
1201     # first complete the msg
1202     if args:
1203       msg = msg % args
1204     # then format the whole message
1205     if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1206       msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1207     else:
1208       if item:
1209         item = " " + item
1210       else:
1211         item = ""
1212       msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1213     # and finally report it via the feedback_fn
1214     self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1215     # do not mark the operation as failed for WARN cases only
1216     if ltype == self.ETYPE_ERROR:
1217       self.bad = True
1218
1219   def _ErrorIf(self, cond, *args, **kwargs):
1220     """Log an error message if the passed condition is True.
1221
1222     """
1223     if (bool(cond)
1224         or self.op.debug_simulate_errors): # pylint: disable=E1101
1225       self._Error(*args, **kwargs)
1226
1227
1228 def _VerifyCertificate(filename):
1229   """Verifies a certificate for L{LUClusterVerifyConfig}.
1230
1231   @type filename: string
1232   @param filename: Path to PEM file
1233
1234   """
1235   try:
1236     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1237                                            utils.ReadFile(filename))
1238   except Exception, err: # pylint: disable=W0703
1239     return (LUClusterVerifyConfig.ETYPE_ERROR,
1240             "Failed to load X509 certificate %s: %s" % (filename, err))
1241
1242   (errcode, msg) = \
1243     utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1244                                 constants.SSL_CERT_EXPIRATION_ERROR)
1245
1246   if msg:
1247     fnamemsg = "While verifying %s: %s" % (filename, msg)
1248   else:
1249     fnamemsg = None
1250
1251   if errcode is None:
1252     return (None, fnamemsg)
1253   elif errcode == utils.CERT_WARNING:
1254     return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1255   elif errcode == utils.CERT_ERROR:
1256     return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1257
1258   raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1259
1260
1261 def _GetAllHypervisorParameters(cluster, instances):
1262   """Compute the set of all hypervisor parameters.
1263
1264   @type cluster: L{objects.Cluster}
1265   @param cluster: the cluster object
1266   @param instances: list of L{objects.Instance}
1267   @param instances: additional instances from which to obtain parameters
1268   @rtype: list of (origin, hypervisor, parameters)
1269   @return: a list with all parameters found, indicating the hypervisor they
1270        apply to, and the origin (can be "cluster", "os X", or "instance Y")
1271
1272   """
1273   hvp_data = []
1274
1275   for hv_name in cluster.enabled_hypervisors:
1276     hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1277
1278   for os_name, os_hvp in cluster.os_hvp.items():
1279     for hv_name, hv_params in os_hvp.items():
1280       if hv_params:
1281         full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1282         hvp_data.append(("os %s" % os_name, hv_name, full_params))
1283
1284   # TODO: collapse identical parameter values in a single one
1285   for instance in instances:
1286     if instance.hvparams:
1287       hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1288                        cluster.FillHV(instance)))
1289
1290   return hvp_data
1291
1292
1293 class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1294   """Verifies the cluster config.
1295
1296   """
1297   REQ_BGL = False
1298
1299   def _VerifyHVP(self, hvp_data):
1300     """Verifies locally the syntax of the hypervisor parameters.
1301
1302     """
1303     for item, hv_name, hv_params in hvp_data:
1304       msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1305              (item, hv_name))
1306       try:
1307         hv_class = hypervisor.GetHypervisorClass(hv_name)
1308         utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1309         hv_class.CheckParameterSyntax(hv_params)
1310       except errors.GenericError, err:
1311         self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1312
1313   def ExpandNames(self):
1314     self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1315     self.share_locks = ShareAll()
1316
1317   def CheckPrereq(self):
1318     """Check prerequisites.
1319
1320     """
1321     # Retrieve all information
1322     self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1323     self.all_node_info = self.cfg.GetAllNodesInfo()
1324     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1325
1326   def Exec(self, feedback_fn):
1327     """Verify integrity of cluster, performing various test on nodes.
1328
1329     """
1330     self.bad = False
1331     self._feedback_fn = feedback_fn
1332
1333     feedback_fn("* Verifying cluster config")
1334
1335     for msg in self.cfg.VerifyConfig():
1336       self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1337
1338     feedback_fn("* Verifying cluster certificate files")
1339
1340     for cert_filename in pathutils.ALL_CERT_FILES:
1341       (errcode, msg) = _VerifyCertificate(cert_filename)
1342       self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1343
1344     feedback_fn("* Verifying hypervisor parameters")
1345
1346     self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1347                                                 self.all_inst_info.values()))
1348
1349     feedback_fn("* Verifying all nodes belong to an existing group")
1350
1351     # We do this verification here because, should this bogus circumstance
1352     # occur, it would never be caught by VerifyGroup, which only acts on
1353     # nodes/instances reachable from existing node groups.
1354
1355     dangling_nodes = set(node.name for node in self.all_node_info.values()
1356                          if node.group not in self.all_group_info)
1357
1358     dangling_instances = {}
1359     no_node_instances = []
1360
1361     for inst in self.all_inst_info.values():
1362       if inst.primary_node in dangling_nodes:
1363         dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1364       elif inst.primary_node not in self.all_node_info:
1365         no_node_instances.append(inst.name)
1366
1367     pretty_dangling = [
1368         "%s (%s)" %
1369         (node.name,
1370          utils.CommaJoin(dangling_instances.get(node.name,
1371                                                 ["no instances"])))
1372         for node in dangling_nodes]
1373
1374     self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1375                   None,
1376                   "the following nodes (and their instances) belong to a non"
1377                   " existing group: %s", utils.CommaJoin(pretty_dangling))
1378
1379     self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1380                   None,
1381                   "the following instances have a non-existing primary-node:"
1382                   " %s", utils.CommaJoin(no_node_instances))
1383
1384     return not self.bad
1385
1386
1387 class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1388   """Verifies the status of a node group.
1389
1390   """
1391   HPATH = "cluster-verify"
1392   HTYPE = constants.HTYPE_CLUSTER
1393   REQ_BGL = False
1394
1395   _HOOKS_INDENT_RE = re.compile("^", re.M)
1396
1397   class NodeImage(object):
1398     """A class representing the logical and physical status of a node.
1399
1400     @type name: string
1401     @ivar name: the node name to which this object refers
1402     @ivar volumes: a structure as returned from
1403         L{ganeti.backend.GetVolumeList} (runtime)
1404     @ivar instances: a list of running instances (runtime)
1405     @ivar pinst: list of configured primary instances (config)
1406     @ivar sinst: list of configured secondary instances (config)
1407     @ivar sbp: dictionary of {primary-node: list of instances} for all
1408         instances for which this node is secondary (config)
1409     @ivar mfree: free memory, as reported by hypervisor (runtime)
1410     @ivar dfree: free disk, as reported by the node (runtime)
1411     @ivar offline: the offline status (config)
1412     @type rpc_fail: boolean
1413     @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1414         not whether the individual keys were correct) (runtime)
1415     @type lvm_fail: boolean
1416     @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1417     @type hyp_fail: boolean
1418     @ivar hyp_fail: whether the RPC call didn't return the instance list
1419     @type ghost: boolean
1420     @ivar ghost: whether this is a known node or not (config)
1421     @type os_fail: boolean
1422     @ivar os_fail: whether the RPC call didn't return valid OS data
1423     @type oslist: list
1424     @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1425     @type vm_capable: boolean
1426     @ivar vm_capable: whether the node can host instances
1427     @type pv_min: float
1428     @ivar pv_min: size in MiB of the smallest PVs
1429     @type pv_max: float
1430     @ivar pv_max: size in MiB of the biggest PVs
1431
1432     """
1433     def __init__(self, offline=False, name=None, vm_capable=True):
1434       self.name = name
1435       self.volumes = {}
1436       self.instances = []
1437       self.pinst = []
1438       self.sinst = []
1439       self.sbp = {}
1440       self.mfree = 0
1441       self.dfree = 0
1442       self.offline = offline
1443       self.vm_capable = vm_capable
1444       self.rpc_fail = False
1445       self.lvm_fail = False
1446       self.hyp_fail = False
1447       self.ghost = False
1448       self.os_fail = False
1449       self.oslist = {}
1450       self.pv_min = None
1451       self.pv_max = None
1452
1453   def ExpandNames(self):
1454     # This raises errors.OpPrereqError on its own:
1455     self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1456
1457     # Get instances in node group; this is unsafe and needs verification later
1458     inst_names = \
1459       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1460
1461     self.needed_locks = {
1462       locking.LEVEL_INSTANCE: inst_names,
1463       locking.LEVEL_NODEGROUP: [self.group_uuid],
1464       locking.LEVEL_NODE: [],
1465
1466       # This opcode is run by watcher every five minutes and acquires all nodes
1467       # for a group. It doesn't run for a long time, so it's better to acquire
1468       # the node allocation lock as well.
1469       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1470       }
1471
1472     self.share_locks = ShareAll()
1473
1474   def DeclareLocks(self, level):
1475     if level == locking.LEVEL_NODE:
1476       # Get members of node group; this is unsafe and needs verification later
1477       nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1478
1479       all_inst_info = self.cfg.GetAllInstancesInfo()
1480
1481       # In Exec(), we warn about mirrored instances that have primary and
1482       # secondary living in separate node groups. To fully verify that
1483       # volumes for these instances are healthy, we will need to do an
1484       # extra call to their secondaries. We ensure here those nodes will
1485       # be locked.
1486       for inst in self.owned_locks(locking.LEVEL_INSTANCE):
1487         # Important: access only the instances whose lock is owned
1488         if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
1489           nodes.update(all_inst_info[inst].secondary_nodes)
1490
1491       self.needed_locks[locking.LEVEL_NODE] = nodes
1492
1493   def CheckPrereq(self):
1494     assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1495     self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1496
1497     group_nodes = set(self.group_info.members)
1498     group_instances = \
1499       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1500
1501     unlocked_nodes = \
1502         group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1503
1504     unlocked_instances = \
1505         group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))
1506
1507     if unlocked_nodes:
1508       raise errors.OpPrereqError("Missing lock for nodes: %s" %
1509                                  utils.CommaJoin(unlocked_nodes),
1510                                  errors.ECODE_STATE)
1511
1512     if unlocked_instances:
1513       raise errors.OpPrereqError("Missing lock for instances: %s" %
1514                                  utils.CommaJoin(unlocked_instances),
1515                                  errors.ECODE_STATE)
1516
1517     self.all_node_info = self.cfg.GetAllNodesInfo()
1518     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1519
1520     self.my_node_names = utils.NiceSort(group_nodes)
1521     self.my_inst_names = utils.NiceSort(group_instances)
1522
1523     self.my_node_info = dict((name, self.all_node_info[name])
1524                              for name in self.my_node_names)
1525
1526     self.my_inst_info = dict((name, self.all_inst_info[name])
1527                              for name in self.my_inst_names)
1528
1529     # We detect here the nodes that will need the extra RPC calls for verifying
1530     # split LV volumes; they should be locked.
1531     extra_lv_nodes = set()
1532
1533     for inst in self.my_inst_info.values():
1534       if inst.disk_template in constants.DTS_INT_MIRROR:
1535         for nname in inst.all_nodes:
1536           if self.all_node_info[nname].group != self.group_uuid:
1537             extra_lv_nodes.add(nname)
1538
1539     unlocked_lv_nodes = \
1540         extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1541
1542     if unlocked_lv_nodes:
1543       raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1544                                  utils.CommaJoin(unlocked_lv_nodes),
1545                                  errors.ECODE_STATE)
1546     self.extra_lv_nodes = list(extra_lv_nodes)
1547
1548   def _VerifyNode(self, ninfo, nresult):
1549     """Perform some basic validation on data returned from a node.
1550
1551       - check the result data structure is well formed and has all the
1552         mandatory fields
1553       - check ganeti version
1554
1555     @type ninfo: L{objects.Node}
1556     @param ninfo: the node to check
1557     @param nresult: the results from the node
1558     @rtype: boolean
1559     @return: whether overall this call was successful (and we can expect
1560          reasonable values in the respose)
1561
1562     """
1563     node = ninfo.name
1564     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1565
1566     # main result, nresult should be a non-empty dict
1567     test = not nresult or not isinstance(nresult, dict)
1568     _ErrorIf(test, constants.CV_ENODERPC, node,
1569                   "unable to verify node: no data returned")
1570     if test:
1571       return False
1572
1573     # compares ganeti version
1574     local_version = constants.PROTOCOL_VERSION
1575     remote_version = nresult.get("version", None)
1576     test = not (remote_version and
1577                 isinstance(remote_version, (list, tuple)) and
1578                 len(remote_version) == 2)
1579     _ErrorIf(test, constants.CV_ENODERPC, node,
1580              "connection to node returned invalid data")
1581     if test:
1582       return False
1583
1584     test = local_version != remote_version[0]
1585     _ErrorIf(test, constants.CV_ENODEVERSION, node,
1586              "incompatible protocol versions: master %s,"
1587              " node %s", local_version, remote_version[0])
1588     if test:
1589       return False
1590
1591     # node seems compatible, we can actually try to look into its results
1592
1593     # full package version
1594     self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1595                   constants.CV_ENODEVERSION, node,
1596                   "software version mismatch: master %s, node %s",
1597                   constants.RELEASE_VERSION, remote_version[1],
1598                   code=self.ETYPE_WARNING)
1599
1600     hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1601     if ninfo.vm_capable and isinstance(hyp_result, dict):
1602       for hv_name, hv_result in hyp_result.iteritems():
1603         test = hv_result is not None
1604         _ErrorIf(test, constants.CV_ENODEHV, node,
1605                  "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1606
1607     hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1608     if ninfo.vm_capable and isinstance(hvp_result, list):
1609       for item, hv_name, hv_result in hvp_result:
1610         _ErrorIf(True, constants.CV_ENODEHV, node,
1611                  "hypervisor %s parameter verify failure (source %s): %s",
1612                  hv_name, item, hv_result)
1613
1614     test = nresult.get(constants.NV_NODESETUP,
1615                        ["Missing NODESETUP results"])
1616     _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
1617              "; ".join(test))
1618
1619     return True
1620
1621   def _VerifyNodeTime(self, ninfo, nresult,
1622                       nvinfo_starttime, nvinfo_endtime):
1623     """Check the node time.
1624
1625     @type ninfo: L{objects.Node}
1626     @param ninfo: the node to check
1627     @param nresult: the remote results for the node
1628     @param nvinfo_starttime: the start time of the RPC call
1629     @param nvinfo_endtime: the end time of the RPC call
1630
1631     """
1632     node = ninfo.name
1633     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1634
1635     ntime = nresult.get(constants.NV_TIME, None)
1636     try:
1637       ntime_merged = utils.MergeTime(ntime)
1638     except (ValueError, TypeError):
1639       _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
1640       return
1641
1642     if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1643       ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1644     elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1645       ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1646     else:
1647       ntime_diff = None
1648
1649     _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
1650              "Node time diverges by at least %s from master node time",
1651              ntime_diff)
1652
1653   def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1654     """Check the node LVM results and update info for cross-node checks.
1655
1656     @type ninfo: L{objects.Node}
1657     @param ninfo: the node to check
1658     @param nresult: the remote results for the node
1659     @param vg_name: the configured VG name
1660     @type nimg: L{NodeImage}
1661     @param nimg: node image
1662
1663     """
1664     if vg_name is None:
1665       return
1666
1667     node = ninfo.name
1668     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1669
1670     # checks vg existence and size > 20G
1671     vglist = nresult.get(constants.NV_VGLIST, None)
1672     test = not vglist
1673     _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
1674     if not test:
1675       vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1676                                             constants.MIN_VG_SIZE)
1677       _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)
1678
1679     # Check PVs
1680     (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1681     for em in errmsgs:
1682       self._Error(constants.CV_ENODELVM, node, em)
1683     if pvminmax is not None:
1684       (nimg.pv_min, nimg.pv_max) = pvminmax
1685
1686   def _VerifyGroupDRBDVersion(self, node_verify_infos):
1687     """Check cross-node DRBD version consistency.
1688
1689     @type node_verify_infos: dict
1690     @param node_verify_infos: infos about nodes as returned from the
1691       node_verify call.
1692
1693     """
1694     node_versions = {}
1695     for node, ndata in node_verify_infos.items():
1696       nresult = ndata.payload
1697       version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1698       node_versions[node] = version
1699
1700     if len(set(node_versions.values())) > 1:
1701       for node, version in sorted(node_versions.items()):
1702         msg = "DRBD version mismatch: %s" % version
1703         self._Error(constants.CV_ENODEDRBDHELPER, node, msg,
1704                     code=self.ETYPE_WARNING)
1705
1706   def _VerifyGroupLVM(self, node_image, vg_name):
1707     """Check cross-node consistency in LVM.
1708
1709     @type node_image: dict
1710     @param node_image: info about nodes, mapping from node to names to
1711       L{NodeImage} objects
1712     @param vg_name: the configured VG name
1713
1714     """
1715     if vg_name is None:
1716       return
1717
1718     # Only exlcusive storage needs this kind of checks
1719     if not self._exclusive_storage:
1720       return
1721
1722     # exclusive_storage wants all PVs to have the same size (approximately),
1723     # if the smallest and the biggest ones are okay, everything is fine.
1724     # pv_min is None iff pv_max is None
1725     vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1726     if not vals:
1727       return
1728     (pvmin, minnode) = min((ni.pv_min, ni.name) for ni in vals)
1729     (pvmax, maxnode) = max((ni.pv_max, ni.name) for ni in vals)
1730     bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1731     self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1732                   "PV sizes differ too much in the group; smallest (%s MB) is"
1733                   " on %s, biggest (%s MB) is on %s",
1734                   pvmin, minnode, pvmax, maxnode)
1735
1736   def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1737     """Check the node bridges.
1738
1739     @type ninfo: L{objects.Node}
1740     @param ninfo: the node to check
1741     @param nresult: the remote results for the node
1742     @param bridges: the expected list of bridges
1743
1744     """
1745     if not bridges:
1746       return
1747
1748     node = ninfo.name
1749     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1750
1751     missing = nresult.get(constants.NV_BRIDGES, None)
1752     test = not isinstance(missing, list)
1753     _ErrorIf(test, constants.CV_ENODENET, node,
1754              "did not return valid bridge information")
1755     if not test:
1756       _ErrorIf(bool(missing), constants.CV_ENODENET, node,
1757                "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1758
1759   def _VerifyNodeUserScripts(self, ninfo, nresult):
1760     """Check the results of user scripts presence and executability on the node
1761
1762     @type ninfo: L{objects.Node}
1763     @param ninfo: the node to check
1764     @param nresult: the remote results for the node
1765
1766     """
1767     node = ninfo.name
1768
1769     test = not constants.NV_USERSCRIPTS in nresult
1770     self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
1771                   "did not return user scripts information")
1772
1773     broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1774     if not test:
1775       self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
1776                     "user scripts not present or not executable: %s" %
1777                     utils.CommaJoin(sorted(broken_scripts)))
1778
1779   def _VerifyNodeNetwork(self, ninfo, nresult):
1780     """Check the node network connectivity results.
1781
1782     @type ninfo: L{objects.Node}
1783     @param ninfo: the node to check
1784     @param nresult: the remote results for the node
1785
1786     """
1787     node = ninfo.name
1788     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1789
1790     test = constants.NV_NODELIST not in nresult
1791     _ErrorIf(test, constants.CV_ENODESSH, node,
1792              "node hasn't returned node ssh connectivity data")
1793     if not test:
1794       if nresult[constants.NV_NODELIST]:
1795         for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1796           _ErrorIf(True, constants.CV_ENODESSH, node,
1797                    "ssh communication with node '%s': %s", a_node, a_msg)
1798
1799     test = constants.NV_NODENETTEST not in nresult
1800     _ErrorIf(test, constants.CV_ENODENET, node,
1801              "node hasn't returned node tcp connectivity data")
1802     if not test:
1803       if nresult[constants.NV_NODENETTEST]:
1804         nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1805         for anode in nlist:
1806           _ErrorIf(True, constants.CV_ENODENET, node,
1807                    "tcp communication with node '%s': %s",
1808                    anode, nresult[constants.NV_NODENETTEST][anode])
1809
1810     test = constants.NV_MASTERIP not in nresult
1811     _ErrorIf(test, constants.CV_ENODENET, node,
1812              "node hasn't returned node master IP reachability data")
1813     if not test:
1814       if not nresult[constants.NV_MASTERIP]:
1815         if node == self.master_node:
1816           msg = "the master node cannot reach the master IP (not configured?)"
1817         else:
1818           msg = "cannot reach the master IP"
1819         _ErrorIf(True, constants.CV_ENODENET, node, msg)
1820
1821   def _VerifyInstance(self, instance, inst_config, node_image,
1822                       diskstatus):
1823     """Verify an instance.
1824
1825     This function checks to see if the required block devices are
1826     available on the instance's node, and that the nodes are in the correct
1827     state.
1828
1829     """
1830     _ErrorIf = self._ErrorIf # pylint: disable=C0103
1831     pnode = inst_config.primary_node
1832     pnode_img = node_image[pnode]
1833     groupinfo = self.cfg.GetAllNodeGroupsInfo()
1834
1835     node_vol_should = {}
1836     inst_config.MapLVsByNode(node_vol_should)
1837
1838     cluster = self.cfg.GetClusterInfo()
1839     ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1840                                                             self.group_info)
1841     err = ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
1842     _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err),
1843              code=self.ETYPE_WARNING)
1844
1845     for node in node_vol_should:
1846       n_img = node_image[node]
1847       if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1848         # ignore missing volumes on offline or broken nodes
1849         continue
1850       for volume in node_vol_should[node]:
1851         test = volume not in n_img.volumes
1852         _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
1853                  "volume %s missing on node %s", volume, node)
1854
1855     if inst_config.admin_state == constants.ADMINST_UP:
1856       test = instance not in pnode_img.instances and not pnode_img.offline
1857       _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
1858                "instance not running on its primary node %s",
1859                pnode)
1860       _ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
1861                "instance is marked as running and lives on offline node %s",
1862                pnode)
1863
1864     diskdata = [(nname, success, status, idx)
1865                 for (nname, disks) in diskstatus.items()
1866                 for idx, (success, status) in enumerate(disks)]
1867
1868     for nname, success, bdev_status, idx in diskdata:
1869       # the 'ghost node' construction in Exec() ensures that we have a
1870       # node here
1871       snode = node_image[nname]
1872       bad_snode = snode.ghost or snode.offline
1873       _ErrorIf(inst_config.disks_active and
1874                not success and not bad_snode,
1875                constants.CV_EINSTANCEFAULTYDISK, instance,
1876                "couldn't retrieve status for disk/%s on %s: %s",
1877                idx, nname, bdev_status)
1878       _ErrorIf((inst_config.disks_active and
1879                 success and bdev_status.ldisk_status == constants.LDS_FAULTY),
1880                constants.CV_EINSTANCEFAULTYDISK, instance,
1881                "disk/%s on %s is faulty", idx, nname)
1882
1883     _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1884              constants.CV_ENODERPC, pnode, "instance %s, connection to"
1885              " primary node failed", instance)
1886
1887     _ErrorIf(len(inst_config.secondary_nodes) > 1,
1888              constants.CV_EINSTANCELAYOUT,
1889              instance, "instance has multiple secondary nodes: %s",
1890              utils.CommaJoin(inst_config.secondary_nodes),
1891              code=self.ETYPE_WARNING)
1892
1893     es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
1894                                                    inst_config.all_nodes)
1895     if any(es_flags.values()):
1896       if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
1897         # Disk template not compatible with exclusive_storage: no instance
1898         # node should have the flag set
1899         es_nodes = [n
1900                     for (n, es) in es_flags.items()
1901                     if es]
1902         self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance,
1903                     "instance has template %s, which is not supported on nodes"
1904                     " that have exclusive storage set: %s",
1905                     inst_config.disk_template, utils.CommaJoin(es_nodes))
1906       for (idx, disk) in enumerate(inst_config.disks):
1907         _ErrorIf(disk.spindles is None,
1908                  constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance,
1909                  "number of spindles not configured for disk %s while"
1910                  " exclusive storage is enabled, try running"
1911                  " gnt-cluster repair-disk-sizes",
1912                  idx)
1913
1914     if inst_config.disk_template in constants.DTS_INT_MIRROR:
1915       instance_nodes = utils.NiceSort(inst_config.all_nodes)
1916       instance_groups = {}
1917
1918       for node in instance_nodes:
1919         instance_groups.setdefault(self.all_node_info[node].group,
1920                                    []).append(node)
1921
1922       pretty_list = [
1923         "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
1924         # Sort so that we always list the primary node first.
1925         for group, nodes in sorted(instance_groups.items(),
1926                                    key=lambda (_, nodes): pnode in nodes,
1927                                    reverse=True)]
1928
1929       self._ErrorIf(len(instance_groups) > 1,
1930                     constants.CV_EINSTANCESPLITGROUPS,
1931                     instance, "instance has primary and secondary nodes in"
1932                     " different groups: %s", utils.CommaJoin(pretty_list),
1933                     code=self.ETYPE_WARNING)
1934
1935     inst_nodes_offline = []
1936     for snode in inst_config.secondary_nodes:
1937       s_img = node_image[snode]
1938       _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
1939                snode, "instance %s, connection to secondary node failed",
1940                instance)
1941
1942       if s_img.offline:
1943         inst_nodes_offline.append(snode)
1944
1945     # warn that the instance lives on offline nodes
1946     _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
1947              "instance has offline secondary node(s) %s",
1948              utils.CommaJoin(inst_nodes_offline))
1949     # ... or ghost/non-vm_capable nodes
1950     for node in inst_config.all_nodes:
1951       _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
1952                instance, "instance lives on ghost node %s", node)
1953       _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
1954                instance, "instance lives on non-vm_capable node %s", node)
1955
1956   def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1957     """Verify if there are any unknown volumes in the cluster.
1958
1959     The .os, .swap and backup volumes are ignored. All other volumes are
1960     reported as unknown.
1961
1962     @type reserved: L{ganeti.utils.FieldSet}
1963     @param reserved: a FieldSet of reserved volume names
1964
1965     """
1966     for node, n_img in node_image.items():
1967       if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
1968           self.all_node_info[node].group != self.group_uuid):
1969         # skip non-healthy nodes
1970         continue
1971       for volume in n_img.volumes:
1972         test = ((node not in node_vol_should or
1973                 volume not in node_vol_should[node]) and
1974                 not reserved.Matches(volume))
1975         self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
1976                       "volume %s is unknown", volume)
1977
1978   def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1979     """Verify N+1 Memory Resilience.
1980
1981     Check that if one single node dies we can still start all the
1982     instances it was primary for.
1983
1984     """
1985     cluster_info = self.cfg.GetClusterInfo()
1986     for node, n_img in node_image.items():
1987       # This code checks that every node which is now listed as
1988       # secondary has enough memory to host all instances it is
1989       # supposed to should a single other node in the cluster fail.
1990       # FIXME: not ready for failover to an arbitrary node
1991       # FIXME: does not support file-backed instances
1992       # WARNING: we currently take into account down instances as well
1993       # as up ones, considering that even if they're down someone
1994       # might want to start them even in the event of a node failure.
1995       if n_img.offline or self.all_node_info[node].group != self.group_uuid:
1996         # we're skipping nodes marked offline and nodes in other groups from
1997         # the N+1 warning, since most likely we don't have good memory
1998         # infromation from them; we already list instances living on such
1999         # nodes, and that's enough warning
2000         continue
2001       #TODO(dynmem): also consider ballooning out other instances
2002       for prinode, instances in n_img.sbp.items():
2003         needed_mem = 0
2004         for instance in instances:
2005           bep = cluster_info.FillBE(instance_cfg[instance])
2006           if bep[constants.BE_AUTO_BALANCE]:
2007             needed_mem += bep[constants.BE_MINMEM]
2008         test = n_img.mfree < needed_mem
2009         self._ErrorIf(test, constants.CV_ENODEN1, node,
2010                       "not enough memory to accomodate instance failovers"
2011                       " should node %s fail (%dMiB needed, %dMiB available)",
2012                       prinode, needed_mem, n_img.mfree)
2013
2014   @classmethod
2015   def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
2016                    (files_all, files_opt, files_mc, files_vm)):
2017     """Verifies file checksums collected from all nodes.
2018
2019     @param errorif: Callback for reporting errors
2020     @param nodeinfo: List of L{objects.Node} objects
2021     @param master_node: Name of master node
2022     @param all_nvinfo: RPC results
2023
2024     """
2025     # Define functions determining which nodes to consider for a file
2026     files2nodefn = [
2027       (files_all, None),
2028       (files_mc, lambda node: (node.master_candidate or
2029                                node.name == master_node)),
2030       (files_vm, lambda node: node.vm_capable),
2031       ]
2032
2033     # Build mapping from filename to list of nodes which should have the file
2034     nodefiles = {}
2035     for (files, fn) in files2nodefn:
2036       if fn is None:
2037         filenodes = nodeinfo
2038       else:
2039         filenodes = filter(fn, nodeinfo)
2040       nodefiles.update((filename,
2041                         frozenset(map(operator.attrgetter("name"), filenodes)))
2042                        for filename in files)
2043
2044     assert set(nodefiles) == (files_all | files_mc | files_vm)
2045
2046     fileinfo = dict((filename, {}) for filename in nodefiles)
2047     ignore_nodes = set()
2048
2049     for node in nodeinfo:
2050       if node.offline:
2051         ignore_nodes.add(node.name)
2052         continue
2053
2054       nresult = all_nvinfo[node.name]
2055
2056       if nresult.fail_msg or not nresult.payload:
2057         node_files = None
2058       else:
2059         fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2060         node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2061                           for (key, value) in fingerprints.items())
2062         del fingerprints
2063
2064       test = not (node_files and isinstance(node_files, dict))
2065       errorif(test, constants.CV_ENODEFILECHECK, node.name,
2066               "Node did not return file checksum data")
2067       if test:
2068         ignore_nodes.add(node.name)
2069         continue
2070
2071       # Build per-checksum mapping from filename to nodes having it
2072       for (filename, checksum) in node_files.items():
2073         assert filename in nodefiles
2074         fileinfo[filename].setdefault(checksum, set()).add(node.name)
2075
2076     for (filename, checksums) in fileinfo.items():
2077       assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2078
2079       # Nodes having the file
2080       with_file = frozenset(node_name
2081                             for nodes in fileinfo[filename].values()
2082                             for node_name in nodes) - ignore_nodes
2083
2084       expected_nodes = nodefiles[filename] - ignore_nodes
2085
2086       # Nodes missing file
2087       missing_file = expected_nodes - with_file
2088
2089       if filename in files_opt:
2090         # All or no nodes
2091         errorif(missing_file and missing_file != expected_nodes,
2092                 constants.CV_ECLUSTERFILECHECK, None,
2093                 "File %s is optional, but it must exist on all or no"
2094                 " nodes (not found on %s)",
2095                 filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2096       else:
2097         errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2098                 "File %s is missing from node(s) %s", filename,
2099                 utils.CommaJoin(utils.NiceSort(missing_file)))
2100
2101         # Warn if a node has a file it shouldn't
2102         unexpected = with_file - expected_nodes
2103         errorif(unexpected,
2104                 constants.CV_ECLUSTERFILECHECK, None,
2105                 "File %s should not exist on node(s) %s",
2106                 filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2107
2108       # See if there are multiple versions of the file
2109       test = len(checksums) > 1
2110       if test:
2111         variants = ["variant %s on %s" %
2112                     (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2113                     for (idx, (checksum, nodes)) in
2114                       enumerate(sorted(checksums.items()))]
2115       else:
2116         variants = []
2117
2118       errorif(test, constants.CV_ECLUSTERFILECHECK, None,
2119               "File %s found with %s different checksums (%s)",
2120               filename, len(checksums), "; ".join(variants))
2121
2122   def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2123                       drbd_map):
2124     """Verifies and the node DRBD status.
2125
2126     @type ninfo: L{objects.Node}
2127     @param ninfo: the node to check
2128     @param nresult: the remote results for the node
2129     @param instanceinfo: the dict of instances
2130     @param drbd_helper: the configured DRBD usermode helper
2131     @param drbd_map: the DRBD map as returned by
2132         L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2133
2134     """
2135     node = ninfo.name
2136     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2137
2138     if drbd_helper:
2139       helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2140       test = (helper_result is None)
2141       _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2142                "no drbd usermode helper returned")
2143       if helper_result:
2144         status, payload = helper_result
2145         test = not status
2146         _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2147                  "drbd usermode helper check unsuccessful: %s", payload)
2148         test = status and (payload != drbd_helper)
2149         _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2150                  "wrong drbd usermode helper: %s", payload)
2151
2152     # compute the DRBD minors
2153     node_drbd = {}
2154     for minor, instance in drbd_map[node].items():
2155       test = instance not in instanceinfo
2156       _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2157                "ghost instance '%s' in temporary DRBD map", instance)
2158         # ghost instance should not be running, but otherwise we
2159         # don't give double warnings (both ghost instance and
2160         # unallocated minor in use)
2161       if test:
2162         node_drbd[minor] = (instance, False)
2163       else:
2164         instance = instanceinfo[instance]
2165         node_drbd[minor] = (instance.name, instance.disks_active)
2166
2167     # and now check them
2168     used_minors = nresult.get(constants.NV_DRBDLIST, [])
2169     test = not isinstance(used_minors, (tuple, list))
2170     _ErrorIf(test, constants.CV_ENODEDRBD, node,
2171              "cannot parse drbd status file: %s", str(used_minors))
2172     if test:
2173       # we cannot check drbd status
2174       return
2175
2176     for minor, (iname, must_exist) in node_drbd.items():
2177       test = minor not in used_minors and must_exist
2178       _ErrorIf(test, constants.CV_ENODEDRBD, node,
2179                "drbd minor %d of instance %s is not active", minor, iname)
2180     for minor in used_minors:
2181       test = minor not in node_drbd
2182       _ErrorIf(test, constants.CV_ENODEDRBD, node,
2183                "unallocated drbd minor %d is in use", minor)
2184
2185   def _UpdateNodeOS(self, ninfo, nresult, nimg):
2186     """Builds the node OS structures.
2187
2188     @type ninfo: L{objects.Node}
2189     @param ninfo: the node to check
2190     @param nresult: the remote results for the node
2191     @param nimg: the node image object
2192
2193     """
2194     node = ninfo.name
2195     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2196
2197     remote_os = nresult.get(constants.NV_OSLIST, None)
2198     test = (not isinstance(remote_os, list) or
2199             not compat.all(isinstance(v, list) and len(v) == 7
2200                            for v in remote_os))
2201
2202     _ErrorIf(test, constants.CV_ENODEOS, node,
2203              "node hasn't returned valid OS data")
2204
2205     nimg.os_fail = test
2206
2207     if test:
2208       return
2209
2210     os_dict = {}
2211
2212     for (name, os_path, status, diagnose,
2213          variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2214
2215       if name not in os_dict:
2216         os_dict[name] = []
2217
2218       # parameters is a list of lists instead of list of tuples due to
2219       # JSON lacking a real tuple type, fix it:
2220       parameters = [tuple(v) for v in parameters]
2221       os_dict[name].append((os_path, status, diagnose,
2222                             set(variants), set(parameters), set(api_ver)))
2223
2224     nimg.oslist = os_dict
2225
2226   def _VerifyNodeOS(self, ninfo, nimg, base):
2227     """Verifies the node OS list.
2228
2229     @type ninfo: L{objects.Node}
2230     @param ninfo: the node to check
2231     @param nimg: the node image object
2232     @param base: the 'template' node we match against (e.g. from the master)
2233
2234     """
2235     node = ninfo.name
2236     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2237
2238     assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2239
2240     beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2241     for os_name, os_data in nimg.oslist.items():
2242       assert os_data, "Empty OS status for OS %s?!" % os_name
2243       f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2244       _ErrorIf(not f_status, constants.CV_ENODEOS, node,
2245                "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2246       _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
2247                "OS '%s' has multiple entries (first one shadows the rest): %s",
2248                os_name, utils.CommaJoin([v[0] for v in os_data]))
2249       # comparisons with the 'base' image
2250       test = os_name not in base.oslist
2251       _ErrorIf(test, constants.CV_ENODEOS, node,
2252                "Extra OS %s not present on reference node (%s)",
2253                os_name, base.name)
2254       if test:
2255         continue
2256       assert base.oslist[os_name], "Base node has empty OS status?"
2257       _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2258       if not b_status:
2259         # base OS is invalid, skipping
2260         continue
2261       for kind, a, b in [("API version", f_api, b_api),
2262                          ("variants list", f_var, b_var),
2263                          ("parameters", beautify_params(f_param),
2264                           beautify_params(b_param))]:
2265         _ErrorIf(a != b, constants.CV_ENODEOS, node,
2266                  "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2267                  kind, os_name, base.name,
2268                  utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2269
2270     # check any missing OSes
2271     missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2272     _ErrorIf(missing, constants.CV_ENODEOS, node,
2273              "OSes present on reference node %s but missing on this node: %s",
2274              base.name, utils.CommaJoin(missing))
2275
2276   def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
2277     """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2278
2279     @type ninfo: L{objects.Node}
2280     @param ninfo: the node to check
2281     @param nresult: the remote results for the node
2282     @type is_master: bool
2283     @param is_master: Whether node is the master node
2284
2285     """
2286     node = ninfo.name
2287
2288     if (is_master and
2289         (constants.ENABLE_FILE_STORAGE or
2290          constants.ENABLE_SHARED_FILE_STORAGE)):
2291       try:
2292         fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
2293       except KeyError:
2294         # This should never happen
2295         self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
2296                       "Node did not return forbidden file storage paths")
2297       else:
2298         self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
2299                       "Found forbidden file storage paths: %s",
2300                       utils.CommaJoin(fspaths))
2301     else:
2302       self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
2303                     constants.CV_ENODEFILESTORAGEPATHS, node,
2304                     "Node should not have returned forbidden file storage"
2305                     " paths")
2306
2307   def _VerifyOob(self, ninfo, nresult):
2308     """Verifies out of band functionality of a node.
2309
2310     @type ninfo: L{objects.Node}
2311     @param ninfo: the node to check
2312     @param nresult: the remote results for the node
2313
2314     """
2315     node = ninfo.name
2316     # We just have to verify the paths on master and/or master candidates
2317     # as the oob helper is invoked on the master
2318     if ((ninfo.master_candidate or ninfo.master_capable) and
2319         constants.NV_OOB_PATHS in nresult):
2320       for path_result in nresult[constants.NV_OOB_PATHS]:
2321         self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2322
2323   def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2324     """Verifies and updates the node volume data.
2325
2326     This function will update a L{NodeImage}'s internal structures
2327     with data from the remote call.
2328
2329     @type ninfo: L{objects.Node}
2330     @param ninfo: the node to check
2331     @param nresult: the remote results for the node
2332     @param nimg: the node image object
2333     @param vg_name: the configured VG name
2334
2335     """
2336     node = ninfo.name
2337     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2338
2339     nimg.lvm_fail = True
2340     lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2341     if vg_name is None:
2342       pass
2343     elif isinstance(lvdata, basestring):
2344       _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
2345                utils.SafeEncode(lvdata))
2346     elif not isinstance(lvdata, dict):
2347       _ErrorIf(True, constants.CV_ENODELVM, node,
2348                "rpc call to node failed (lvlist)")
2349     else:
2350       nimg.volumes = lvdata
2351       nimg.lvm_fail = False
2352
2353   def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2354     """Verifies and updates the node instance list.
2355
2356     If the listing was successful, then updates this node's instance
2357     list. Otherwise, it marks the RPC call as failed for the instance
2358     list key.
2359
2360     @type ninfo: L{objects.Node}
2361     @param ninfo: the node to check
2362     @param nresult: the remote results for the node
2363     @param nimg: the node image object
2364
2365     """
2366     idata = nresult.get(constants.NV_INSTANCELIST, None)
2367     test = not isinstance(idata, list)
2368     self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2369                   "rpc call to node failed (instancelist): %s",
2370                   utils.SafeEncode(str(idata)))
2371     if test:
2372       nimg.hyp_fail = True
2373     else:
2374       nimg.instances = idata
2375
2376   def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2377     """Verifies and computes a node information map
2378
2379     @type ninfo: L{objects.Node}
2380     @param ninfo: the node to check
2381     @param nresult: the remote results for the node
2382     @param nimg: the node image object
2383     @param vg_name: the configured VG name
2384
2385     """
2386     node = ninfo.name
2387     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2388
2389     # try to read free memory (from the hypervisor)
2390     hv_info = nresult.get(constants.NV_HVINFO, None)
2391     test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2392     _ErrorIf(test, constants.CV_ENODEHV, node,
2393              "rpc call to node failed (hvinfo)")
2394     if not test:
2395       try:
2396         nimg.mfree = int(hv_info["memory_free"])
2397       except (ValueError, TypeError):
2398         _ErrorIf(True, constants.CV_ENODERPC, node,
2399                  "node returned invalid nodeinfo, check hypervisor")
2400
2401     # FIXME: devise a free space model for file based instances as well
2402     if vg_name is not None:
2403       test = (constants.NV_VGLIST not in nresult or
2404               vg_name not in nresult[constants.NV_VGLIST])
2405       _ErrorIf(test, constants.CV_ENODELVM, node,
2406                "node didn't return data for the volume group '%s'"
2407                " - it is either missing or broken", vg_name)
2408       if not test:
2409         try:
2410           nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2411         except (ValueError, TypeError):
2412           _ErrorIf(True, constants.CV_ENODERPC, node,
2413                    "node returned invalid LVM info, check LVM status")
2414
2415   def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2416     """Gets per-disk status information for all instances.
2417
2418     @type nodelist: list of strings
2419     @param nodelist: Node names
2420     @type node_image: dict of (name, L{objects.Node})
2421     @param node_image: Node objects
2422     @type instanceinfo: dict of (name, L{objects.Instance})
2423     @param instanceinfo: Instance objects
2424     @rtype: {instance: {node: [(succes, payload)]}}
2425     @return: a dictionary of per-instance dictionaries with nodes as
2426         keys and disk information as values; the disk information is a
2427         list of tuples (success, payload)
2428
2429     """
2430     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2431
2432     node_disks = {}
2433     node_disks_devonly = {}
2434     diskless_instances = set()
2435     diskless = constants.DT_DISKLESS
2436
2437     for nname in nodelist:
2438       node_instances = list(itertools.chain(node_image[nname].pinst,
2439                                             node_image[nname].sinst))
2440       diskless_instances.update(inst for inst in node_instances
2441                                 if instanceinfo[inst].disk_template == diskless)
2442       disks = [(inst, disk)
2443                for inst in node_instances
2444                for disk in instanceinfo[inst].disks]
2445
2446       if not disks:
2447         # No need to collect data
2448         continue
2449
2450       node_disks[nname] = disks
2451
2452       # _AnnotateDiskParams makes already copies of the disks
2453       devonly = []
2454       for (inst, dev) in disks:
2455         (anno_disk,) = AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
2456         self.cfg.SetDiskID(anno_disk, nname)
2457         devonly.append(anno_disk)
2458
2459       node_disks_devonly[nname] = devonly
2460
2461     assert len(node_disks) == len(node_disks_devonly)
2462
2463     # Collect data from all nodes with disks
2464     result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2465                                                           node_disks_devonly)
2466
2467     assert len(result) == len(node_disks)
2468
2469     instdisk = {}
2470
2471     for (nname, nres) in result.items():
2472       disks = node_disks[nname]
2473
2474       if nres.offline:
2475         # No data from this node
2476         data = len(disks) * [(False, "node offline")]
2477       else:
2478         msg = nres.fail_msg
2479         _ErrorIf(msg, constants.CV_ENODERPC, nname,
2480                  "while getting disk information: %s", msg)
2481         if msg:
2482           # No data from this node
2483           data = len(disks) * [(False, msg)]
2484         else:
2485           data = []
2486           for idx, i in enumerate(nres.payload):
2487             if isinstance(i, (tuple, list)) and len(i) == 2:
2488               data.append(i)
2489             else:
2490               logging.warning("Invalid result from node %s, entry %d: %s",
2491                               nname, idx, i)
2492               data.append((False, "Invalid result from the remote node"))
2493
2494       for ((inst, _), status) in zip(disks, data):
2495         instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2496
2497     # Add empty entries for diskless instances.
2498     for inst in diskless_instances:
2499       assert inst not in instdisk
2500       instdisk[inst] = {}
2501
2502     assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2503                       len(nnames) <= len(instanceinfo[inst].all_nodes) and
2504                       compat.all(isinstance(s, (tuple, list)) and
2505                                  len(s) == 2 for s in statuses)
2506                       for inst, nnames in instdisk.items()
2507                       for nname, statuses in nnames.items())
2508     if __debug__:
2509       instdisk_keys = set(instdisk)
2510       instanceinfo_keys = set(instanceinfo)
2511       assert instdisk_keys == instanceinfo_keys, \
2512         ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2513          (instdisk_keys, instanceinfo_keys))
2514
2515     return instdisk
2516
2517   @staticmethod
2518   def _SshNodeSelector(group_uuid, all_nodes):
2519     """Create endless iterators for all potential SSH check hosts.
2520
2521     """
2522     nodes = [node for node in all_nodes
2523              if (node.group != group_uuid and
2524                  not node.offline)]
2525     keyfunc = operator.attrgetter("group")
2526
2527     return map(itertools.cycle,
2528                [sorted(map(operator.attrgetter("name"), names))
2529                 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2530                                                   keyfunc)])
2531
2532   @classmethod
2533   def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2534     """Choose which nodes should talk to which other nodes.
2535
2536     We will make nodes contact all nodes in their group, and one node from
2537     every other group.
2538
2539     @warning: This algorithm has a known issue if one node group is much
2540       smaller than others (e.g. just one node). In such a case all other
2541       nodes will talk to the single node.
2542
2543     """
2544     online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2545     sel = cls._SshNodeSelector(group_uuid, all_nodes)
2546
2547     return (online_nodes,
2548             dict((name, sorted([i.next() for i in sel]))
2549                  for name in online_nodes))
2550
2551   def BuildHooksEnv(self):
2552     """Build hooks env.
2553
2554     Cluster-Verify hooks just ran in the post phase and their failure makes
2555     the output be logged in the verify output and the verification to fail.
2556
2557     """
2558     env = {
2559       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2560       }
2561
2562     env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2563                for node in self.my_node_info.values())
2564
2565     return env
2566
2567   def BuildHooksNodes(self):
2568     """Build hooks nodes.
2569
2570     """
2571     return ([], self.my_node_names)
2572
2573   def Exec(self, feedback_fn):
2574     """Verify integrity of the node group, performing various test on nodes.
2575
2576     """
2577     # This method has too many local variables. pylint: disable=R0914
2578     feedback_fn("* Verifying group '%s'" % self.group_info.name)
2579
2580     if not self.my_node_names:
2581       # empty node group
2582       feedback_fn("* Empty node group, skipping verification")
2583       return True
2584
2585     self.bad = False
2586     _ErrorIf = self._ErrorIf # pylint: disable=C0103
2587     verbose = self.op.verbose
2588     self._feedback_fn = feedback_fn
2589
2590     vg_name = self.cfg.GetVGName()
2591     drbd_helper = self.cfg.GetDRBDHelper()
2592     cluster = self.cfg.GetClusterInfo()
2593     hypervisors = cluster.enabled_hypervisors
2594     node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2595
2596     i_non_redundant = [] # Non redundant instances
2597     i_non_a_balanced = [] # Non auto-balanced instances
2598     i_offline = 0 # Count of offline instances
2599     n_offline = 0 # Count of offline nodes
2600     n_drained = 0 # Count of nodes being drained
2601     node_vol_should = {}
2602
2603     # FIXME: verify OS list
2604
2605     # File verification
2606     filemap = ComputeAncillaryFiles(cluster, False)
2607
2608     # do local checksums
2609     master_node = self.master_node = self.cfg.GetMasterNode()
2610     master_ip = self.cfg.GetMasterIP()
2611
2612     feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2613
2614     user_scripts = []
2615     if self.cfg.GetUseExternalMipScript():
2616       user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2617
2618     node_verify_param = {
2619       constants.NV_FILELIST:
2620         map(vcluster.MakeVirtualPath,
2621             utils.UniqueSequence(filename
2622                                  for files in filemap
2623                                  for filename in files)),
2624       constants.NV_NODELIST:
2625         self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2626                                   self.all_node_info.values()),
2627       constants.NV_HYPERVISOR: hypervisors,
2628       constants.NV_HVPARAMS:
2629         _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2630       constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2631                                  for node in node_data_list
2632                                  if not node.offline],
2633       constants.NV_INSTANCELIST: hypervisors,
2634       constants.NV_VERSION: None,
2635       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2636       constants.NV_NODESETUP: None,
2637       constants.NV_TIME: None,
2638       constants.NV_MASTERIP: (master_node, master_ip),
2639       constants.NV_OSLIST: None,
2640       constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2641       constants.NV_USERSCRIPTS: user_scripts,
2642       }
2643
2644     if vg_name is not None:
2645       node_verify_param[constants.NV_VGLIST] = None
2646       node_verify_param[constants.NV_LVLIST] = vg_name
2647       node_verify_param[constants.NV_PVLIST] = [vg_name]
2648
2649     if drbd_helper:
2650       node_verify_param[constants.NV_DRBDVERSION] = None
2651       node_verify_param[constants.NV_DRBDLIST] = None
2652       node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2653
2654     if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
2655       # Load file storage paths only from master node
2656       node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node
2657
2658     # bridge checks
2659     # FIXME: this needs to be changed per node-group, not cluster-wide
2660     bridges = set()
2661     default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2662     if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2663       bridges.add(default_nicpp[constants.NIC_LINK])
2664     for instance in self.my_inst_info.values():
2665       for nic in instance.nics:
2666         full_nic = cluster.SimpleFillNIC(nic.nicparams)
2667         if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2668           bridges.add(full_nic[constants.NIC_LINK])
2669
2670     if bridges:
2671       node_verify_param[constants.NV_BRIDGES] = list(bridges)
2672
2673     # Build our expected cluster state
2674     node_image = dict((node.name, self.NodeImage(offline=node.offline,
2675                                                  name=node.name,
2676                                                  vm_capable=node.vm_capable))
2677                       for node in node_data_list)
2678
2679     # Gather OOB paths
2680     oob_paths = []
2681     for node in self.all_node_info.values():
2682       path = SupportsOob(self.cfg, node)
2683       if path and path not in oob_paths:
2684         oob_paths.append(path)
2685
2686     if oob_paths:
2687       node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2688
2689     for instance in self.my_inst_names:
2690       inst_config = self.my_inst_info[instance]
2691       if inst_config.admin_state == constants.ADMINST_OFFLINE:
2692         i_offline += 1
2693
2694       for nname in inst_config.all_nodes:
2695         if nname not in node_image:
2696           gnode = self.NodeImage(name=nname)
2697           gnode.ghost = (nname not in self.all_node_info)
2698           node_image[nname] = gnode
2699
2700       inst_config.MapLVsByNode(node_vol_should)
2701
2702       pnode = inst_config.primary_node
2703       node_image[pnode].pinst.append(instance)
2704
2705       for snode in inst_config.secondary_nodes:
2706         nimg = node_image[snode]
2707         nimg.sinst.append(instance)
2708         if pnode not in nimg.sbp:
2709           nimg.sbp[pnode] = []
2710         nimg.sbp[pnode].append(instance)
2711
2712     es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg, self.my_node_names)
2713     # The value of exclusive_storage should be the same across the group, so if
2714     # it's True for at least a node, we act as if it were set for all the nodes
2715     self._exclusive_storage = compat.any(es_flags.values())
2716     if self._exclusive_storage:
2717       node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2718
2719     # At this point, we have the in-memory data structures complete,
2720     # except for the runtime information, which we'll gather next
2721
2722     # Due to the way our RPC system works, exact response times cannot be
2723     # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2724     # time before and after executing the request, we can at least have a time
2725     # window.
2726     nvinfo_starttime = time.time()
2727     all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2728                                            node_verify_param,
2729                                            self.cfg.GetClusterName())
2730     nvinfo_endtime = time.time()
2731
2732     if self.extra_lv_nodes and vg_name is not None:
2733       extra_lv_nvinfo = \
2734           self.rpc.call_node_verify(self.extra_lv_nodes,
2735                                     {constants.NV_LVLIST: vg_name},
2736                                     self.cfg.GetClusterName())
2737     else:
2738       extra_lv_nvinfo = {}
2739
2740     all_drbd_map = self.cfg.ComputeDRBDMap()
2741
2742     feedback_fn("* Gathering disk information (%s nodes)" %
2743                 len(self.my_node_names))
2744     instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2745                                      self.my_inst_info)
2746
2747     feedback_fn("* Verifying configuration file consistency")
2748
2749     # If not all nodes are being checked, we need to make sure the master node
2750     # and a non-checked vm_capable node are in the list.
2751     absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2752     if absent_nodes:
2753       vf_nvinfo = all_nvinfo.copy()
2754       vf_node_info = list(self.my_node_info.values())
2755       additional_nodes = []
2756       if master_node not in self.my_node_info:
2757         additional_nodes.append(master_node)
2758         vf_node_info.append(self.all_node_info[master_node])
2759       # Add the first vm_capable node we find which is not included,
2760       # excluding the master node (which we already have)
2761       for node in absent_nodes:
2762         nodeinfo = self.all_node_info[node]
2763         if (nodeinfo.vm_capable and not nodeinfo.offline and
2764             node != master_node):
2765           additional_nodes.append(node)
2766           vf_node_info.append(self.all_node_info[node])
2767           break
2768       key = constants.NV_FILELIST
2769       vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2770                                                  {key: node_verify_param[key]},
2771                                                  self.cfg.GetClusterName()))
2772     else:
2773       vf_nvinfo = all_nvinfo
2774       vf_node_info = self.my_node_info.values()
2775
2776     self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2777
2778     feedback_fn("* Verifying node status")
2779
2780     refos_img = None
2781
2782     for node_i in node_data_list:
2783       node = node_i.name
2784       nimg = node_image[node]
2785
2786       if node_i.offline:
2787         if verbose:
2788           feedback_fn("* Skipping offline node %s" % (node,))
2789         n_offline += 1
2790         continue
2791
2792       if node == master_node:
2793         ntype = "master"
2794       elif node_i.master_candidate:
2795         ntype = "master candidate"
2796       elif node_i.drained:
2797         ntype = "drained"
2798         n_drained += 1
2799       else:
2800         ntype = "regular"
2801       if verbose:
2802         feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2803
2804       msg = all_nvinfo[node].fail_msg
2805       _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
2806                msg)
2807       if msg:
2808         nimg.rpc_fail = True
2809         continue
2810
2811       nresult = all_nvinfo[node].payload
2812
2813       nimg.call_ok = self._VerifyNode(node_i, nresult)
2814       self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2815       self._VerifyNodeNetwork(node_i, nresult)
2816       self._VerifyNodeUserScripts(node_i, nresult)
2817       self._VerifyOob(node_i, nresult)
2818       self._VerifyFileStoragePaths(node_i, nresult,
2819                                    node == master_node)
2820
2821       if nimg.vm_capable:
2822         self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
2823         self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2824                              all_drbd_map)
2825
2826         self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2827         self._UpdateNodeInstances(node_i, nresult, nimg)
2828         self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2829         self._UpdateNodeOS(node_i, nresult, nimg)
2830
2831         if not nimg.os_fail:
2832           if refos_img is None:
2833             refos_img = nimg
2834           self._VerifyNodeOS(node_i, nimg, refos_img)
2835         self._VerifyNodeBridges(node_i, nresult, bridges)
2836
2837         # Check whether all running instancies are primary for the node. (This
2838         # can no longer be done from _VerifyInstance below, since some of the
2839         # wrong instances could be from other node groups.)
2840         non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2841
2842         for inst in non_primary_inst:
2843           test = inst in self.all_inst_info
2844           _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
2845                    "instance should not run on node %s", node_i.name)
2846           _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
2847                    "node is running unknown instance %s", inst)
2848
2849     self._VerifyGroupDRBDVersion(all_nvinfo)
2850     self._VerifyGroupLVM(node_image, vg_name)
2851
2852     for node, result in extra_lv_nvinfo.items():
2853       self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
2854                               node_image[node], vg_name)
2855
2856     feedback_fn("* Verifying instance status")
2857     for instance in self.my_inst_names:
2858       if verbose:
2859         feedback_fn("* Verifying instance %s" % instance)
2860       inst_config = self.my_inst_info[instance]
2861       self._VerifyInstance(instance, inst_config, node_image,
2862                            instdisk[instance])
2863
2864       # If the instance is non-redundant we cannot survive losing its primary
2865       # node, so we are not N+1 compliant.
2866       if inst_config.disk_template not in constants.DTS_MIRRORED:
2867         i_non_redundant.append(instance)
2868
2869       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2870         i_non_a_balanced.append(instance)
2871
2872     feedback_fn("* Verifying orphan volumes")
2873     reserved = utils.FieldSet(*cluster.reserved_lvs)
2874
2875     # We will get spurious "unknown volume" warnings if any node of this group
2876     # is secondary for an instance whose primary is in another group. To avoid
2877     # them, we find these instances and add their volumes to node_vol_should.
2878     for inst in self.all_inst_info.values():
2879       for secondary in inst.secondary_nodes:
2880         if (secondary in self.my_node_info
2881             and inst.name not in self.my_inst_info):
2882           inst.MapLVsByNode(node_vol_should)
2883           break
2884
2885     self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2886
2887     if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2888       feedback_fn("* Verifying N+1 Memory redundancy")
2889       self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2890
2891     feedback_fn("* Other Notes")
2892     if i_non_redundant:
2893       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2894                   % len(i_non_redundant))
2895
2896     if i_non_a_balanced:
2897       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2898                   % len(i_non_a_balanced))
2899
2900     if i_offline:
2901       feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
2902
2903     if n_offline:
2904       feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2905
2906     if n_drained:
2907       feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2908
2909     return not self.bad
2910
2911   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2912     """Analyze the post-hooks' result
2913
2914     This method analyses the hook result, handles it, and sends some
2915     nicely-formatted feedback back to the user.
2916
2917     @param phase: one of L{constants.HOOKS_PHASE_POST} or
2918         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2919     @param hooks_results: the results of the multi-node hooks rpc call
2920     @param feedback_fn: function used send feedback back to the caller
2921     @param lu_result: previous Exec result
2922     @return: the new Exec result, based on the previous result
2923         and hook results
2924
2925     """
2926     # We only really run POST phase hooks, only for non-empty groups,
2927     # and are only interested in their results
2928     if not self.my_node_names:
2929       # empty node group
2930       pass
2931     elif phase == constants.HOOKS_PHASE_POST:
2932       # Used to change hooks' output to proper indentation
2933       feedback_fn("* Hooks Results")
2934       assert hooks_results, "invalid result from hooks"
2935
2936       for node_name in hooks_results:
2937         res = hooks_results[node_name]
2938         msg = res.fail_msg
2939         test = msg and not res.offline
2940         self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2941                       "Communication failure in hooks execution: %s", msg)
2942         if res.offline or msg:
2943           # No need to investigate payload if node is offline or gave
2944           # an error.
2945           continue
2946         for script, hkr, output in res.payload:
2947           test = hkr == constants.HKR_FAIL
2948           self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2949                         "Script %s failed, output:", script)
2950           if test:
2951             output = self._HOOKS_INDENT_RE.sub("      ", output)
2952             feedback_fn("%s" % output)
2953             lu_result = False
2954
2955     return lu_result
2956
2957
2958 class LUClusterVerifyDisks(NoHooksLU):
2959   """Verifies the cluster disks status.
2960
2961   """
2962   REQ_BGL = False
2963
2964   def ExpandNames(self):
2965     self.share_locks = ShareAll()
2966     self.needed_locks = {
2967       locking.LEVEL_NODEGROUP: locking.ALL_SET,
2968       }
2969
2970   def Exec(self, feedback_fn):
2971     group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
2972
2973     # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
2974     return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
2975                            for group in group_names])