lib/cmdlib/cluster.py @ 33a6464e


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with the cluster."""
23

    
24
import OpenSSL
25

    
26
import copy
27
import itertools
28
import logging
29
import operator
30
import os
31
import re
32
import time
33

    
34
from ganeti import compat
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import hypervisor
38
from ganeti import locking
39
from ganeti import masterd
40
from ganeti import netutils
41
from ganeti import objects
42
from ganeti import opcodes
43
from ganeti import pathutils
44
from ganeti import query
45
from ganeti import rpc
46
from ganeti import runtime
47
from ganeti import ssh
48
from ganeti import uidpool
49
from ganeti import utils
50
from ganeti import vcluster
51

    
52
from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53
  ResultWithJobs
54
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob
60

    
61
import ganeti.masterd.instance
62

    
63

    
64
class LUClusterActivateMasterIp(NoHooksLU):
65
  """Activate the master IP on the master node.
66

67
  """
68
  def Exec(self, feedback_fn):
69
    """Activate the master IP.
70

71
    """
72
    master_params = self.cfg.GetMasterNetworkParameters()
73
    ems = self.cfg.GetUseExternalMipScript()
74
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
75
                                                   master_params, ems)
76
    result.Raise("Could not activate the master IP")
77

    
78

    
79
class LUClusterDeactivateMasterIp(NoHooksLU):
80
  """Deactivate the master IP on the master node.
81

82
  """
83
  def Exec(self, feedback_fn):
84
    """Deactivate the master IP.
85

86
    """
87
    master_params = self.cfg.GetMasterNetworkParameters()
88
    ems = self.cfg.GetUseExternalMipScript()
89
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
90
                                                     master_params, ems)
91
    result.Raise("Could not deactivate the master IP")
92

    
93

    
94
class LUClusterConfigQuery(NoHooksLU):
95
  """Return configuration values.
96

97
  """
98
  REQ_BGL = False
99

    
100
  def CheckArguments(self):
101
    self.cq = ClusterQuery(None, self.op.output_fields, False)
102

    
103
  def ExpandNames(self):
104
    self.cq.ExpandNames(self)
105

    
106
  def DeclareLocks(self, level):
107
    self.cq.DeclareLocks(self, level)
108

    
109
  def Exec(self, feedback_fn):
110
    result = self.cq.OldStyleQuery(self)
111

    
112
    assert len(result) == 1
113

    
114
    return result[0]
115

    
116

    
117
class LUClusterDestroy(LogicalUnit):
118
  """Logical unit for destroying the cluster.
119

120
  """
121
  HPATH = "cluster-destroy"
122
  HTYPE = constants.HTYPE_CLUSTER
123

    
124
  def BuildHooksEnv(self):
125
    """Build hooks env.
126

127
    """
128
    return {
129
      "OP_TARGET": self.cfg.GetClusterName(),
130
      }
131

    
132
  def BuildHooksNodes(self):
133
    """Build hooks nodes.
134

135
    """
136
    return ([], [])
137

    
138
  def CheckPrereq(self):
139
    """Check prerequisites.
140

141
    This checks whether the cluster is empty.
142

143
    Any errors are signaled by raising errors.OpPrereqError.
144

145
    """
146
    master = self.cfg.GetMasterNode()
147

    
148
    nodelist = self.cfg.GetNodeList()
149
    if len(nodelist) != 1 or nodelist[0] != master:
150
      raise errors.OpPrereqError("There are still %d node(s) in"
151
                                 " this cluster." % (len(nodelist) - 1),
152
                                 errors.ECODE_INVAL)
153
    instancelist = self.cfg.GetInstanceList()
154
    if instancelist:
155
      raise errors.OpPrereqError("There are still %d instance(s) in"
156
                                 " this cluster." % len(instancelist),
157
                                 errors.ECODE_INVAL)
158

    
159
  def Exec(self, feedback_fn):
160
    """Destroys the cluster.
161

162
    """
163
    master_params = self.cfg.GetMasterNetworkParameters()
164

    
165
    # Run post hooks on master node before it's removed
166
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
167

    
168
    ems = self.cfg.GetUseExternalMipScript()
169
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
170
                                                     master_params, ems)
171
    result.Warn("Error disabling the master IP address", self.LogWarning)
172
    return master_params.uuid
173

    
174

    
175
class LUClusterPostInit(LogicalUnit):
176
  """Logical unit for running hooks after cluster initialization.
177

178
  """
179
  HPATH = "cluster-init"
180
  HTYPE = constants.HTYPE_CLUSTER
181

    
182
  def BuildHooksEnv(self):
183
    """Build hooks env.
184

185
    """
186
    return {
187
      "OP_TARGET": self.cfg.GetClusterName(),
188
      }
189

    
190
  def BuildHooksNodes(self):
191
    """Build hooks nodes.
192

193
    """
194
    return ([], [self.cfg.GetMasterNode()])
195

    
196
  def Exec(self, feedback_fn):
197
    """Nothing to do.
198

199
    """
200
    return True
201

    
202

    
203
class ClusterQuery(QueryBase):
204
  FIELDS = query.CLUSTER_FIELDS
205

    
206
  #: Do not sort (there is only one item)
207
  SORT_FIELD = None
208

    
209
  def ExpandNames(self, lu):
210
    lu.needed_locks = {}
211

    
212
    # The following variables interact with _QueryBase._GetNames
213
    self.wanted = locking.ALL_SET
214
    self.do_locking = self.use_locking
215

    
216
    if self.do_locking:
217
      raise errors.OpPrereqError("Can not use locking for cluster queries",
218
                                 errors.ECODE_INVAL)
219

    
220
  def DeclareLocks(self, lu, level):
221
    pass
222

    
223
  def _GetQueryData(self, lu):
224
    """Computes the list of nodes and their attributes.
225

226
    """
227
    # Locking is not used
228
    assert not (compat.any(lu.glm.is_owned(level)
229
                           for level in locking.LEVELS
230
                           if level != locking.LEVEL_CLUSTER) or
231
                self.do_locking or self.use_locking)
232

    
233
    if query.CQ_CONFIG in self.requested_data:
234
      cluster = lu.cfg.GetClusterInfo()
235
      nodes = lu.cfg.GetAllNodesInfo()
236
    else:
237
      cluster = NotImplemented
238
      nodes = NotImplemented
239

    
240
    if query.CQ_QUEUE_DRAINED in self.requested_data:
241
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
242
    else:
243
      drain_flag = NotImplemented
244

    
245
    if query.CQ_WATCHER_PAUSE in self.requested_data:
246
      master_node_uuid = lu.cfg.GetMasterNode()
247

    
248
      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
249
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
250
                   lu.cfg.GetMasterNodeName())
251

    
252
      watcher_pause = result.payload
253
    else:
254
      watcher_pause = NotImplemented
255

    
256
    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
257

    
258

    
259
class LUClusterQuery(NoHooksLU):
260
  """Query cluster configuration.
261

262
  """
263
  REQ_BGL = False
264

    
265
  def ExpandNames(self):
266
    self.needed_locks = {}
267

    
268
  def Exec(self, feedback_fn):
269
    """Return cluster config.
270

271
    """
272
    cluster = self.cfg.GetClusterInfo()
273
    os_hvp = {}
274

    
275
    # Filter just for enabled hypervisors
276
    for os_name, hv_dict in cluster.os_hvp.items():
277
      os_hvp[os_name] = {}
278
      for hv_name, hv_params in hv_dict.items():
279
        if hv_name in cluster.enabled_hypervisors:
280
          os_hvp[os_name][hv_name] = hv_params
281

    
282
    # Convert ip_family to ip_version
283
    primary_ip_version = constants.IP4_VERSION
284
    if cluster.primary_ip_family == netutils.IP6Address.family:
285
      primary_ip_version = constants.IP6_VERSION
286

    
287
    result = {
288
      "software_version": constants.RELEASE_VERSION,
289
      "protocol_version": constants.PROTOCOL_VERSION,
290
      "config_version": constants.CONFIG_VERSION,
291
      "os_api_version": max(constants.OS_API_VERSIONS),
292
      "export_version": constants.EXPORT_VERSION,
293
      "vcs_version": constants.VCS_VERSION,
294
      "architecture": runtime.GetArchInfo(),
295
      "name": cluster.cluster_name,
296
      "master": self.cfg.GetMasterNodeName(),
297
      "default_hypervisor": cluster.primary_hypervisor,
298
      "enabled_hypervisors": cluster.enabled_hypervisors,
299
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
300
                        for hypervisor_name in cluster.enabled_hypervisors]),
301
      "os_hvp": os_hvp,
302
      "beparams": cluster.beparams,
303
      "osparams": cluster.osparams,
304
      "ipolicy": cluster.ipolicy,
305
      "nicparams": cluster.nicparams,
306
      "ndparams": cluster.ndparams,
307
      "diskparams": cluster.diskparams,
308
      "candidate_pool_size": cluster.candidate_pool_size,
309
      "master_netdev": cluster.master_netdev,
310
      "master_netmask": cluster.master_netmask,
311
      "use_external_mip_script": cluster.use_external_mip_script,
312
      "volume_group_name": cluster.volume_group_name,
313
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
314
      "file_storage_dir": cluster.file_storage_dir,
315
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
316
      "maintain_node_health": cluster.maintain_node_health,
317
      "ctime": cluster.ctime,
318
      "mtime": cluster.mtime,
319
      "uuid": cluster.uuid,
320
      "tags": list(cluster.GetTags()),
321
      "uid_pool": cluster.uid_pool,
322
      "default_iallocator": cluster.default_iallocator,
323
      "reserved_lvs": cluster.reserved_lvs,
324
      "primary_ip_version": primary_ip_version,
325
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
326
      "hidden_os": cluster.hidden_os,
327
      "blacklisted_os": cluster.blacklisted_os,
328
      "enabled_disk_templates": cluster.enabled_disk_templates,
329
      }
330

    
331
    return result
332

    
333

    
334
class LUClusterRedistConf(NoHooksLU):
335
  """Force the redistribution of cluster configuration.
336

337
  This is a very simple LU.
338

339
  """
340
  REQ_BGL = False
341

    
342
  def ExpandNames(self):
343
    self.needed_locks = {
344
      locking.LEVEL_NODE: locking.ALL_SET,
345
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
346
    }
347
    self.share_locks = ShareAll()
348

    
349
  def Exec(self, feedback_fn):
350
    """Redistribute the configuration.
351

352
    """
353
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
354
    RedistributeAncillaryFiles(self)
355

    
356

    
357
class LUClusterRename(LogicalUnit):
358
  """Rename the cluster.
359

360
  """
361
  HPATH = "cluster-rename"
362
  HTYPE = constants.HTYPE_CLUSTER
363

    
364
  def BuildHooksEnv(self):
365
    """Build hooks env.
366

367
    """
368
    return {
369
      "OP_TARGET": self.cfg.GetClusterName(),
370
      "NEW_NAME": self.op.name,
371
      }
372

    
373
  def BuildHooksNodes(self):
374
    """Build hooks nodes.
375

376
    """
377
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
378

    
379
  def CheckPrereq(self):
380
    """Verify that the passed name is a valid one.
381

382
    """
383
    hostname = netutils.GetHostname(name=self.op.name,
384
                                    family=self.cfg.GetPrimaryIPFamily())
385

    
386
    new_name = hostname.name
387
    self.ip = new_ip = hostname.ip
388
    old_name = self.cfg.GetClusterName()
389
    old_ip = self.cfg.GetMasterIP()
390
    if new_name == old_name and new_ip == old_ip:
391
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
392
                                 " cluster has changed",
393
                                 errors.ECODE_INVAL)
394
    if new_ip != old_ip:
395
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
396
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
397
                                   " reachable on the network" %
398
                                   new_ip, errors.ECODE_NOTUNIQUE)
399

    
400
    self.op.name = new_name
401

    
402
  def Exec(self, feedback_fn):
403
    """Rename the cluster.
404

405
    """
406
    clustername = self.op.name
407
    new_ip = self.ip
408

    
409
    # shutdown the master IP
410
    master_params = self.cfg.GetMasterNetworkParameters()
411
    ems = self.cfg.GetUseExternalMipScript()
412
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
413
                                                     master_params, ems)
414
    result.Raise("Could not disable the master role")
415

    
416
    try:
417
      cluster = self.cfg.GetClusterInfo()
418
      cluster.cluster_name = clustername
419
      cluster.master_ip = new_ip
420
      self.cfg.Update(cluster, feedback_fn)
421

    
422
      # update the known hosts file
423
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
424
      node_list = self.cfg.GetOnlineNodeList()
425
      try:
426
        node_list.remove(master_params.uuid)
427
      except ValueError:
428
        pass
429
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
430
    finally:
431
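      # always try to re-activate the master IP on the new address, even if
      # updating the configuration failed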
      master_params.ip = new_ip
432
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
433
                                                     master_params, ems)
434
      result.Warn("Could not re-enable the master role on the master,"
435
                  " please restart manually", self.LogWarning)
436

    
437
    return clustername
438

    
439

    
440
class LUClusterRepairDiskSizes(NoHooksLU):
441
  """Verifies the cluster disk sizes.
442

443
  """
444
  REQ_BGL = False
445

    
446
  def ExpandNames(self):
447
    if self.op.instances:
448
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
449
      # Not getting the node allocation lock as only a specific set of
450
      # instances (and their nodes) is going to be acquired
451
      self.needed_locks = {
452
        locking.LEVEL_NODE_RES: [],
453
        locking.LEVEL_INSTANCE: self.wanted_names,
454
        }
455
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
456
    else:
457
      self.wanted_names = None
458
      self.needed_locks = {
459
        locking.LEVEL_NODE_RES: locking.ALL_SET,
460
        locking.LEVEL_INSTANCE: locking.ALL_SET,
461

    
462
        # This opcode acquires the node locks for all instances
463
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
464
        }
465

    
466
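    # share_locks: 1 means the lock is acquired shared, 0 exclusive; instance
    # locks stay exclusive because instance disk sizes may be corrected below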
    self.share_locks = {
467
      locking.LEVEL_NODE_RES: 1,
468
      locking.LEVEL_INSTANCE: 0,
469
      locking.LEVEL_NODE_ALLOC: 1,
470
      }
471

    
472
  def DeclareLocks(self, level):
473
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
474
      self._LockInstancesNodes(primary_only=True, level=level)
475

    
476
  def CheckPrereq(self):
477
    """Check prerequisites.
478

479
    This only checks the optional instance list against the existing names.
480

481
    """
482
    if self.wanted_names is None:
483
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
484

    
485
    self.wanted_instances = \
486
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
487

    
488
  def _EnsureChildSizes(self, disk):
489
    """Ensure children of the disk have the needed disk size.
490

491
    This is valid mainly for DRBD8 and fixes an issue where the
492
    children have smaller disk size.
493

494
    @param disk: an L{ganeti.objects.Disk} object
495

496
    """
497
    if disk.dev_type == constants.LD_DRBD8:
498
      assert disk.children, "Empty children for DRBD8?"
499
      fchild = disk.children[0]
500
      mismatch = fchild.size < disk.size
501
      if mismatch:
502
        self.LogInfo("Child disk has size %d, parent %d, fixing",
503
                     fchild.size, disk.size)
504
        fchild.size = disk.size
505

    
506
      # and we recurse on this child only, not on the metadev
507
      return self._EnsureChildSizes(fchild) or mismatch
508
    else:
509
      return False
510

    
511
  def Exec(self, feedback_fn):
512
    """Verify the size of cluster disks.
513

514
    """
515
    # TODO: check child disks too
516
    # TODO: check differences in size between primary/secondary nodes
517
    per_node_disks = {}
518
    for instance in self.wanted_instances:
519
      pnode = instance.primary_node
520
      if pnode not in per_node_disks:
521
        per_node_disks[pnode] = []
522
      for idx, disk in enumerate(instance.disks):
523
        per_node_disks[pnode].append((instance, idx, disk))
524

    
525
    assert not (frozenset(per_node_disks.keys()) -
526
                self.owned_locks(locking.LEVEL_NODE_RES)), \
527
      "Not owning correct locks"
528
    assert not self.owned_locks(locking.LEVEL_NODE)
529

    
530
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
531
                                               per_node_disks.keys())
532

    
533
    changed = []
534
    for node_uuid, dskl in per_node_disks.items():
535
      newl = [v[2].Copy() for v in dskl]
536
      for dsk in newl:
537
        self.cfg.SetDiskID(dsk, node_uuid)
538
      node_name = self.cfg.GetNodeName(node_uuid)
539
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
540
      if result.fail_msg:
541
        self.LogWarning("Failure in blockdev_getdimensions call to node"
542
                        " %s, ignoring", node_name)
543
        continue
544
      if len(result.payload) != len(dskl):
545
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
546
                        " result.payload=%s", node_name, len(dskl),
547
                        result.payload)
548
        self.LogWarning("Invalid result from node %s, ignoring node results",
549
                        node_name)
550
        continue
551
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
552
        if dimensions is None:
553
          self.LogWarning("Disk %d of instance %s did not return size"
554
                          " information, ignoring", idx, instance.name)
555
          continue
556
        if not isinstance(dimensions, (tuple, list)):
557
          self.LogWarning("Disk %d of instance %s did not return valid"
558
                          " dimension information, ignoring", idx,
559
                          instance.name)
560
          continue
561
        (size, spindles) = dimensions
562
        if not isinstance(size, (int, long)):
563
          self.LogWarning("Disk %d of instance %s did not return valid"
564
                          " size information, ignoring", idx, instance.name)
565
          continue
566
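        # sizes are reported in bytes; convert to MiB, the unit used in the
        # configuration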
        size = size >> 20
567
        if size != disk.size:
568
          self.LogInfo("Disk %d of instance %s has mismatched size,"
569
                       " correcting: recorded %d, actual %d", idx,
570
                       instance.name, disk.size, size)
571
          disk.size = size
572
          self.cfg.Update(instance, feedback_fn)
573
          changed.append((instance.name, idx, "size", size))
574
        if es_flags[node_uuid]:
575
          if spindles is None:
576
            self.LogWarning("Disk %d of instance %s did not return valid"
577
                            " spindles information, ignoring", idx,
578
                            instance.name)
579
          elif disk.spindles is None or disk.spindles != spindles:
580
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
581
                         " correcting: recorded %s, actual %s",
582
                         idx, instance.name, disk.spindles, spindles)
583
            disk.spindles = spindles
584
            self.cfg.Update(instance, feedback_fn)
585
            changed.append((instance.name, idx, "spindles", disk.spindles))
586
        if self._EnsureChildSizes(disk):
587
          self.cfg.Update(instance, feedback_fn)
588
          changed.append((instance.name, idx, "size", disk.size))
589
    return changed
590

    
591

    
592
def _ValidateNetmask(cfg, netmask):
593
  """Checks if a netmask is valid.
594

595
  @type cfg: L{config.ConfigWriter}
596
  @param cfg: The cluster configuration
597
  @type netmask: int
598
  @param netmask: the netmask to be verified
599
  @raise errors.OpPrereqError: if the validation fails
600

601
  """
602
  ip_family = cfg.GetPrimaryIPFamily()
603
  try:
604
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
605
  except errors.ProgrammerError:
606
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
607
                               ip_family, errors.ECODE_INVAL)
608
  if not ipcls.ValidateNetmask(netmask):
609
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
610
                               (netmask), errors.ECODE_INVAL)
611

    
612

    
613
def CheckFileBasedStoragePathVsEnabledDiskTemplates(
614
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
615
    file_disk_template):
616
  """Checks whether the given file-based storage directory is acceptable.
617

618
  Note: This function is public, because it is also used in bootstrap.py.
619

620
  @type logging_warn_fn: function
621
  @param logging_warn_fn: function which accepts a string and logs it
622
  @type file_storage_dir: string
623
  @param file_storage_dir: the directory to be used for file-based instances
624
  @type enabled_disk_templates: list of string
625
  @param enabled_disk_templates: the list of enabled disk templates
626
  @type file_disk_template: string
627
  @param file_disk_template: the file-based disk template for which the
628
      path should be checked
629

630
  """
631
  assert (file_disk_template in
632
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
633
  file_storage_enabled = file_disk_template in enabled_disk_templates
634
  if file_storage_dir is not None:
635
    if file_storage_dir == "":
636
      if file_storage_enabled:
637
        raise errors.OpPrereqError(
638
            "Unsetting the '%s' storage directory while having '%s' storage"
639
            " enabled is not permitted." %
640
            (file_disk_template, file_disk_template))
641
    else:
642
      if not file_storage_enabled:
643
        logging_warn_fn(
644
            "Specified a %s storage directory, although %s storage is not"
645
            " enabled." % (file_disk_template, file_disk_template))
646
  else:
647
    raise errors.ProgrammerError("Received %s storage dir with value"
648
                                 " 'None'." % file_disk_template)
649

    
650

    
651
def CheckFileStoragePathVsEnabledDiskTemplates(
652
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
653
  """Checks whether the given file storage directory is acceptable.
654

655
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
656

657
  """
658
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
659
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
660
      constants.DT_FILE)
661

    
662

    
663
def CheckSharedFileStoragePathVsEnabledDiskTemplates(
664
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
665
  """Checks whether the given shared file storage directory is acceptable.
666

667
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
668

669
  """
670
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
671
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
672
      constants.DT_SHARED_FILE)
673

    
674

    
675
class LUClusterSetParams(LogicalUnit):
676
  """Change the parameters of the cluster.
677

678
  """
679
  HPATH = "cluster-modify"
680
  HTYPE = constants.HTYPE_CLUSTER
681
  REQ_BGL = False
682

    
683
  def CheckArguments(self):
684
    """Check parameters
685

686
    """
687
    if self.op.uid_pool:
688
      uidpool.CheckUidPool(self.op.uid_pool)
689

    
690
    if self.op.add_uids:
691
      uidpool.CheckUidPool(self.op.add_uids)
692

    
693
    if self.op.remove_uids:
694
      uidpool.CheckUidPool(self.op.remove_uids)
695

    
696
    if self.op.master_netmask is not None:
697
      _ValidateNetmask(self.cfg, self.op.master_netmask)
698

    
699
    if self.op.diskparams:
700
      for dt_params in self.op.diskparams.values():
701
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
702
      try:
703
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
704
      except errors.OpPrereqError, err:
705
        raise errors.OpPrereqError("While verifying diskparams options: %s"
706
                                   % err, errors.ECODE_INVAL)
707

    
708
  def ExpandNames(self):
709
    # FIXME: in the future maybe other cluster params won't require checking on
710
    # all nodes to be modified.
711
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
712
    # resource locks the right thing, shouldn't it be the BGL instead?
713
    self.needed_locks = {
714
      locking.LEVEL_NODE: locking.ALL_SET,
715
      locking.LEVEL_INSTANCE: locking.ALL_SET,
716
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
717
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
718
    }
719
    self.share_locks = ShareAll()
720

    
721
  def BuildHooksEnv(self):
722
    """Build hooks env.
723

724
    """
725
    return {
726
      "OP_TARGET": self.cfg.GetClusterName(),
727
      "NEW_VG_NAME": self.op.vg_name,
728
      }
729

    
730
  def BuildHooksNodes(self):
731
    """Build hooks nodes.
732

733
    """
734
    mn = self.cfg.GetMasterNode()
735
    return ([mn], [mn])
736

    
737
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
738
                   new_enabled_disk_templates):
739
    """Check the consistency of the vg name on all nodes and, in case it gets
740
       unset, whether there are instances still using it.
741

742
    """
743
    if self.op.vg_name is not None and not self.op.vg_name:
744
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
745
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
746
                                   " instances exist", errors.ECODE_INVAL)
747

    
748
    if (self.op.vg_name is not None and
749
        utils.IsLvmEnabled(enabled_disk_templates)) or \
750
           (self.cfg.GetVGName() is not None and
751
            utils.LvmGetsEnabled(enabled_disk_templates,
752
                                 new_enabled_disk_templates)):
753
      self._CheckVgNameOnNodes(node_uuids)
754

    
755
  def _CheckVgNameOnNodes(self, node_uuids):
756
    """Check the status of the volume group on each node.
757

758
    """
759
    vglist = self.rpc.call_vg_list(node_uuids)
760
    for node_uuid in node_uuids:
761
      msg = vglist[node_uuid].fail_msg
762
      if msg:
763
        # ignoring down node
764
        self.LogWarning("Error while gathering data on node %s"
765
                        " (ignoring node): %s",
766
                        self.cfg.GetNodeName(node_uuid), msg)
767
        continue
768
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
769
                                            self.op.vg_name,
770
                                            constants.MIN_VG_SIZE)
771
      if vgstatus:
772
        raise errors.OpPrereqError("Error on node '%s': %s" %
773
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
774
                                   errors.ECODE_ENVIRON)
775

    
776
  def _GetEnabledDiskTemplates(self, cluster):
777
    """Determines the enabled disk templates and the subset of disk templates
778
       that are newly enabled by this operation.
779

780
    """
781
    enabled_disk_templates = None
782
    new_enabled_disk_templates = []
783
    if self.op.enabled_disk_templates:
784
      enabled_disk_templates = self.op.enabled_disk_templates
785
      new_enabled_disk_templates = \
786
        list(set(enabled_disk_templates)
787
             - set(cluster.enabled_disk_templates))
788
    else:
789
      enabled_disk_templates = cluster.enabled_disk_templates
790
    return (enabled_disk_templates, new_enabled_disk_templates)
791

    
792
  @staticmethod
793
  def _CheckIpolicyVsDiskTemplates(ipolicy, enabled_disk_templates):
794
    """Checks ipolicy disk templates against enabled disk templates.
795

796
    @type ipolicy: dict
797
    @param ipolicy: the new ipolicy
798
    @type enabled_disk_templates: list of string
799
    @param enabled_disk_templates: list of enabled disk templates on the
800
      cluster
801
    @raise errors.OpPrereqError: if there is at least one allowed disk
802
      template that is not also enabled
803
      on the cluster.
804

805
    """
806
    assert constants.IPOLICY_DTS in ipolicy
807
    allowed_disk_templates = ipolicy[constants.IPOLICY_DTS]
808
    not_enabled = []
809
    for allowed_disk_template in allowed_disk_templates:
810
      if not allowed_disk_template in enabled_disk_templates:
811
        not_enabled.append(allowed_disk_template)
812
    if not_enabled:
813
      raise errors.OpPrereqError("The following disk templates are allowed"
814
                                 " by the ipolicy, but not enabled on the"
815
                                 " cluster: %s" % utils.CommaJoin(not_enabled))
816

    
817
  def _CheckIpolicy(self, cluster, enabled_disk_templates):
818
    """Checks the ipolicy.
819

820
    @type cluster: C{objects.Cluster}
821
    @param cluster: the cluster's configuration
822
    @type enabled_disk_templates: list of string
823
    @param enabled_disk_templates: list of (possibly newly) enabled disk
824
      templates
825

826
    """
827
    # FIXME: write unit tests for this
828
    if self.op.ipolicy:
829
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
830
                                           group_policy=False)
831

    
832
      self._CheckIpolicyVsDiskTemplates(self.new_ipolicy,
833
                                        enabled_disk_templates)
834

    
835
      all_instances = self.cfg.GetAllInstancesInfo().values()
836
      violations = set()
837
      for group in self.cfg.GetAllNodeGroupsInfo().values():
838
        instances = frozenset([inst for inst in all_instances
839
                               if compat.any(nuuid in group.members
840
                                             for nuuid in inst.all_nodes)])
841
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
842
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
843
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
844
                                           self.cfg)
845
        if new:
846
          violations.update(new)
847

    
848
      if violations:
849
        self.LogWarning("After the ipolicy change the following instances"
850
                        " violate them: %s",
851
                        utils.CommaJoin(utils.NiceSort(violations)))
852
    else:
853
      self._CheckIpolicyVsDiskTemplates(cluster.ipolicy,
854
                                        enabled_disk_templates)
855

    
856
  def CheckPrereq(self):
857
    """Check prerequisites.
858

859
    This checks whether the given params don't conflict and
860
    if the given volume group is valid.
861

862
    """
863
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
864
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
865
        raise errors.OpPrereqError("Cannot disable drbd helper while"
866
                                   " drbd-based instances exist",
867
                                   errors.ECODE_INVAL)
868

    
869
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
870
    self.cluster = cluster = self.cfg.GetClusterInfo()
871

    
872
    vm_capable_node_uuids = [node.uuid
873
                             for node in self.cfg.GetAllNodesInfo().values()
874
                             if node.uuid in node_uuids and node.vm_capable]
875

    
876
    (enabled_disk_templates, new_enabled_disk_templates) = \
877
      self._GetEnabledDiskTemplates(cluster)
878

    
879
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
880
                      new_enabled_disk_templates)
881

    
882
    if self.op.file_storage_dir is not None:
883
      CheckFileStoragePathVsEnabledDiskTemplates(
884
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
885

    
886
    if self.op.shared_file_storage_dir is not None:
887
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
888
          self.LogWarning, self.op.shared_file_storage_dir,
889
          enabled_disk_templates)
890

    
891
    if self.op.drbd_helper:
892
      # checks given drbd helper on all nodes
893
      helpers = self.rpc.call_drbd_helper(node_uuids)
894
      for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
895
        if ninfo.offline:
896
          self.LogInfo("Not checking drbd helper on offline node %s",
897
                       ninfo.name)
898
          continue
899
        msg = helpers[ninfo.uuid].fail_msg
900
        if msg:
901
          raise errors.OpPrereqError("Error checking drbd helper on node"
902
                                     " '%s': %s" % (ninfo.name, msg),
903
                                     errors.ECODE_ENVIRON)
904
        node_helper = helpers[ninfo.uuid].payload
905
        if node_helper != self.op.drbd_helper:
906
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
907
                                     (ninfo.name, node_helper),
908
                                     errors.ECODE_ENVIRON)
909

    
910
    # validate params changes
911
    if self.op.beparams:
912
      objects.UpgradeBeParams(self.op.beparams)
913
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
914
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
915

    
916
    if self.op.ndparams:
917
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
918
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
919

    
920
      # TODO: we need a more general way to handle resetting
921
      # cluster-level parameters to default values
922
      if self.new_ndparams["oob_program"] == "":
923
        self.new_ndparams["oob_program"] = \
924
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
925

    
926
    if self.op.hv_state:
927
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
928
                                           self.cluster.hv_state_static)
929
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
930
                               for hv, values in new_hv_state.items())
931

    
932
    if self.op.disk_state:
933
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
934
                                               self.cluster.disk_state_static)
935
      self.new_disk_state = \
936
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
937
                            for name, values in svalues.items()))
938
             for storage, svalues in new_disk_state.items())
939

    
940
    self._CheckIpolicy(cluster, enabled_disk_templates)
941

    
942
    if self.op.nicparams:
943
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
944
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
945
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
946
      nic_errors = []
947

    
948
      # check all instances for consistency
949
      for instance in self.cfg.GetAllInstancesInfo().values():
950
        for nic_idx, nic in enumerate(instance.nics):
951
          params_copy = copy.deepcopy(nic.nicparams)
952
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
953

    
954
          # check parameter syntax
955
          try:
956
            objects.NIC.CheckParameterSyntax(params_filled)
957
          except errors.ConfigurationError, err:
958
            nic_errors.append("Instance %s, nic/%d: %s" %
959
                              (instance.name, nic_idx, err))
960

    
961
          # if we're moving instances to routed, check that they have an ip
962
          target_mode = params_filled[constants.NIC_MODE]
963
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
964
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
965
                              " address" % (instance.name, nic_idx))
966
      if nic_errors:
967
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
968
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
969

    
970
    # hypervisor list/parameters
971
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
972
    if self.op.hvparams:
973
      for hv_name, hv_dict in self.op.hvparams.items():
974
        if hv_name not in self.new_hvparams:
975
          self.new_hvparams[hv_name] = hv_dict
976
        else:
977
          self.new_hvparams[hv_name].update(hv_dict)
978

    
979
    # disk template parameters
980
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
981
    if self.op.diskparams:
982
      for dt_name, dt_params in self.op.diskparams.items():
983
        if dt_name not in self.new_diskparams:
984
          self.new_diskparams[dt_name] = dt_params
985
        else:
986
          self.new_diskparams[dt_name].update(dt_params)
987

    
988
    # os hypervisor parameters
989
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
990
    if self.op.os_hvp:
991
      for os_name, hvs in self.op.os_hvp.items():
992
        if os_name not in self.new_os_hvp:
993
          self.new_os_hvp[os_name] = hvs
994
        else:
995
          for hv_name, hv_dict in hvs.items():
996
            if hv_dict is None:
997
              # Delete if it exists
998
              self.new_os_hvp[os_name].pop(hv_name, None)
999
            elif hv_name not in self.new_os_hvp[os_name]:
1000
              self.new_os_hvp[os_name][hv_name] = hv_dict
1001
            else:
1002
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
1003

    
1004
    # os parameters
1005
    self.new_osp = objects.FillDict(cluster.osparams, {})
1006
    if self.op.osparams:
1007
      for os_name, osp in self.op.osparams.items():
1008
        if os_name not in self.new_osp:
1009
          self.new_osp[os_name] = {}
1010

    
1011
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
1012
                                                 use_none=True)
1013

    
1014
        if not self.new_osp[os_name]:
1015
          # we removed all parameters
1016
          del self.new_osp[os_name]
1017
        else:
1018
          # check the parameter validity (remote check)
1019
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
1020
                        os_name, self.new_osp[os_name])
1021

    
1022
    # changes to the hypervisor list
1023
    if self.op.enabled_hypervisors is not None:
1024
      self.hv_list = self.op.enabled_hypervisors
1025
      for hv in self.hv_list:
1026
        # if the hypervisor doesn't already exist in the cluster
1027
        # hvparams, we initialize it to empty, and then (in both
1028
        # cases) we make sure to fill the defaults, as we might not
1029
        # have a complete defaults list if the hypervisor wasn't
1030
        # enabled before
1031
        if hv not in new_hvp:
1032
          new_hvp[hv] = {}
1033
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1034
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1035
    else:
1036
      self.hv_list = cluster.enabled_hypervisors
1037

    
1038
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1039
      # either the enabled list has changed, or the parameters have, validate
1040
      for hv_name, hv_params in self.new_hvparams.items():
1041
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1042
            (self.op.enabled_hypervisors and
1043
             hv_name in self.op.enabled_hypervisors)):
1044
          # either this is a new hypervisor, or its parameters have changed
1045
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1046
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1047
          hv_class.CheckParameterSyntax(hv_params)
1048
          CheckHVParams(self, node_uuids, hv_name, hv_params)
1049

    
1050
    self._CheckDiskTemplateConsistency()
1051

    
1052
    if self.op.os_hvp:
1053
      # no need to check any newly-enabled hypervisors, since the
1054
      # defaults have already been checked in the above code-block
1055
      for os_name, os_hvp in self.new_os_hvp.items():
1056
        for hv_name, hv_params in os_hvp.items():
1057
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1058
          # we need to fill in the new os_hvp on top of the actual hv_p
1059
          cluster_defaults = self.new_hvparams.get(hv_name, {})
1060
          new_osp = objects.FillDict(cluster_defaults, hv_params)
1061
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1062
          hv_class.CheckParameterSyntax(new_osp)
1063
          CheckHVParams(self, node_uuids, hv_name, new_osp)
1064

    
1065
    if self.op.default_iallocator:
1066
      alloc_script = utils.FindFile(self.op.default_iallocator,
1067
                                    constants.IALLOCATOR_SEARCH_PATH,
1068
                                    os.path.isfile)
1069
      if alloc_script is None:
1070
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1071
                                   " specified" % self.op.default_iallocator,
1072
                                   errors.ECODE_INVAL)
1073

    
1074
  def _CheckDiskTemplateConsistency(self):
1075
    """Check whether the disk templates that are going to be disabled
1076
       are still in use by some instances.
1077

1078
    """
1079
    if self.op.enabled_disk_templates:
1080
      cluster = self.cfg.GetClusterInfo()
1081
      instances = self.cfg.GetAllInstancesInfo()
1082

    
1083
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1084
        - set(self.op.enabled_disk_templates)
1085
      for instance in instances.itervalues():
1086
        if instance.disk_template in disk_templates_to_remove:
1087
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
1088
                                     " because instance '%s' is using it." %
1089
                                     (instance.disk_template, instance.name))
1090

    
1091
  def _SetVgName(self, feedback_fn):
1092
    """Determines and sets the new volume group name.
1093

1094
    """
1095
    if self.op.vg_name is not None:
1096
      if self.op.vg_name and not \
1097
           utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
1098
        feedback_fn("Note that you specified a volume group, but did not"
1099
                    " enable any lvm disk template.")
1100
      new_volume = self.op.vg_name
1101
      if not new_volume:
1102
        if utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
1103
          raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
1104
                                     " disk templates are enabled.")
1105
        new_volume = None
1106
      if new_volume != self.cfg.GetVGName():
1107
        self.cfg.SetVGName(new_volume)
1108
      else:
1109
        feedback_fn("Cluster LVM configuration already in desired"
1110
                    " state, not changing")
1111
    else:
1112
      if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
1113
          not self.cfg.GetVGName():
1114
        raise errors.OpPrereqError("Please specify a volume group when"
1115
                                   " enabling lvm-based disk-templates.")
1116

    
1117
  def _SetFileStorageDir(self, feedback_fn):
1118
    """Set the file storage directory.
1119

1120
    """
1121
    if self.op.file_storage_dir is not None:
1122
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
1123
        feedback_fn("Global file storage dir already set to value '%s'"
1124
                    % self.cluster.file_storage_dir)
1125
      else:
1126
        self.cluster.file_storage_dir = self.op.file_storage_dir
1127

    
1128
  def Exec(self, feedback_fn):
1129
    """Change the parameters of the cluster.
1130

1131
    """
1132
    if self.op.enabled_disk_templates:
1133
      self.cluster.enabled_disk_templates = \
1134
        list(set(self.op.enabled_disk_templates))
1135

    
1136
    self._SetVgName(feedback_fn)
1137
    self._SetFileStorageDir(feedback_fn)
1138

    
1139
    if self.op.drbd_helper is not None:
1140
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1141
        feedback_fn("Note that you specified a drbd user helper, but did"
1142
                    " not enable the drbd disk template.")
1143
      new_helper = self.op.drbd_helper
1144
      if not new_helper:
1145
        new_helper = None
1146
      if new_helper != self.cfg.GetDRBDHelper():
1147
        self.cfg.SetDRBDHelper(new_helper)
1148
      else:
1149
        feedback_fn("Cluster DRBD helper already in desired state,"
1150
                    " not changing")
1151
    if self.op.hvparams:
1152
      self.cluster.hvparams = self.new_hvparams
1153
    if self.op.os_hvp:
1154
      self.cluster.os_hvp = self.new_os_hvp
1155
    if self.op.enabled_hypervisors is not None:
1156
      self.cluster.hvparams = self.new_hvparams
1157
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1158
    if self.op.beparams:
1159
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1160
    if self.op.nicparams:
1161
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1162
    if self.op.ipolicy:
1163
      self.cluster.ipolicy = self.new_ipolicy
1164
    if self.op.osparams:
1165
      self.cluster.osparams = self.new_osp
1166
    if self.op.ndparams:
1167
      self.cluster.ndparams = self.new_ndparams
1168
    if self.op.diskparams:
1169
      self.cluster.diskparams = self.new_diskparams
1170
    if self.op.hv_state:
1171
      self.cluster.hv_state_static = self.new_hv_state
1172
    if self.op.disk_state:
1173
      self.cluster.disk_state_static = self.new_disk_state
1174

    
1175
    if self.op.candidate_pool_size is not None:
1176
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1177
      # we need to update the pool size here, otherwise the save will fail
1178
      AdjustCandidatePool(self, [])
1179

    
1180
    if self.op.maintain_node_health is not None:
1181
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1182
        feedback_fn("Note: CONFD was disabled at build time, node health"
1183
                    " maintenance is not useful (still enabling it)")
1184
      self.cluster.maintain_node_health = self.op.maintain_node_health
1185

    
1186
    if self.op.modify_etc_hosts is not None:
1187
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1188

    
1189
    if self.op.prealloc_wipe_disks is not None:
1190
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1191

    
1192
    if self.op.add_uids is not None:
1193
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1194

    
1195
    if self.op.remove_uids is not None:
1196
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1197

    
1198
    if self.op.uid_pool is not None:
1199
      self.cluster.uid_pool = self.op.uid_pool
1200

    
1201
    if self.op.default_iallocator is not None:
1202
      self.cluster.default_iallocator = self.op.default_iallocator
1203

    
1204
    if self.op.reserved_lvs is not None:
1205
      self.cluster.reserved_lvs = self.op.reserved_lvs
1206

    
1207
    if self.op.use_external_mip_script is not None:
1208
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
1209

    
1210
    def helper_os(aname, mods, desc):
1211
      desc += " OS list"
1212
      lst = getattr(self.cluster, aname)
1213
      for key, val in mods:
1214
        if key == constants.DDM_ADD:
1215
          if val in lst:
1216
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1217
          else:
1218
            lst.append(val)
1219
        elif key == constants.DDM_REMOVE:
1220
          if val in lst:
1221
            lst.remove(val)
1222
          else:
1223
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1224
        else:
1225
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
1226

    
1227
    if self.op.hidden_os:
1228
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1229

    
1230
    if self.op.blacklisted_os:
1231
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1232

    
1233
    if self.op.master_netdev:
1234
      master_params = self.cfg.GetMasterNetworkParameters()
1235
      ems = self.cfg.GetUseExternalMipScript()
1236
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1237
                  self.cluster.master_netdev)
1238
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1239
                                                       master_params, ems)
1240
      if not self.op.force:
1241
        result.Raise("Could not disable the master ip")
1242
      else:
1243
        if result.fail_msg:
1244
          msg = ("Could not disable the master ip (continuing anyway): %s" %
1245
                 result.fail_msg)
1246
          feedback_fn(msg)
1247
      feedback_fn("Changing master_netdev from %s to %s" %
1248
                  (master_params.netdev, self.op.master_netdev))
1249
      self.cluster.master_netdev = self.op.master_netdev
1250

    
1251
    if self.op.master_netmask:
1252
      master_params = self.cfg.GetMasterNetworkParameters()
1253
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1254
      result = self.rpc.call_node_change_master_netmask(
1255
                 master_params.uuid, master_params.netmask,
1256
                 self.op.master_netmask, master_params.ip,
1257
                 master_params.netdev)
1258
      result.Warn("Could not change the master IP netmask", feedback_fn)
1259
      self.cluster.master_netmask = self.op.master_netmask
1260

    
1261
    self.cfg.Update(self.cluster, feedback_fn)
1262

    
1263
    if self.op.master_netdev:
1264
      master_params = self.cfg.GetMasterNetworkParameters()
1265
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
1266
                  self.op.master_netdev)
1267
      ems = self.cfg.GetUseExternalMipScript()
1268
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1269
                                                     master_params, ems)
1270
      result.Warn("Could not re-enable the master ip on the master,"
1271
                  " please restart manually", self.LogWarning)
1272

    
1273

    
1274
class LUClusterVerify(NoHooksLU):
1275
  """Submits all jobs necessary to verify the cluster.
1276

1277
  """
1278
  REQ_BGL = False
1279

    
1280
  def ExpandNames(self):
1281
    self.needed_locks = {}
1282

    
1283
  def Exec(self, feedback_fn):
1284
    jobs = []
1285

    
1286
    if self.op.group_name:
1287
      groups = [self.op.group_name]
1288
      depends_fn = lambda: None
1289
    else:
1290
      groups = self.cfg.GetNodeGroupList()
1291

    
1292
      # Verify global configuration
1293
      jobs.append([
1294
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1295
        ])
1296

    
1297
      # Always depend on global verification
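      # (negative job IDs refer to jobs submitted earlier in the same set)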
1298
      depends_fn = lambda: [(-len(jobs), [])]
1299

    
1300
    jobs.extend(
1301
      [opcodes.OpClusterVerifyGroup(group_name=group,
1302
                                    ignore_errors=self.op.ignore_errors,
1303
                                    depends=depends_fn())]
1304
      for group in groups)
1305

    
1306
    # Fix up all parameters
1307
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1308
      op.debug_simulate_errors = self.op.debug_simulate_errors
1309
      op.verbose = self.op.verbose
1310
      op.error_codes = self.op.error_codes
1311
      try:
1312
        op.skip_checks = self.op.skip_checks
1313
      except AttributeError:
1314
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1315

    
1316
    return ResultWithJobs(jobs)
1317

    
1318

    
1319
class _VerifyErrors(object):
1320
  """Mix-in for cluster/group verify LUs.
1321

1322
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1323
  self.op and self._feedback_fn to be available.)
1324

1325
  """
1326

    
1327
  ETYPE_FIELD = "code"
1328
  ETYPE_ERROR = "ERROR"
1329
  ETYPE_WARNING = "WARNING"
1330

    
1331
  def _Error(self, ecode, item, msg, *args, **kwargs):
1332
    """Format an error message.
1333

1334
    Based on the opcode's error_codes parameter, either format a
1335
    parseable error code, or a simpler error string.
1336

1337
    This must be called only from Exec and functions called from Exec.
1338

1339
    """
1340
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1341
    itype, etxt, _ = ecode
1342
    # If the error code is in the list of ignored errors, demote the error to a
1343
    # warning
1344
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1345
      ltype = self.ETYPE_WARNING
1346
    # first complete the msg
1347
    if args:
1348
      msg = msg % args
1349
    # then format the whole message
1350
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1351
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1352
    else:
1353
      if item:
1354
        item = " " + item
1355
      else:
1356
        item = ""
1357
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1358
    # and finally report it via the feedback_fn
1359
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1360
    # only ERROR-level entries mark the operation as failed; warnings do not
1361
    if ltype == self.ETYPE_ERROR:
1362
      self.bad = True
1363

    
1364
  def _ErrorIf(self, cond, *args, **kwargs):
1365
    """Log an error message if the passed condition is True.
1366

1367
    """
1368
    if (bool(cond)
1369
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1370
      self._Error(*args, **kwargs)
1371

    
1372

    
1373
def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)


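# Illustrative sketch (editor's assumption, not part of the original module):
# _VerifyCertificate returns an (error-type, message) pair which the caller
# feeds straight into _ErrorIf, e.g.
#   (errcode, msg) = _VerifyCertificate(pathutils.NODED_CERT_FILE)
#   # errcode is None for a healthy certificate, ETYPE_WARNING when it is
#   # close to expiring, and ETYPE_ERROR when it is expired or unreadable.
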
def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


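# Illustrative sketch (editor's assumption, not part of the original module):
# the resulting list mixes cluster-, OS- and instance-level sources, e.g.
#   [("cluster", "kvm", {...}),
#    ("os debootstrap", "kvm", {...}),
#    ("instance web1", "kvm", {...})]
# and each entry is later syntax-checked by LUClusterVerifyConfig._VerifyHVP.
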
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1439
  """Verifies the cluster config.
1440

1441
  """
1442
  REQ_BGL = False
1443

    
1444
  def _VerifyHVP(self, hvp_data):
1445
    """Verifies locally the syntax of the hypervisor parameters.
1446

1447
    """
1448
    for item, hv_name, hv_params in hvp_data:
1449
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1450
             (hv_name, item))
1451
      try:
1452
        hv_class = hypervisor.GetHypervisorClass(hv_name)
1453
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1454
        hv_class.CheckParameterSyntax(hv_params)
1455
      except errors.GenericError, err:
1456
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1457

    
1458
  def ExpandNames(self):
1459
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1460
    self.share_locks = ShareAll()
1461

    
1462
  def CheckPrereq(self):
1463
    """Check prerequisites.
1464

1465
    """
1466
    # Retrieve all information
1467
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1468
    self.all_node_info = self.cfg.GetAllNodesInfo()
1469
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1470

    
1471
  def Exec(self, feedback_fn):
1472
    """Verify integrity of cluster, performing various test on nodes.
1473

1474
    """
1475
    self.bad = False
1476
    self._feedback_fn = feedback_fn
1477

    
1478
    feedback_fn("* Verifying cluster config")
1479

    
1480
    for msg in self.cfg.VerifyConfig():
1481
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1482

    
1483
    feedback_fn("* Verifying cluster certificate files")
1484

    
1485
    for cert_filename in pathutils.ALL_CERT_FILES:
1486
      (errcode, msg) = _VerifyCertificate(cert_filename)
1487
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1488

    
1489
    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
1490
                                    pathutils.NODED_CERT_FILE),
1491
                  constants.CV_ECLUSTERCERT,
1492
                  None,
1493
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
1494
                    constants.LUXID_USER + " user")
1495

    
1496
    feedback_fn("* Verifying hypervisor parameters")
1497

    
1498
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1499
                                                self.all_inst_info.values()))
1500

    
1501
    feedback_fn("* Verifying all nodes belong to an existing group")
1502

    
1503
    # We do this verification here because, should this bogus circumstance
1504
    # occur, it would never be caught by VerifyGroup, which only acts on
1505
    # nodes/instances reachable from existing node groups.
1506

    
1507
    dangling_nodes = set(node for node in self.all_node_info.values()
1508
                         if node.group not in self.all_group_info)
1509

    
1510
    dangling_instances = {}
1511
    no_node_instances = []
1512

    
1513
    for inst in self.all_inst_info.values():
1514
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
1515
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
1516
      elif inst.primary_node not in self.all_node_info:
1517
        no_node_instances.append(inst)
1518

    
1519
    pretty_dangling = [
1520
        "%s (%s)" %
1521
        (node.name,
1522
         utils.CommaJoin(inst.name for
1523
                         inst in dangling_instances.get(node.uuid, [])))
1524
        for node in dangling_nodes]
1525

    
1526
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1527
                  None,
1528
                  "the following nodes (and their instances) belong to a non"
1529
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1530

    
1531
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1532
                  None,
1533
                  "the following instances have a non-existing primary-node:"
1534
                  " %s", utils.CommaJoin(inst.name for
1535
                                         inst in no_node_instances))
1536

    
1537
    return not self.bad
1538

    
1539

    
1540
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1541
  """Verifies the status of a node group.
1542

1543
  """
1544
  HPATH = "cluster-verify"
1545
  HTYPE = constants.HTYPE_CLUSTER
1546
  REQ_BGL = False
1547

    
1548
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1549

    
1550
  class NodeImage(object):
1551
    """A class representing the logical and physical status of a node.
1552

1553
    @type uuid: string
1554
    @ivar uuid: the node UUID to which this object refers
1555
    @ivar volumes: a structure as returned from
1556
        L{ganeti.backend.GetVolumeList} (runtime)
1557
    @ivar instances: a list of running instances (runtime)
1558
    @ivar pinst: list of configured primary instances (config)
1559
    @ivar sinst: list of configured secondary instances (config)
1560
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1561
        instances for which this node is secondary (config)
1562
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1563
    @ivar dfree: free disk, as reported by the node (runtime)
1564
    @ivar offline: the offline status (config)
1565
    @type rpc_fail: boolean
1566
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1567
        not whether the individual keys were correct) (runtime)
1568
    @type lvm_fail: boolean
1569
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1570
    @type hyp_fail: boolean
1571
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1572
    @type ghost: boolean
1573
    @ivar ghost: whether this is a known node or not (config)
1574
    @type os_fail: boolean
1575
    @ivar os_fail: whether the RPC call didn't return valid OS data
1576
    @type oslist: list
1577
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1578
    @type vm_capable: boolean
1579
    @ivar vm_capable: whether the node can host instances
1580
    @type pv_min: float
1581
    @ivar pv_min: size in MiB of the smallest PVs
1582
    @type pv_max: float
1583
    @ivar pv_max: size in MiB of the biggest PVs
1584

1585
    """
1586
    def __init__(self, offline=False, uuid=None, vm_capable=True):
1587
      self.uuid = uuid
1588
      self.volumes = {}
1589
      self.instances = []
1590
      self.pinst = []
1591
      self.sinst = []
1592
      self.sbp = {}
1593
      self.mfree = 0
1594
      self.dfree = 0
1595
      self.offline = offline
1596
      self.vm_capable = vm_capable
1597
      self.rpc_fail = False
1598
      self.lvm_fail = False
1599
      self.hyp_fail = False
1600
      self.ghost = False
1601
      self.os_fail = False
1602
      self.oslist = {}
1603
      self.pv_min = None
1604
      self.pv_max = None
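
  # Illustrative sketch (editor's assumption, not part of the original class):
  # unreachable or misbehaving nodes still get a NodeImage so later checks can
  # reason about them, e.g.
  #   nimg = self.NodeImage(offline=True, uuid=node.uuid)
  #   nimg.rpc_fail = True   # the verify RPC failed for this node
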
  def ExpandNames(self):
1607
    # This raises errors.OpPrereqError on its own:
1608
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1609

    
1610
    # Get instances in node group; this is unsafe and needs verification later
1611
    inst_uuids = \
1612
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1613

    
1614
    self.needed_locks = {
1615
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1616
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1617
      locking.LEVEL_NODE: [],
1618

    
1619
      # This opcode is run by watcher every five minutes and acquires all nodes
1620
      # for a group. It doesn't run for a long time, so it's better to acquire
1621
      # the node allocation lock as well.
1622
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1623
      }
1624

    
1625
    self.share_locks = ShareAll()
1626

    
1627
  def DeclareLocks(self, level):
1628
    if level == locking.LEVEL_NODE:
1629
      # Get members of node group; this is unsafe and needs verification later
1630
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1631

    
1632
      # In Exec(), we warn about mirrored instances that have primary and
1633
      # secondary living in separate node groups. To fully verify that
1634
      # volumes for these instances are healthy, we will need to do an
1635
      # extra call to their secondaries. We ensure here those nodes will
1636
      # be locked.
1637
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1638
        # Important: access only the instances whose lock is owned
1639
        instance = self.cfg.GetInstanceInfoByName(inst_name)
1640
        if instance.disk_template in constants.DTS_INT_MIRROR:
1641
          nodes.update(instance.secondary_nodes)
1642

    
1643
      self.needed_locks[locking.LEVEL_NODE] = nodes
1644

    
1645
  def CheckPrereq(self):
1646
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1647
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1648

    
1649
    group_node_uuids = set(self.group_info.members)
1650
    group_inst_uuids = \
1651
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1652

    
1653
    unlocked_node_uuids = \
1654
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1655

    
1656
    unlocked_inst_uuids = \
1657
        group_inst_uuids.difference(
1658
          [self.cfg.GetInstanceInfoByName(name).uuid
1659
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1660

    
1661
    if unlocked_node_uuids:
1662
      raise errors.OpPrereqError(
1663
        "Missing lock for nodes: %s" %
1664
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1665
        errors.ECODE_STATE)
1666

    
1667
    if unlocked_inst_uuids:
1668
      raise errors.OpPrereqError(
1669
        "Missing lock for instances: %s" %
1670
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1671
        errors.ECODE_STATE)
1672

    
1673
    self.all_node_info = self.cfg.GetAllNodesInfo()
1674
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1675

    
1676
    self.my_node_uuids = group_node_uuids
1677
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1678
                             for node_uuid in group_node_uuids)
1679

    
1680
    self.my_inst_uuids = group_inst_uuids
1681
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1682
                             for inst_uuid in group_inst_uuids)
1683

    
1684
    # We detect here the nodes that will need the extra RPC calls for verifying
1685
    # split LV volumes; they should be locked.
1686
    extra_lv_nodes = set()
1687

    
1688
    for inst in self.my_inst_info.values():
1689
      if inst.disk_template in constants.DTS_INT_MIRROR:
1690
        for nuuid in inst.all_nodes:
1691
          if self.all_node_info[nuuid].group != self.group_uuid:
1692
            extra_lv_nodes.add(nuuid)
1693

    
1694
    unlocked_lv_nodes = \
1695
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1696

    
1697
    if unlocked_lv_nodes:
1698
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1699
                                 utils.CommaJoin(unlocked_lv_nodes),
1700
                                 errors.ECODE_STATE)
1701
    self.extra_lv_nodes = list(extra_lv_nodes)
1702

    
1703
  def _VerifyNode(self, ninfo, nresult):
1704
    """Perform some basic validation on data returned from a node.
1705

1706
      - check the result data structure is well formed and has all the
1707
        mandatory fields
1708
      - check ganeti version
1709

1710
    @type ninfo: L{objects.Node}
1711
    @param ninfo: the node to check
1712
    @param nresult: the results from the node
1713
    @rtype: boolean
1714
    @return: whether overall this call was successful (and we can expect
1715
         reasonable values in the response)
1716

1717
    """
1718
    # main result, nresult should be a non-empty dict
1719
    test = not nresult or not isinstance(nresult, dict)
1720
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1721
                  "unable to verify node: no data returned")
1722
    if test:
1723
      return False
1724

    
1725
    # compares ganeti version
1726
    local_version = constants.PROTOCOL_VERSION
1727
    remote_version = nresult.get("version", None)
1728
    test = not (remote_version and
1729
                isinstance(remote_version, (list, tuple)) and
1730
                len(remote_version) == 2)
1731
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1732
                  "connection to node returned invalid data")
1733
    if test:
1734
      return False
1735

    
1736
    test = local_version != remote_version[0]
1737
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1738
                  "incompatible protocol versions: master %s,"
1739
                  " node %s", local_version, remote_version[0])
1740
    if test:
1741
      return False
1742

    
1743
    # node seems compatible, we can actually try to look into its results
1744

    
1745
    # full package version
1746
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1747
                  constants.CV_ENODEVERSION, ninfo.name,
1748
                  "software version mismatch: master %s, node %s",
1749
                  constants.RELEASE_VERSION, remote_version[1],
1750
                  code=self.ETYPE_WARNING)
1751

    
1752
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1753
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1754
      for hv_name, hv_result in hyp_result.iteritems():
1755
        test = hv_result is not None
1756
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1757
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1758

    
1759
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1760
    if ninfo.vm_capable and isinstance(hvp_result, list):
1761
      for item, hv_name, hv_result in hvp_result:
1762
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1763
                      "hypervisor %s parameter verify failure (source %s): %s",
1764
                      hv_name, item, hv_result)
1765

    
1766
    test = nresult.get(constants.NV_NODESETUP,
1767
                       ["Missing NODESETUP results"])
1768
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1769
                  "node setup error: %s", "; ".join(test))
1770

    
1771
    return True
1772

    
1773
  def _VerifyNodeTime(self, ninfo, nresult,
1774
                      nvinfo_starttime, nvinfo_endtime):
1775
    """Check the node time.
1776

1777
    @type ninfo: L{objects.Node}
1778
    @param ninfo: the node to check
1779
    @param nresult: the remote results for the node
1780
    @param nvinfo_starttime: the start time of the RPC call
1781
    @param nvinfo_endtime: the end time of the RPC call
1782

1783
    """
1784
    ntime = nresult.get(constants.NV_TIME, None)
1785
    try:
1786
      ntime_merged = utils.MergeTime(ntime)
1787
    except (ValueError, TypeError):
1788
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1789
                    "Node returned invalid time")
1790
      return
1791

    
1792
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1793
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1794
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1795
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1796
    else:
1797
      ntime_diff = None
1798

    
1799
    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1800
                  "Node time diverges by at least %s from master node time",
1801
                  ntime_diff)
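
  # Illustrative sketch (editor's assumption, not part of the original class):
  # a node is only flagged when its reported time falls outside the window
  # spanned by the RPC round-trip plus the allowed skew, i.e. roughly
  #   (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW
  #    <= ntime_merged
  #    <= nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW)
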
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1804
    """Check the node LVM results and update info for cross-node checks.
1805

1806
    @type ninfo: L{objects.Node}
1807
    @param ninfo: the node to check
1808
    @param nresult: the remote results for the node
1809
    @param vg_name: the configured VG name
1810
    @type nimg: L{NodeImage}
1811
    @param nimg: node image
1812

1813
    """
1814
    if vg_name is None:
1815
      return
1816

    
1817
    # checks vg existence and size > 20G
1818
    vglist = nresult.get(constants.NV_VGLIST, None)
1819
    test = not vglist
1820
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1821
                  "unable to check volume groups")
1822
    if not test:
1823
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1824
                                            constants.MIN_VG_SIZE)
1825
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1826

    
1827
    # Check PVs
1828
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1829
    for em in errmsgs:
1830
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
1831
    if pvminmax is not None:
1832
      (nimg.pv_min, nimg.pv_max) = pvminmax
1833

    
1834
  def _VerifyGroupDRBDVersion(self, node_verify_infos):
1835
    """Check cross-node DRBD version consistency.
1836

1837
    @type node_verify_infos: dict
1838
    @param node_verify_infos: infos about nodes as returned from the
1839
      node_verify call.
1840

1841
    """
1842
    node_versions = {}
1843
    for node_uuid, ndata in node_verify_infos.items():
1844
      nresult = ndata.payload
1845
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1846
      node_versions[node_uuid] = version
1847

    
1848
    if len(set(node_versions.values())) > 1:
1849
      for node_uuid, version in sorted(node_versions.items()):
1850
        msg = "DRBD version mismatch: %s" % version
1851
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1852
                    code=self.ETYPE_WARNING)
1853

    
1854
  def _VerifyGroupLVM(self, node_image, vg_name):
1855
    """Check cross-node consistency in LVM.
1856

1857
    @type node_image: dict
1858
    @param node_image: info about nodes, mapping from node to names to
1859
      L{NodeImage} objects
1860
    @param vg_name: the configured VG name
1861

1862
    """
1863
    if vg_name is None:
1864
      return
1865

    
1866
    # Only exclusive storage needs this kind of checks
1867
    if not self._exclusive_storage:
1868
      return
1869

    
1870
    # exclusive_storage wants all PVs to have the same size (approximately),
1871
    # if the smallest and the biggest ones are okay, everything is fine.
1872
    # pv_min is None iff pv_max is None
1873
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1874
    if not vals:
1875
      return
1876
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1877
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1878
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1879
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1880
                  "PV sizes differ too much in the group; smallest (%s MB) is"
1881
                  " on %s, biggest (%s MB) is on %s",
1882
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
1883
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))
1884

    
1885
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1886
    """Check the node bridges.
1887

1888
    @type ninfo: L{objects.Node}
1889
    @param ninfo: the node to check
1890
    @param nresult: the remote results for the node
1891
    @param bridges: the expected list of bridges
1892

1893
    """
1894
    if not bridges:
1895
      return
1896

    
1897
    missing = nresult.get(constants.NV_BRIDGES, None)
1898
    test = not isinstance(missing, list)
1899
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1900
                  "did not return valid bridge information")
1901
    if not test:
1902
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1903
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1904

    
1905
  def _VerifyNodeUserScripts(self, ninfo, nresult):
1906
    """Check the results of user scripts presence and executability on the node
1907

1908
    @type ninfo: L{objects.Node}
1909
    @param ninfo: the node to check
1910
    @param nresult: the remote results for the node
1911

1912
    """
1913
    test = not constants.NV_USERSCRIPTS in nresult
1914
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1915
                  "did not return user scripts information")
1916

    
1917
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1918
    if not test:
1919
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1920
                    "user scripts not present or not executable: %s" %
1921
                    utils.CommaJoin(sorted(broken_scripts)))
1922

    
1923
  def _VerifyNodeNetwork(self, ninfo, nresult):
1924
    """Check the node network connectivity results.
1925

1926
    @type ninfo: L{objects.Node}
1927
    @param ninfo: the node to check
1928
    @param nresult: the remote results for the node
1929

1930
    """
1931
    test = constants.NV_NODELIST not in nresult
1932
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
1933
                  "node hasn't returned node ssh connectivity data")
1934
    if not test:
1935
      if nresult[constants.NV_NODELIST]:
1936
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1937
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
1938
                        "ssh communication with node '%s': %s", a_node, a_msg)
1939

    
1940
    test = constants.NV_NODENETTEST not in nresult
1941
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1942
                  "node hasn't returned node tcp connectivity data")
1943
    if not test:
1944
      if nresult[constants.NV_NODENETTEST]:
1945
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1946
        for anode in nlist:
1947
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
1948
                        "tcp communication with node '%s': %s",
1949
                        anode, nresult[constants.NV_NODENETTEST][anode])
1950

    
1951
    test = constants.NV_MASTERIP not in nresult
1952
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1953
                  "node hasn't returned node master IP reachability data")
1954
    if not test:
1955
      if not nresult[constants.NV_MASTERIP]:
1956
        if ninfo.uuid == self.master_node:
1957
          msg = "the master node cannot reach the master IP (not configured?)"
1958
        else:
1959
          msg = "cannot reach the master IP"
1960
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
1961

    
1962
  def _VerifyInstance(self, instance, node_image, diskstatus):
1963
    """Verify an instance.
1964

1965
    This function checks to see if the required block devices are
1966
    available on the instance's node, and that the nodes are in the correct
1967
    state.
1968

1969
    """
1970
    pnode_uuid = instance.primary_node
1971
    pnode_img = node_image[pnode_uuid]
1972
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
1973

    
1974
    node_vol_should = {}
1975
    instance.MapLVsByNode(node_vol_should)
1976

    
1977
    cluster = self.cfg.GetClusterInfo()
1978
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1979
                                                            self.group_info)
1980
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
1981
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
1982
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)
1983

    
1984
    for node_uuid in node_vol_should:
1985
      n_img = node_image[node_uuid]
1986
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1987
        # ignore missing volumes on offline or broken nodes
1988
        continue
1989
      for volume in node_vol_should[node_uuid]:
1990
        test = volume not in n_img.volumes
1991
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
1992
                      "volume %s missing on node %s", volume,
1993
                      self.cfg.GetNodeName(node_uuid))
1994

    
1995
    if instance.admin_state == constants.ADMINST_UP:
1996
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
1997
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
1998
                    "instance not running on its primary node %s",
1999
                     self.cfg.GetNodeName(pnode_uuid))
2000
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
2001
                    instance.name, "instance is marked as running and lives on"
2002
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))
2003

    
2004
    diskdata = [(nname, success, status, idx)
2005
                for (nname, disks) in diskstatus.items()
2006
                for idx, (success, status) in enumerate(disks)]
2007

    
2008
    for nname, success, bdev_status, idx in diskdata:
2009
      # the 'ghost node' construction in Exec() ensures that we have a
2010
      # node here
2011
      snode = node_image[nname]
2012
      bad_snode = snode.ghost or snode.offline
2013
      self._ErrorIf(instance.disks_active and
2014
                    not success and not bad_snode,
2015
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
2016
                    "couldn't retrieve status for disk/%s on %s: %s",
2017
                    idx, self.cfg.GetNodeName(nname), bdev_status)
2018

    
2019
      if instance.disks_active and success and \
2020
         (bdev_status.is_degraded or
2021
          bdev_status.ldisk_status != constants.LDS_OKAY):
2022
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2023
        if bdev_status.is_degraded:
2024
          msg += " is degraded"
2025
        if bdev_status.ldisk_status != constants.LDS_OKAY:
2026
          msg += "; state is '%s'" % \
2027
                 constants.LDS_NAMES[bdev_status.ldisk_status]
2028

    
2029
        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2030

    
2031
    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2032
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2033
                  "instance %s, connection to primary node failed",
2034
                  instance.name)
2035

    
2036
    self._ErrorIf(len(instance.secondary_nodes) > 1,
2037
                  constants.CV_EINSTANCELAYOUT, instance.name,
2038
                  "instance has multiple secondary nodes: %s",
2039
                  utils.CommaJoin(instance.secondary_nodes),
2040
                  code=self.ETYPE_WARNING)
2041

    
2042
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2043
    if any(es_flags.values()):
2044
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2045
        # Disk template not compatible with exclusive_storage: no instance
2046
        # node should have the flag set
2047
        es_nodes = [n
2048
                    for (n, es) in es_flags.items()
2049
                    if es]
2050
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2051
                    "instance has template %s, which is not supported on nodes"
2052
                    " that have exclusive storage set: %s",
2053
                    instance.disk_template,
2054
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2055
      for (idx, disk) in enumerate(instance.disks):
2056
        self._ErrorIf(disk.spindles is None,
2057
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2058
                      "number of spindles not configured for disk %s while"
2059
                      " exclusive storage is enabled, try running"
2060
                      " gnt-cluster repair-disk-sizes", idx)
2061

    
2062
    if instance.disk_template in constants.DTS_INT_MIRROR:
2063
      instance_nodes = utils.NiceSort(instance.all_nodes)
2064
      instance_groups = {}
2065

    
2066
      for node_uuid in instance_nodes:
2067
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
2068
                                   []).append(node_uuid)
2069

    
2070
      pretty_list = [
2071
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2072
                           groupinfo[group].name)
2073
        # Sort so that we always list the primary node first.
2074
        for group, nodes in sorted(instance_groups.items(),
2075
                                   key=lambda (_, nodes): pnode_uuid in nodes,
2076
                                   reverse=True)]
2077

    
2078
      self._ErrorIf(len(instance_groups) > 1,
2079
                    constants.CV_EINSTANCESPLITGROUPS,
2080
                    instance.name, "instance has primary and secondary nodes in"
2081
                    " different groups: %s", utils.CommaJoin(pretty_list),
2082
                    code=self.ETYPE_WARNING)
2083

    
2084
    inst_nodes_offline = []
2085
    for snode in instance.secondary_nodes:
2086
      s_img = node_image[snode]
2087
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2088
                    self.cfg.GetNodeName(snode),
2089
                    "instance %s, connection to secondary node failed",
2090
                    instance.name)
2091

    
2092
      if s_img.offline:
2093
        inst_nodes_offline.append(snode)
2094

    
2095
    # warn that the instance lives on offline nodes
2096
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2097
                  instance.name, "instance has offline secondary node(s) %s",
2098
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2099
    # ... or ghost/non-vm_capable nodes
2100
    for node_uuid in instance.all_nodes:
2101
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2102
                    instance.name, "instance lives on ghost node %s",
2103
                    self.cfg.GetNodeName(node_uuid))
2104
      self._ErrorIf(not node_image[node_uuid].vm_capable,
2105
                    constants.CV_EINSTANCEBADNODE, instance.name,
2106
                    "instance lives on non-vm_capable node %s",
2107
                    self.cfg.GetNodeName(node_uuid))
2108

    
2109
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2110
    """Verify if there are any unknown volumes in the cluster.
2111

2112
    The .os, .swap and backup volumes are ignored. All other volumes are
2113
    reported as unknown.
2114

2115
    @type reserved: L{ganeti.utils.FieldSet}
2116
    @param reserved: a FieldSet of reserved volume names
2117

2118
    """
2119
    for node_uuid, n_img in node_image.items():
2120
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2121
          self.all_node_info[node_uuid].group != self.group_uuid):
2122
        # skip non-healthy nodes
2123
        continue
2124
      for volume in n_img.volumes:
2125
        test = ((node_uuid not in node_vol_should or
2126
                volume not in node_vol_should[node_uuid]) and
2127
                not reserved.Matches(volume))
2128
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2129
                      self.cfg.GetNodeName(node_uuid),
2130
                      "volume %s is unknown", volume)
2131

    
2132
  def _VerifyNPlusOneMemory(self, node_image, all_insts):
2133
    """Verify N+1 Memory Resilience.
2134

2135
    Check that if one single node dies we can still start all the
2136
    instances it was primary for.
2137

2138
    """
2139
    cluster_info = self.cfg.GetClusterInfo()
2140
    for node_uuid, n_img in node_image.items():
2141
      # This code checks that every node which is now listed as
2142
      # secondary has enough memory to host all instances it is
2143
      # supposed to should a single other node in the cluster fail.
2144
      # FIXME: not ready for failover to an arbitrary node
2145
      # FIXME: does not support file-backed instances
2146
      # WARNING: we currently take into account down instances as well
2147
      # as up ones, considering that even if they're down someone
2148
      # might want to start them even in the event of a node failure.
2149
      if n_img.offline or \
2150
         self.all_node_info[node_uuid].group != self.group_uuid:
2151
        # we're skipping nodes marked offline and nodes in other groups from
2152
        # the N+1 warning, since most likely we don't have good memory
2153
        # information from them; we already list instances living on such
2154
        # nodes, and that's enough warning
2155
        continue
2156
      #TODO(dynmem): also consider ballooning out other instances
2157
      for prinode, inst_uuids in n_img.sbp.items():
2158
        needed_mem = 0
2159
        for inst_uuid in inst_uuids:
2160
          bep = cluster_info.FillBE(all_insts[inst_uuid])
2161
          if bep[constants.BE_AUTO_BALANCE]:
2162
            needed_mem += bep[constants.BE_MINMEM]
2163
        test = n_img.mfree < needed_mem
2164
        self._ErrorIf(test, constants.CV_ENODEN1,
2165
                      self.cfg.GetNodeName(node_uuid),
2166
                      "not enough memory to accomodate instance failovers"
2167
                      " should node %s fail (%dMiB needed, %dMiB available)",
2168
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
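
  # Illustrative sketch (editor's assumption, not part of the original class):
  # the N+1 check sums BE_MINMEM over every auto-balanced instance that would
  # fail over to this node from a given primary, e.g. two 1024 MiB instances
  # from primary "prim1" require 2048 MiB of n_img.mfree on the candidate
  # secondary before the check passes.
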
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2171
                   (files_all, files_opt, files_mc, files_vm)):
2172
    """Verifies file checksums collected from all nodes.
2173

2174
    @param nodes: List of L{objects.Node} objects
2175
    @param master_node_uuid: UUID of master node
2176
    @param all_nvinfo: RPC results
2177

2178
    """
2179
    # Define functions determining which nodes to consider for a file
2180
    files2nodefn = [
2181
      (files_all, None),
2182
      (files_mc, lambda node: (node.master_candidate or
2183
                               node.uuid == master_node_uuid)),
2184
      (files_vm, lambda node: node.vm_capable),
2185
      ]
2186

    
2187
    # Build mapping from filename to list of nodes which should have the file
2188
    nodefiles = {}
2189
    for (files, fn) in files2nodefn:
2190
      if fn is None:
2191
        filenodes = nodes
2192
      else:
2193
        filenodes = filter(fn, nodes)
2194
      nodefiles.update((filename,
2195
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
2196
                       for filename in files)
2197

    
2198
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2199

    
2200
    fileinfo = dict((filename, {}) for filename in nodefiles)
2201
    ignore_nodes = set()
2202

    
2203
    for node in nodes:
2204
      if node.offline:
2205
        ignore_nodes.add(node.uuid)
2206
        continue
2207

    
2208
      nresult = all_nvinfo[node.uuid]
2209

    
2210
      if nresult.fail_msg or not nresult.payload:
2211
        node_files = None
2212
      else:
2213
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2214
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2215
                          for (key, value) in fingerprints.items())
2216
        del fingerprints
2217

    
2218
      test = not (node_files and isinstance(node_files, dict))
2219
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2220
                    "Node did not return file checksum data")
2221
      if test:
2222
        ignore_nodes.add(node.uuid)
2223
        continue
2224

    
2225
      # Build per-checksum mapping from filename to nodes having it
2226
      for (filename, checksum) in node_files.items():
2227
        assert filename in nodefiles
2228
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2229

    
2230
    for (filename, checksums) in fileinfo.items():
2231
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2232

    
2233
      # Nodes having the file
2234
      with_file = frozenset(node_uuid
2235
                            for node_uuids in fileinfo[filename].values()
2236
                            for node_uuid in node_uuids) - ignore_nodes
2237

    
2238
      expected_nodes = nodefiles[filename] - ignore_nodes
2239

    
2240
      # Nodes missing file
2241
      missing_file = expected_nodes - with_file
2242

    
2243
      if filename in files_opt:
2244
        # All or no nodes
2245
        self._ErrorIf(missing_file and missing_file != expected_nodes,
2246
                      constants.CV_ECLUSTERFILECHECK, None,
2247
                      "File %s is optional, but it must exist on all or no"
2248
                      " nodes (not found on %s)",
2249
                      filename,
2250
                      utils.CommaJoin(
2251
                        utils.NiceSort(
2252
                          map(self.cfg.GetNodeName, missing_file))))
2253
      else:
2254
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2255
                      "File %s is missing from node(s) %s", filename,
2256
                      utils.CommaJoin(
2257
                        utils.NiceSort(
2258
                          map(self.cfg.GetNodeName, missing_file))))
2259

    
2260
        # Warn if a node has a file it shouldn't
2261
        unexpected = with_file - expected_nodes
2262
        self._ErrorIf(unexpected,
2263
                      constants.CV_ECLUSTERFILECHECK, None,
2264
                      "File %s should not exist on node(s) %s",
2265
                      filename, utils.CommaJoin(
2266
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2267

    
2268
      # See if there are multiple versions of the file
2269
      test = len(checksums) > 1
2270
      if test:
2271
        variants = ["variant %s on %s" %
2272
                    (idx + 1,
2273
                     utils.CommaJoin(utils.NiceSort(
2274
                       map(self.cfg.GetNodeName, node_uuids))))
2275
                    for (idx, (checksum, node_uuids)) in
2276
                      enumerate(sorted(checksums.items()))]
2277
      else:
2278
        variants = []
2279

    
2280
      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2281
                    "File %s found with %s different checksums (%s)",
2282
                    filename, len(checksums), "; ".join(variants))
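
  # Illustrative sketch (editor's assumption, not part of the original class):
  # fileinfo maps each filename to the checksums seen and the nodes reporting
  # them, so a diverging configuration file shows up as two variants, e.g.
  #   fileinfo["/var/lib/ganeti/config.data"] == {
  #     "abc123...": set(["node1-uuid", "node2-uuid"]),
  #     "def456...": set(["node3-uuid"]),
  #     }
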
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2285
                      drbd_map):
2286
    """Verifies and the node DRBD status.
2287

2288
    @type ninfo: L{objects.Node}
2289
    @param ninfo: the node to check
2290
    @param nresult: the remote results for the node
2291
    @param instanceinfo: the dict of instances
2292
    @param drbd_helper: the configured DRBD usermode helper
2293
    @param drbd_map: the DRBD map as returned by
2294
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2295

2296
    """
2297
    if drbd_helper:
2298
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2299
      test = (helper_result is None)
2300
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2301
                    "no drbd usermode helper returned")
2302
      if helper_result:
2303
        status, payload = helper_result
2304
        test = not status
2305
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2306
                      "drbd usermode helper check unsuccessful: %s", payload)
2307
        test = status and (payload != drbd_helper)
2308
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2309
                      "wrong drbd usermode helper: %s", payload)
2310

    
2311
    # compute the DRBD minors
2312
    node_drbd = {}
2313
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2314
      test = inst_uuid not in instanceinfo
2315
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2316
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
2317
        # ghost instance should not be running, but otherwise we
2318
        # don't give double warnings (both ghost instance and
2319
        # unallocated minor in use)
2320
      if test:
2321
        node_drbd[minor] = (inst_uuid, False)
2322
      else:
2323
        instance = instanceinfo[inst_uuid]
2324
        node_drbd[minor] = (inst_uuid, instance.disks_active)
2325

    
2326
    # and now check them
2327
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2328
    test = not isinstance(used_minors, (tuple, list))
2329
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2330
                  "cannot parse drbd status file: %s", str(used_minors))
2331
    if test:
2332
      # we cannot check drbd status
2333
      return
2334

    
2335
    for minor, (inst_uuid, must_exist) in node_drbd.items():
2336
      test = minor not in used_minors and must_exist
2337
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2338
                    "drbd minor %d of instance %s is not active", minor,
2339
                    self.cfg.GetInstanceName(inst_uuid))
2340
    for minor in used_minors:
2341
      test = minor not in node_drbd
2342
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2343
                    "unallocated drbd minor %d is in use", minor)
2344

    
2345
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2346
    """Builds the node OS structures.
2347

2348
    @type ninfo: L{objects.Node}
2349
    @param ninfo: the node to check
2350
    @param nresult: the remote results for the node
2351
    @param nimg: the node image object
2352

2353
    """
2354
    remote_os = nresult.get(constants.NV_OSLIST, None)
2355
    test = (not isinstance(remote_os, list) or
2356
            not compat.all(isinstance(v, list) and len(v) == 7
2357
                           for v in remote_os))
2358

    
2359
    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2360
                  "node hasn't returned valid OS data")
2361

    
2362
    nimg.os_fail = test
2363

    
2364
    if test:
2365
      return
2366

    
2367
    os_dict = {}
2368

    
2369
    for (name, os_path, status, diagnose,
2370
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2371

    
2372
      if name not in os_dict:
2373
        os_dict[name] = []
2374

    
2375
      # parameters is a list of lists instead of list of tuples due to
2376
      # JSON lacking a real tuple type, fix it:
2377
      parameters = [tuple(v) for v in parameters]
2378
      os_dict[name].append((os_path, status, diagnose,
2379
                            set(variants), set(parameters), set(api_ver)))
2380

    
2381
    nimg.oslist = os_dict
2382

    
2383
  def _VerifyNodeOS(self, ninfo, nimg, base):
2384
    """Verifies the node OS list.
2385

2386
    @type ninfo: L{objects.Node}
2387
    @param ninfo: the node to check
2388
    @param nimg: the node image object
2389
    @param base: the 'template' node we match against (e.g. from the master)
2390

2391
    """
2392
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2393

    
2394
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2395
    for os_name, os_data in nimg.oslist.items():
2396
      assert os_data, "Empty OS status for OS %s?!" % os_name
2397
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2398
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2399
                    "Invalid OS %s (located at %s): %s",
2400
                    os_name, f_path, f_diag)
2401
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2402
                    "OS '%s' has multiple entries"
2403
                    " (first one shadows the rest): %s",
2404
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
2405
      # comparisons with the 'base' image
2406
      test = os_name not in base.oslist
2407
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2408
                    "Extra OS %s not present on reference node (%s)",
2409
                    os_name, self.cfg.GetNodeName(base.uuid))
2410
      if test:
2411
        continue
2412
      assert base.oslist[os_name], "Base node has empty OS status?"
2413
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2414
      if not b_status:
2415
        # base OS is invalid, skipping
2416
        continue
2417
      for kind, a, b in [("API version", f_api, b_api),
2418
                         ("variants list", f_var, b_var),
2419
                         ("parameters", beautify_params(f_param),
2420
                          beautify_params(b_param))]:
2421
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2422
                      "OS %s for %s differs from reference node %s:"
2423
                      " [%s] vs. [%s]", kind, os_name,
2424
                      self.cfg.GetNodeName(base.uuid),
2425
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2426

    
2427
    # check any missing OSes
2428
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2429
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2430
                  "OSes present on reference node %s"
2431
                  " but missing on this node: %s",
2432
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2433

    
2434
  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2435
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2436

2437
    @type ninfo: L{objects.Node}
2438
    @param ninfo: the node to check
2439
    @param nresult: the remote results for the node
2440
    @type is_master: bool
2441
    @param is_master: Whether node is the master node
2442

2443
    """
2444
    cluster = self.cfg.GetClusterInfo()
2445
    if (is_master and
2446
        (cluster.IsFileStorageEnabled() or
2447
         cluster.IsSharedFileStorageEnabled())):
2448
      try:
2449
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2450
      except KeyError:
2451
        # This should never happen
2452
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2453
                      "Node did not return forbidden file storage paths")
2454
      else:
2455
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2456
                      "Found forbidden file storage paths: %s",
2457
                      utils.CommaJoin(fspaths))
2458
    else:
2459
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2460
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2461
                    "Node should not have returned forbidden file storage"
2462
                    " paths")
2463

    
2464
  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2465
                          verify_key, error_key):
2466
    """Verifies (file) storage paths.
2467

2468
    @type ninfo: L{objects.Node}
2469
    @param ninfo: the node to check
2470
    @param nresult: the remote results for the node
2471
    @type file_disk_template: string
2472
    @param file_disk_template: file-based disk template, whose directory
2473
        is supposed to be verified
2474
    @type verify_key: string
2475
    @param verify_key: key for the verification map of this file
2476
        verification step
2477
    @param error_key: error key to be added to the verification results
2478
        in case something goes wrong in this verification step
2479

2480
    """
2481
    assert (file_disk_template in
2482
            utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2483
    cluster = self.cfg.GetClusterInfo()
2484
    if cluster.IsDiskTemplateEnabled(file_disk_template):
2485
      self._ErrorIf(
2486
          verify_key in nresult,
2487
          error_key, ninfo.name,
2488
          "The configured %s storage path is unusable: %s" %
2489
          (file_disk_template, nresult.get(verify_key)))
2490

    
2491
  def _VerifyFileStoragePaths(self, ninfo, nresult):
2492
    """Verifies (file) storage paths.
2493

2494
    @see: C{_VerifyStoragePaths}
2495

2496
    """
2497
    self._VerifyStoragePaths(
2498
        ninfo, nresult, constants.DT_FILE,
2499
        constants.NV_FILE_STORAGE_PATH,
2500
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2501

    
2502
  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2503
    """Verifies (file) storage paths.
2504

2505
    @see: C{_VerifyStoragePaths}
2506

2507
    """
2508
    self._VerifyStoragePaths(
2509
        ninfo, nresult, constants.DT_SHARED_FILE,
2510
        constants.NV_SHARED_FILE_STORAGE_PATH,
2511
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2512

    
2513
  def _VerifyOob(self, ninfo, nresult):
2514
    """Verifies out of band functionality of a node.
2515

2516
    @type ninfo: L{objects.Node}
2517
    @param ninfo: the node to check
2518
    @param nresult: the remote results for the node
2519

2520
    """
2521
    # We just have to verify the paths on master and/or master candidates
2522
    # as the oob helper is invoked on the master
2523
    if ((ninfo.master_candidate or ninfo.master_capable) and
2524
        constants.NV_OOB_PATHS in nresult):
2525
      for path_result in nresult[constants.NV_OOB_PATHS]:
2526
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2527
                      ninfo.name, path_result)
2528

    
2529
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2530
    """Verifies and updates the node volume data.
2531

2532
    This function will update a L{NodeImage}'s internal structures
2533
    with data from the remote call.
2534

2535
    @type ninfo: L{objects.Node}
2536
    @param ninfo: the node to check
2537
    @param nresult: the remote results for the node
2538
    @param nimg: the node image object
2539
    @param vg_name: the configured VG name
2540

2541
    """
2542
    nimg.lvm_fail = True
2543
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2544
    if vg_name is None:
2545
      pass
2546
    elif isinstance(lvdata, basestring):
2547
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2548
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
2549
    elif not isinstance(lvdata, dict):
2550
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2551
                    "rpc call to node failed (lvlist)")
2552
    else:
2553
      nimg.volumes = lvdata
2554
      nimg.lvm_fail = False
2555

    
2556
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2557
    """Verifies and updates the node instance list.
2558

2559
    If the listing was successful, then updates this node's instance
2560
    list. Otherwise, it marks the RPC call as failed for the instance
2561
    list key.
2562

2563
    @type ninfo: L{objects.Node}
2564
    @param ninfo: the node to check
2565
    @param nresult: the remote results for the node
2566
    @param nimg: the node image object
2567

2568
    """
2569
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2570
    test = not isinstance(idata, list)
2571
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2572
                  "rpc call to node failed (instancelist): %s",
2573
                  utils.SafeEncode(str(idata)))
2574
    if test:
2575
      nimg.hyp_fail = True
2576
    else:
2577
      nimg.instances = [inst.uuid for (_, inst) in
2578
                        self.cfg.GetMultiInstanceInfoByName(idata)]
2579

    
2580
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2581
    """Verifies and computes a node information map
2582

2583
    @type ninfo: L{objects.Node}
2584
    @param ninfo: the node to check
2585
    @param nresult: the remote results for the node
2586
    @param nimg: the node image object
2587
    @param vg_name: the configured VG name
2588

2589
    """
2590
    # try to read free memory (from the hypervisor)
2591
    hv_info = nresult.get(constants.NV_HVINFO, None)
2592
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2593
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2594
                  "rpc call to node failed (hvinfo)")
2595
    if not test:
2596
      try:
2597
        nimg.mfree = int(hv_info["memory_free"])
2598
      except (ValueError, TypeError):
2599
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2600
                      "node returned invalid nodeinfo, check hypervisor")
2601

    
2602
    # FIXME: devise a free space model for file based instances as well
2603
    if vg_name is not None:
2604
      test = (constants.NV_VGLIST not in nresult or
2605
              vg_name not in nresult[constants.NV_VGLIST])
2606
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2607
                    "node didn't return data for the volume group '%s'"
2608
                    " - it is either missing or broken", vg_name)
2609
      if not test:
2610
        try:
2611
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2612
        except (ValueError, TypeError):
2613
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2614
                        "node returned invalid LVM info, check LVM status")
2615

    
2616
  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2617
    """Gets per-disk status information for all instances.
2618

2619
    @type node_uuids: list of strings
2620
    @param node_uuids: Node UUIDs
2621
    @type node_image: dict of (UUID, L{objects.Node})
2622
    @param node_image: Node objects
2623
    @type instanceinfo: dict of (UUID, L{objects.Instance})
2624
    @param instanceinfo: Instance objects
2625
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_devonly = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams makes already copies of the disks
      devonly = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        self.cfg.SetDiskID(anno_disk, nuuid)
        devonly.append(anno_disk)

      node_disks_devonly[nuuid] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

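  # The selector below groups all online nodes *outside* the given group by
  # their node group (itertools.groupby on a sorted list) and wraps each
  # group's sorted name list in an endless itertools.cycle.  Callers that
  # repeatedly take one element from every cycle therefore walk through each
  # foreign group's nodes in a round-robin fashion.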
  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

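  # Roughly, the result below is a pair: the sorted online node names of this
  # group, plus a dict mapping each of those names to one target per foreign
  # group, drawn round-robin from the cycles above.  For example (with made-up
  # names): (["node1", "node2"], {"node1": ["other-a"], "node2": ["other-b"]}).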
  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various test on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

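    # Roughly: every NV_* key below enables one check in the node_verify RPC
    # and carries that check's input (None where no extra input is needed);
    # the per-node results come back keyed by the same constants.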
    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if drbd_helper:
      node_verify_param[constants.NV_DRBDVERSION] = None
      node_verify_param[constants.NV_DRBDLIST] = None
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

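    # node_image holds one NodeImage per node; the loops below fill in the
    # *expected* state (pinst/sinst lists and the sbp secondary-by-primary
    # map) so it can later be compared with what the nodes actually report.
    # Nodes referenced by instances but not part of this group's node list
    # get placeholder images, marked as ghosts when the configuration does
    # not know them at all.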
    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least a node, we act as if it were set for all the nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

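    # Walk the gathered results node by node: offline nodes are skipped, a
    # failed RPC only marks the image as rpc_fail, and the LVM, DRBD, volume
    # and instance checks below run for vm_capable nodes only.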
    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.uuid not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

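    # N+1 memory check (skipped when VERIFY_NPLUSONE_MEM is in skip_checks):
    # roughly, every node should have enough free memory to start the
    # instances that would fail over to it if one of its peers went down.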
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])