#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the cluster data for the query.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "vcs_version": constants.VCS_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.
442

443
  """
444
  REQ_BGL = False
445

    
446
  def ExpandNames(self):
447
    if self.op.instances:
448
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
449
      # Not getting the node allocation lock as only a specific set of
450
      # instances (and their nodes) is going to be acquired
451
      self.needed_locks = {
452
        locking.LEVEL_NODE_RES: [],
453
        locking.LEVEL_INSTANCE: self.wanted_names,
454
        }
455
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
456
    else:
457
      self.wanted_names = None
458
      self.needed_locks = {
459
        locking.LEVEL_NODE_RES: locking.ALL_SET,
460
        locking.LEVEL_INSTANCE: locking.ALL_SET,
461

    
        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False
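
  # For example: a DRBD8 disk recorded at 10240 MiB whose data child reports
  # a smaller size gets the child grown back to 10240 MiB, and the method
  # returns True so that Exec below records the change.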

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node_uuid)
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)
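
# Illustrative usage: for a cluster whose primary IP family is IPv4,
# _ValidateNetmask(cfg, 24) passes silently, while a netmask outside the
# family's valid CIDR range raises errors.OpPrereqError (ECODE_INVAL).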


def CheckFileBasedStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
    file_disk_template):
  """Checks whether the given file-based storage directory is acceptable.

  Note: This function is public, because it is also used in bootstrap.py.

  @type logging_warn_fn: function
  @param logging_warn_fn: function which accepts a string and logs it
  @type file_storage_dir: string
  @param file_storage_dir: the directory to be used for file-based instances
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of enabled disk templates
  @type file_disk_template: string
  @param file_disk_template: the file-based disk template for which the
      path should be checked

  """
  assert (file_disk_template in
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_dir is not None:
    if file_storage_dir == "":
      if file_storage_enabled:
        raise errors.OpPrereqError(
            "Unsetting the '%s' storage directory while having '%s' storage"
            " enabled is not permitted." %
            (file_disk_template, file_disk_template))
    else:
      if not file_storage_enabled:
        logging_warn_fn(
            "Specified a %s storage directory, although %s storage is not"
            " enabled." % (file_disk_template, file_disk_template))
  else:
    raise errors.ProgrammerError("Received %s storage dir with value"
                                 " 'None'." % file_disk_template)


def CheckFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_FILE)


def CheckSharedFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given shared file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_SHARED_FILE)


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
706
                                   errors.ECODE_INVAL)
707

    
708
  def ExpandNames(self):
709
    # FIXME: in the future maybe other cluster params won't require checking on
710
    # all nodes to be modified.
711
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
712
    # resource locks the right thing, shouldn't it be the BGL instead?
713
    self.needed_locks = {
714
      locking.LEVEL_NODE: locking.ALL_SET,
715
      locking.LEVEL_INSTANCE: locking.ALL_SET,
716
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
717
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
718
    }
719
    self.share_locks = ShareAll()
720

    
721
  def BuildHooksEnv(self):
722
    """Build hooks env.
723

724
    """
725
    return {
726
      "OP_TARGET": self.cfg.GetClusterName(),
727
      "NEW_VG_NAME": self.op.vg_name,
728
      }
729

    
730
  def BuildHooksNodes(self):
731
    """Build hooks nodes.
732

733
    """
734
    mn = self.cfg.GetMasterNode()
735
    return ([mn], [mn])
736

    
737
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
738
                   new_enabled_disk_templates):
    """Check the consistency of the VG name on all nodes and, in case it gets
       unset, whether there are instances still using it.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and
        utils.IsLvmEnabled(enabled_disk_templates)) or \
           (self.cfg.GetVGName() is not None and
            utils.LvmGetsEnabled(enabled_disk_templates,
                                 new_enabled_disk_templates)):
      self._CheckVgNameOnNodes(node_uuids)

  def _CheckVgNameOnNodes(self, node_uuids):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_uuids)
    for node_uuid in node_uuids:
      msg = vglist[node_uuid].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
                                   errors.ECODE_ENVIRON)

  def _GetEnabledDiskTemplates(self, cluster):
    """Determines the enabled disk templates and the subset of disk templates
       that are newly enabled by this operation.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    if self.op.enabled_disk_templates:
      enabled_disk_templates = self.op.enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(cluster.enabled_disk_templates))
    else:
      enabled_disk_templates = cluster.enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates)

  def _CheckIpolicy(self, cluster):
    """Checks the ipolicy.

    @type cluster: C{objects.Cluster}
    @param cluster: the cluster's configuration

    """
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(nuuid in group.members
                                             for nuuid in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
                                           self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    self.cluster = cluster = self.cfg.GetClusterInfo()

    vm_capable_node_uuids = [node.uuid
                             for node in self.cfg.GetAllNodesInfo().values()
                             if node.uuid in node_uuids and node.vm_capable]

    (enabled_disk_templates, new_enabled_disk_templates) = \
      self._GetEnabledDiskTemplates(cluster)

    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
                      new_enabled_disk_templates)

    if self.op.file_storage_dir is not None:
      CheckFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)

    if self.op.shared_file_storage_dir is not None:
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.shared_file_storage_dir,
          enabled_disk_templates)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_uuids)
      for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s",
                       ninfo.name)
          continue
        msg = helpers[ninfo.uuid].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (ninfo.name, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[ninfo.uuid].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (ninfo.name, node_helper),
                                     errors.ECODE_ENVIRON)

    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    self._CheckIpolicy(cluster)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_uuids, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_uuids, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def _SetVgName(self, feedback_fn):
    """Determines and sets the new volume group name.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name and not \
           utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
        feedback_fn("Note that you specified a volume group, but did not"
                    " enable any lvm disk template.")
      new_volume = self.op.vg_name
      if not new_volume:
        if utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
          raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                     " disk templates are enabled.")
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    else:
      if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
          not self.cfg.GetVGName():
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

  def _SetFileStorageDir(self, feedback_fn):
    """Set the file storage directory.

    """
    if self.op.file_storage_dir is not None:
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
        feedback_fn("Global file storage dir already set to value '%s'"
                    % self.cluster.file_storage_dir)
      else:
        self.cluster.file_storage_dir = self.op.file_storage_dir

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(set(self.op.enabled_disk_templates))

    self._SetVgName(feedback_fn)
    self._SetFileStorageDir(feedback_fn)

    if self.op.drbd_helper is not None:
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did not"
                    " enable the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
                 master_params.uuid, master_params.netmask,
                 self.op.master_netmask, master_params.ip,
                 master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)
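
# For example: with op.error_codes set, _VerifyErrors._Error formats messages
# as "<severity>:<error-code>:<item-type>:<item>:<message>" before reporting
# them via feedback_fn; otherwise the plainer
# "<severity>: <item-type> <item>: <message>" form is used.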


def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data
1401

    
1402

    
1403
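# Illustrative shape of the list returned above (example names only, not real
# cluster data):
#   [("cluster", "kvm", {...}),
#    ("os debian-image", "xen-pvm", {...}),
#    ("instance web1.example.com", "kvm", {...})]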
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                    constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(inst.name for
                         inst in dangling_instances.get(node.uuid, [])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non-"
                  "existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(inst.name for
                                         inst in no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        instance = self.cfg.GetInstanceInfoByName(inst_name)
        if instance.disk_template in constants.DTS_INT_MIRROR:
          nodes.update(instance.secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_node_uuids = set(self.group_info.members)
    group_inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_node_uuids = \
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_inst_uuids = \
        group_inst_uuids.difference(
          [self.cfg.GetInstanceInfoByName(name).uuid
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])

    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_inst_uuids:
      raise errors.OpPrereqError(
        "Missing lock for instances: %s" %
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
        errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_uuids = group_node_uuids
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
                             for node_uuid in group_node_uuids)

    self.my_inst_uuids = group_inst_uuids
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
                             for inst_uuid in group_inst_uuids)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nuuid in inst.all_nodes:
          if self.all_node_info[nuuid].group != self.group_uuid:
            extra_lv_nodes.add(nuuid)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, ninfo.name,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s parameter verify failure (source %s): %s",
                      hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
                    "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
                  "Node time diverges by at least %s from master node time",
                  ntime_diff)

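  # Worked example for the skew check above (illustrative numbers, not the
  # real constant): if the verify RPC ran between t=100.0 and t=101.0 on the
  # master, the node reports t=95.0, and the allowed skew were 4 seconds,
  # then 95.0 < 100.0 - 4 and the reported divergence would be "5.0s".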
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                  "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupDRBDVersion(self, node_verify_infos):
    """Check cross-node DRBD version consistency.

    @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node_verify call.

    """
    node_versions = {}
    for node_uuid, ndata in node_verify_infos.items():
      nresult = ndata.payload
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
      node_versions[node_uuid] = version

    if len(set(node_versions.values())) > 1:
      for node_uuid, version in sorted(node_versions.items()):
        msg = "DRBD version mismatch: %s" % version
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
                    code=self.ETYPE_WARNING)

  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping from node names to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of check
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))

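  # Illustrative outcome of the check above (made-up sizes; the actual
  # tolerance is decided by utils.LvmExclusiveTestBadPvSizes): a pv_min of
  # 10240 MB on one node versus a pv_max of 10260 MB on another is a small
  # spread and passes, while a much larger split such as 10240 MB vs
  # 20480 MB would be flagged as CV_EGROUPDIFFERENTPVSIZE.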
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "did not return valid bridge information")
    if not test:
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = constants.NV_NODELIST not in nresult
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
                  "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
                        "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
                        "tcp communication with node '%s': %s",
                        anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if ninfo.uuid == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)

  def _VerifyInstance(self, instance, node_image, diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the nodes are in the correct
    state.

    """
    pnode_uuid = instance.primary_node
    pnode_img = node_image[pnode_uuid]
    groupinfo = self.cfg.GetAllNodeGroupsInfo()

    node_vol_should = {}
    instance.MapLVsByNode(node_vol_should)

    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            self.group_info)
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)

    for node_uuid in node_vol_should:
      n_img = node_image[node_uuid]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node_uuid]:
        test = volume not in n_img.volumes
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
                      "volume %s missing on node %s", volume,
                      self.cfg.GetNodeName(node_uuid))

    if instance.admin_state == constants.ADMINST_UP:
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
                    "instance not running on its primary node %s",
                     self.cfg.GetNodeName(pnode_uuid))
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance is marked as running and lives on"
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      self._ErrorIf(instance.disks_active and
                    not success and not bad_snode,
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
                    "couldn't retrieve status for disk/%s on %s: %s",
                    idx, self.cfg.GetNodeName(nname), bdev_status)

      if instance.disks_active and success and \
         (bdev_status.is_degraded or
          bdev_status.ldisk_status != constants.LDS_OKAY):
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
        if bdev_status.is_degraded:
          msg += " is degraded"
        if bdev_status.ldisk_status != constants.LDS_OKAY:
          msg += "; state is '%s'" % \
                 constants.LDS_NAMES[bdev_status.ldisk_status]

        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)

    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
                  "instance %s, connection to primary node failed",
                  instance.name)

    self._ErrorIf(len(instance.secondary_nodes) > 1,
                  constants.CV_EINSTANCELAYOUT, instance.name,
                  "instance has multiple secondary nodes: %s",
                  utils.CommaJoin(instance.secondary_nodes),
                  code=self.ETYPE_WARNING)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
    if any(es_flags.values()):
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
        # Disk template not compatible with exclusive_storage: no instance
        # node should have the flag set
        es_nodes = [n
                    for (n, es) in es_flags.items()
                    if es]
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
                    "instance has template %s, which is not supported on nodes"
                    " that have exclusive storage set: %s",
                    instance.disk_template,
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
      for (idx, disk) in enumerate(instance.disks):
        self._ErrorIf(disk.spindles is None,
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
                      "number of spindles not configured for disk %s while"
                      " exclusive storage is enabled, try running"
                      " gnt-cluster repair-disk-sizes", idx)

    if instance.disk_template in constants.DTS_INT_MIRROR:
      instance_nodes = utils.NiceSort(instance.all_nodes)
      instance_groups = {}

      for node_uuid in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
                                   []).append(node_uuid)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
                           groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode_uuid in nodes,
                                   reverse=True)]

      self._ErrorIf(len(instance_groups) > 1,
                    constants.CV_EINSTANCESPLITGROUPS,
                    instance.name, "instance has primary and secondary nodes in"
                    " different groups: %s", utils.CommaJoin(pretty_list),
                    code=self.ETYPE_WARNING)

    inst_nodes_offline = []
    for snode in instance.secondary_nodes:
      s_img = node_image[snode]
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
                    self.cfg.GetNodeName(snode),
                    "instance %s, connection to secondary node failed",
                    instance.name)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
                  instance.name, "instance has offline secondary node(s) %s",
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
    # ... or ghost/non-vm_capable nodes
    for node_uuid in instance.all_nodes:
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance lives on ghost node %s",
                    self.cfg.GetNodeName(node_uuid))
      self._ErrorIf(not node_image[node_uuid].vm_capable,
                    constants.CV_EINSTANCEBADNODE, instance.name,
                    "instance lives on non-vm_capable node %s",
                    self.cfg.GetNodeName(node_uuid))

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node_uuid, n_img in node_image.items():
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
          self.all_node_info[node_uuid].group != self.group_uuid):
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node_uuid not in node_vol_should or
                volume not in node_vol_should[node_uuid]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
                      self.cfg.GetNodeName(node_uuid),
                      "volume %s is unknown", volume)

  def _VerifyNPlusOneMemory(self, node_image, all_insts):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node_uuid, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline or \
         self.all_node_info[node_uuid].group != self.group_uuid:
        # we're skipping nodes marked offline and nodes in other groups from
        # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      #TODO(dynmem): also consider ballooning out other instances
      for prinode, inst_uuids in n_img.sbp.items():
        needed_mem = 0
        for inst_uuid in inst_uuids:
          bep = cluster_info.FillBE(all_insts[inst_uuid])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MINMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1,
                      self.cfg.GetNodeName(node_uuid),
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)

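  # Worked example for the N+1 check above (made-up numbers): if node A is
  # secondary for two auto-balanced instances whose primary is node B, with
  # BE_MINMEM of 2048 and 4096 MiB, then needed_mem is 6144 MiB; an mfree of
  # only 4096 MiB on node A would raise CV_ENODEN1 for a failure of node B.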
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param nodes: List of L{objects.Node} objects
    @param master_node_uuid: UUID of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.uuid == master_node_uuid)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodes
      else:
        filenodes = filter(fn, nodes)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodes:
      if node.offline:
        ignore_nodes.add(node.uuid)
        continue

      nresult = all_nvinfo[node.uuid]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
                    "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.uuid)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_uuid
                            for node_uuids in fileinfo[filename].values()
                            for node_uuid in node_uuids) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        self._ErrorIf(missing_file and missing_file != expected_nodes,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is optional, but it must exist on all or no"
                      " nodes (not found on %s)",
                      filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))
      else:
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is missing from node(s) %s", filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        self._ErrorIf(unexpected,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s should not exist on node(s) %s",
                      filename, utils.CommaJoin(
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1,
                     utils.CommaJoin(utils.NiceSort(
                       map(self.cfg.GetNodeName, node_uuids))))
                    for (idx, (checksum, node_uuids)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
                    "File %s found with %s different checksums (%s)",
                    filename, len(checksums), "; ".join(variants))

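  # Illustrative shape of the fileinfo mapping built above (made-up path,
  # checksum and UUIDs):
  #   {"/var/lib/ganeti/ssconf_cluster_name":
  #      {"1b6453892473a467d07372d45eb05abc2031647a":
  #         set(["node-uuid-1", "node-uuid-2"])}}
  # A filename mapping to more than one checksum is reported as having
  # multiple variants.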
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                    "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "wrong drbd usermode helper: %s", payload)

    # compute the DRBD minors
    node_drbd = {}
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
      test = inst_uuid not in instanceinfo
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
      if test:
        node_drbd[minor] = (inst_uuid, False)
      else:
        instance = instanceinfo[inst_uuid]
        node_drbd[minor] = (inst_uuid, instance.disks_active)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                  "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (inst_uuid, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "drbd minor %d of instance %s is not active", minor,
                    self.cfg.GetInstanceName(inst_uuid))
    for minor in used_minors:
      test = minor not in node_drbd
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "unallocated drbd minor %d is in use", minor)

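  # Illustrative shape of the node_drbd mapping built above (made-up UUIDs):
  #   {0: ("11111111-...", True),    # minor 0: instance with active disks
  #    1: ("22222222-...", False)}   # minor 1: stopped or ghost instance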
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                  "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
                    "Invalid OS %s (located at %s): %s",
                    os_name, f_path, f_diag)
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
                    "OS '%s' has multiple entries"
                    " (first one shadows the rest): %s",
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                    "Extra OS %s not present on reference node (%s)",
                    os_name, self.cfg.GetNodeName(base.uuid))
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
                      "OS %s for %s differs from reference node %s:"
                      " [%s] vs. [%s]", kind, os_name,
                      self.cfg.GetNodeName(base.uuid),
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
                  "OSes present on reference node %s"
                  " but missing on this node: %s",
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))

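  # Illustrative shape of nimg.oslist as built by _UpdateNodeOS (made-up
  # data): OS name -> list of (path, status, diagnose, variants, parameters,
  # api_versions) tuples, one per occurrence of that OS on the node, e.g.
  #   {"debian-image": [("/srv/ganeti/os/debian-image", True, "",
  #                      set(["default"]), set(), set([20]))]}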
  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    cluster = self.cfg.GetClusterInfo()
    if (is_master and
        (cluster.IsFileStorageEnabled() or
         cluster.IsSharedFileStorageEnabled())):
      try:
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
                          verify_key, error_key):
    """Verifies (file) storage paths.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type file_disk_template: string
    @param file_disk_template: file-based disk template, whose directory
        is supposed to be verified
    @type verify_key: string
    @param verify_key: key for the verification map of this file
        verification step
    @param error_key: error key to be added to the verification results
        in case something goes wrong in this verification step

    """
    assert (file_disk_template in
            utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
    cluster = self.cfg.GetClusterInfo()
    if cluster.IsDiskTemplateEnabled(file_disk_template):
      self._ErrorIf(
          verify_key in nresult,
          error_key, ninfo.name,
          "The configured %s storage path is unusable: %s" %
          (file_disk_template, nresult.get(verify_key)))

  def _VerifyFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_FILE,
        constants.NV_FILE_STORAGE_PATH,
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)

  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_SHARED_FILE,
        constants.NV_SHARED_FILE_STORAGE_PATH,
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

2581
  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2582
    """Gets per-disk status information for all instances.
2583

2584
    @type node_uuids: list of strings
2585
    @param node_uuids: Node UUIDs
2586
    @type node_image: dict of (UUID, L{objects.Node})
2587
    @param node_image: Node objects
2588
    @type instanceinfo: dict of (UUID, L{objects.Instance})
2589
    @param instanceinfo: Instance objects
2590
    @rtype: {instance: {node: [(succes, payload)]}}
2591
    @return: a dictionary of per-instance dictionaries with nodes as
2592
        keys and disk information as values; the disk information is a
2593
        list of tuples (success, payload)
2594

2595
    """
    node_disks = {}
    node_disks_devonly = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      devonly = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        self.cfg.SetDiskID(anno_disk, nuuid)
        devonly.append(anno_disk)

      node_disks_devonly[nuuid] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

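      # Merge this node's per-disk results into the instance-keyed mapping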
      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

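    # Group the foreign nodes by node group; each group's sorted name list is
    # wrapped in an endless (cycled) iterator so repeated draws walk through
    # that group's nodes round-robin.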
    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

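    # The result is a pair: the sorted online node names of this group, and a
    # map from each such name to one node name per foreign group, e.g.
    # (illustrative names) {"node1": ["groupB-node3", "groupC-node7"]}.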
    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

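    # Parameters for the node-verify RPC below; each NV_* key requests one
    # check or piece of data from the nodes.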
    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if drbd_helper:
      node_verify_param[constants.NV_DRBDVERSION] = None
      node_verify_param[constants.NV_DRBDLIST] = None
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

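    # Fill the expected cluster state with instance placement data: primary
    # and secondary instances per node, "ghost" nodes referenced by instances
    # but unknown to the configuration, and the LVs each node should hold.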
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least one node, we act as if it were set for all nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

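    # The extra LV-list results gathered above (extra_lv_nvinfo) only carry
    # volume data; fold it into the corresponding node images as well.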
    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.uuid not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
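        # Each payload entry is a (script, status, output) tuple; HKR_FAIL
        # marks a failed hook script.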
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])