#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob

import ganeti.masterd.instance


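# The logical units below follow the common cmdlib pattern: ExpandNames
# declares the locks to acquire, CheckPrereq validates the request against
# the configuration, and Exec does the actual work, usually via RPC calls to
# the affected nodes.  As a rough, hypothetical sketch (LUClusterExample is
# not a real unit in this module), a minimal LU looks like this:
#
#   class LUClusterExample(NoHooksLU):
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self.needed_locks = {}
#
#     def Exec(self, feedback_fn):
#       return self.cfg.GetClusterName()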
class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disk sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node_uuid)
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)


def CheckFileBasedStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
    file_disk_template):
  """Checks whether the given file-based storage directory is acceptable.

  Note: This function is public, because it is also used in bootstrap.py.

  @type logging_warn_fn: function
  @param logging_warn_fn: function which accepts a string and logs it
  @type file_storage_dir: string
  @param file_storage_dir: the directory to be used for file-based instances
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of enabled disk templates
  @type file_disk_template: string
  @param file_disk_template: the file-based disk template for which the
      path should be checked

  """
  assert (file_disk_template in
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_dir is not None:
    if file_storage_dir == "":
      if file_storage_enabled:
        raise errors.OpPrereqError(
            "Unsetting the '%s' storage directory while having '%s' storage"
            " enabled is not permitted." %
            (file_disk_template, file_disk_template))
    else:
      if not file_storage_enabled:
        logging_warn_fn(
            "Specified a %s storage directory, although %s storage is not"
            " enabled." % (file_disk_template, file_disk_template))
  else:
    raise errors.ProgrammerError("Received %s storage dir with value"
                                 " 'None'." % file_disk_template)


def CheckFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_FILE)


def CheckSharedFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given shared file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_SHARED_FILE)


    
674
class LUClusterSetParams(LogicalUnit):
675
  """Change the parameters of the cluster.
676

677
  """
678
  HPATH = "cluster-modify"
679
  HTYPE = constants.HTYPE_CLUSTER
680
  REQ_BGL = False
681

    
682
  def CheckArguments(self):
683
    """Check parameters
684

685
    """
686
    if self.op.uid_pool:
687
      uidpool.CheckUidPool(self.op.uid_pool)
688

    
689
    if self.op.add_uids:
690
      uidpool.CheckUidPool(self.op.add_uids)
691

    
692
    if self.op.remove_uids:
693
      uidpool.CheckUidPool(self.op.remove_uids)
694

    
695
    if self.op.master_netmask is not None:
696
      _ValidateNetmask(self.cfg, self.op.master_netmask)
697

    
698
    if self.op.diskparams:
699
      for dt_params in self.op.diskparams.values():
700
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
701
      try:
702
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
703
      except errors.OpPrereqError, err:
704
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
705
                                   errors.ECODE_INVAL)
706

    
707
  def ExpandNames(self):
708
    # FIXME: in the future maybe other cluster params won't require checking on
709
    # all nodes to be modified.
710
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
711
    # resource locks the right thing, shouldn't it be the BGL instead?
712
    self.needed_locks = {
713
      locking.LEVEL_NODE: locking.ALL_SET,
714
      locking.LEVEL_INSTANCE: locking.ALL_SET,
715
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
716
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
717
    }
718
    self.share_locks = ShareAll()
719

    
720
  def BuildHooksEnv(self):
721
    """Build hooks env.
722

723
    """
724
    return {
725
      "OP_TARGET": self.cfg.GetClusterName(),
726
      "NEW_VG_NAME": self.op.vg_name,
727
      }
728

    
729
  def BuildHooksNodes(self):
730
    """Build hooks nodes.
731

732
    """
733
    mn = self.cfg.GetMasterNode()
734
    return ([mn], [mn])
735

    
736
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
737
                   new_enabled_disk_templates):
738
    """Check the consistency of the vg name on all nodes and in case it gets
739
       unset whether there are instances still using it.
740

741
    """
742
    if self.op.vg_name is not None and not self.op.vg_name:
743
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
744
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
745
                                   " instances exist", errors.ECODE_INVAL)
746

    
747
    if (self.op.vg_name is not None and
748
        utils.IsLvmEnabled(enabled_disk_templates)) or \
749
           (self.cfg.GetVGName() is not None and
750
            utils.LvmGetsEnabled(enabled_disk_templates,
751
                                 new_enabled_disk_templates)):
752
      self._CheckVgNameOnNodes(node_uuids)
753

    
754
  def _CheckVgNameOnNodes(self, node_uuids):
755
    """Check the status of the volume group on each node.
756

757
    """
758
    vglist = self.rpc.call_vg_list(node_uuids)
759
    for node_uuid in node_uuids:
760
      msg = vglist[node_uuid].fail_msg
761
      if msg:
762
        # ignoring down node
763
        self.LogWarning("Error while gathering data on node %s"
764
                        " (ignoring node): %s",
765
                        self.cfg.GetNodeName(node_uuid), msg)
766
        continue
767
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
768
                                            self.op.vg_name,
769
                                            constants.MIN_VG_SIZE)
770
      if vgstatus:
771
        raise errors.OpPrereqError("Error on node '%s': %s" %
772
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
773
                                   errors.ECODE_ENVIRON)
774

    
775
  def _GetEnabledDiskTemplates(self, cluster):
776
    """Determines the enabled disk templates and the subset of disk templates
777
       that are newly enabled by this operation.
778

779
    """
780
    enabled_disk_templates = None
781
    new_enabled_disk_templates = []
782
    if self.op.enabled_disk_templates:
783
      enabled_disk_templates = self.op.enabled_disk_templates
784
      new_enabled_disk_templates = \
785
        list(set(enabled_disk_templates)
786
             - set(cluster.enabled_disk_templates))
787
    else:
788
      enabled_disk_templates = cluster.enabled_disk_templates
789
    return (enabled_disk_templates, new_enabled_disk_templates)
790

    
791
  def CheckPrereq(self):
792
    """Check prerequisites.
793

794
    This checks whether the given params don't conflict and
795
    if the given volume group is valid.
796

797
    """
798
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
799
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
800
        raise errors.OpPrereqError("Cannot disable drbd helper while"
801
                                   " drbd-based instances exist",
802
                                   errors.ECODE_INVAL)
803

    
804
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
805
    self.cluster = cluster = self.cfg.GetClusterInfo()
806

    
807
    vm_capable_node_uuids = [node.uuid
808
                             for node in self.cfg.GetAllNodesInfo().values()
809
                             if node.uuid in node_uuids and node.vm_capable]
810

    
811
    (enabled_disk_templates, new_enabled_disk_templates) = \
812
      self._GetEnabledDiskTemplates(cluster)
813

    
814
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
815
                      new_enabled_disk_templates)
816

    
817
    if self.op.file_storage_dir is not None:
818
      CheckFileStoragePathVsEnabledDiskTemplates(
819
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
820

    
821
    if self.op.drbd_helper:
822
      # checks given drbd helper on all nodes
823
      helpers = self.rpc.call_drbd_helper(node_uuids)
824
      for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
825
        if ninfo.offline:
826
          self.LogInfo("Not checking drbd helper on offline node %s",
827
                       ninfo.name)
828
          continue
829
        msg = helpers[ninfo.uuid].fail_msg
830
        if msg:
831
          raise errors.OpPrereqError("Error checking drbd helper on node"
832
                                     " '%s': %s" % (ninfo.name, msg),
833
                                     errors.ECODE_ENVIRON)
834
        node_helper = helpers[ninfo.uuid].payload
835
        if node_helper != self.op.drbd_helper:
836
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
837
                                     (ninfo.name, node_helper),
838
                                     errors.ECODE_ENVIRON)
839

    
840
    # validate params changes
841
    if self.op.beparams:
842
      objects.UpgradeBeParams(self.op.beparams)
843
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
844
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
845

    
846
    if self.op.ndparams:
847
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
848
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
849

    
850
      # TODO: we need a more general way to handle resetting
851
      # cluster-level parameters to default values
852
      if self.new_ndparams["oob_program"] == "":
853
        self.new_ndparams["oob_program"] = \
854
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
855

    
856
    if self.op.hv_state:
857
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
858
                                           self.cluster.hv_state_static)
859
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
860
                               for hv, values in new_hv_state.items())
861

    
862
    if self.op.disk_state:
863
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
864
                                               self.cluster.disk_state_static)
865
      self.new_disk_state = \
866
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
867
                            for name, values in svalues.items()))
868
             for storage, svalues in new_disk_state.items())
869

    
870
    if self.op.ipolicy:
871
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
872
                                           group_policy=False)
873

    
874
      all_instances = self.cfg.GetAllInstancesInfo().values()
875
      violations = set()
876
      for group in self.cfg.GetAllNodeGroupsInfo().values():
877
        instances = frozenset([inst for inst in all_instances
878
                               if compat.any(nuuid in group.members
879
                                             for nuuid in inst.all_nodes)])
880
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
881
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
882
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
883
                                           self.cfg)
884
        if new:
885
          violations.update(new)
886

    
887
      if violations:
888
        self.LogWarning("After the ipolicy change the following instances"
889
                        " violate them: %s",
890
                        utils.CommaJoin(utils.NiceSort(violations)))
891

    
892
    if self.op.nicparams:
893
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
894
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
895
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
896
      nic_errors = []
897

    
898
      # check all instances for consistency
899
      for instance in self.cfg.GetAllInstancesInfo().values():
900
        for nic_idx, nic in enumerate(instance.nics):
901
          params_copy = copy.deepcopy(nic.nicparams)
902
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
903

    
904
          # check parameter syntax
905
          try:
906
            objects.NIC.CheckParameterSyntax(params_filled)
907
          except errors.ConfigurationError, err:
908
            nic_errors.append("Instance %s, nic/%d: %s" %
909
                              (instance.name, nic_idx, err))
910

    
911
          # if we're moving instances to routed, check that they have an ip
912
          target_mode = params_filled[constants.NIC_MODE]
913
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
914
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
915
                              " address" % (instance.name, nic_idx))
916
      if nic_errors:
917
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
918
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
919

    
920
    # hypervisor list/parameters
921
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
922
    if self.op.hvparams:
923
      for hv_name, hv_dict in self.op.hvparams.items():
924
        if hv_name not in self.new_hvparams:
925
          self.new_hvparams[hv_name] = hv_dict
926
        else:
927
          self.new_hvparams[hv_name].update(hv_dict)
928

    
929
    # disk template parameters
930
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
931
    if self.op.diskparams:
932
      for dt_name, dt_params in self.op.diskparams.items():
933
        if dt_name not in self.op.diskparams:
934
          self.new_diskparams[dt_name] = dt_params
935
        else:
936
          self.new_diskparams[dt_name].update(dt_params)
937

    
938
    # os hypervisor parameters
939
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
940
    if self.op.os_hvp:
941
      for os_name, hvs in self.op.os_hvp.items():
942
        if os_name not in self.new_os_hvp:
943
          self.new_os_hvp[os_name] = hvs
944
        else:
945
          for hv_name, hv_dict in hvs.items():
946
            if hv_dict is None:
947
              # Delete if it exists
948
              self.new_os_hvp[os_name].pop(hv_name, None)
949
            elif hv_name not in self.new_os_hvp[os_name]:
950
              self.new_os_hvp[os_name][hv_name] = hv_dict
951
            else:
952
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
953

    
954
    # os parameters
955
    self.new_osp = objects.FillDict(cluster.osparams, {})
956
    if self.op.osparams:
957
      for os_name, osp in self.op.osparams.items():
958
        if os_name not in self.new_osp:
959
          self.new_osp[os_name] = {}
960

    
961
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
962
                                                 use_none=True)
963

    
964
        if not self.new_osp[os_name]:
965
          # we removed all parameters
966
          del self.new_osp[os_name]
967
        else:
968
          # check the parameter validity (remote check)
969
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
970
                        os_name, self.new_osp[os_name])
971

    
972
    # changes to the hypervisor list
973
    if self.op.enabled_hypervisors is not None:
974
      self.hv_list = self.op.enabled_hypervisors
975
      for hv in self.hv_list:
976
        # if the hypervisor doesn't already exist in the cluster
977
        # hvparams, we initialize it to empty, and then (in both
978
        # cases) we make sure to fill the defaults, as we might not
979
        # have a complete defaults list if the hypervisor wasn't
980
        # enabled before
981
        if hv not in new_hvp:
982
          new_hvp[hv] = {}
983
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
984
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
985
    else:
986
      self.hv_list = cluster.enabled_hypervisors
987

    
988
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
989
      # either the enabled list has changed, or the parameters have, validate
990
      for hv_name, hv_params in self.new_hvparams.items():
991
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
992
            (self.op.enabled_hypervisors and
993
             hv_name in self.op.enabled_hypervisors)):
994
          # either this is a new hypervisor, or its parameters have changed
995
          hv_class = hypervisor.GetHypervisorClass(hv_name)
996
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
997
          hv_class.CheckParameterSyntax(hv_params)
998
          CheckHVParams(self, node_uuids, hv_name, hv_params)
999

    
1000
    self._CheckDiskTemplateConsistency()
1001

    
1002
    if self.op.os_hvp:
1003
      # no need to check any newly-enabled hypervisors, since the
1004
      # defaults have already been checked in the above code-block
1005
      for os_name, os_hvp in self.new_os_hvp.items():
1006
        for hv_name, hv_params in os_hvp.items():
1007
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1008
          # we need to fill in the new os_hvp on top of the actual hv_p
1009
          cluster_defaults = self.new_hvparams.get(hv_name, {})
1010
          new_osp = objects.FillDict(cluster_defaults, hv_params)
1011
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1012
          hv_class.CheckParameterSyntax(new_osp)
1013
          CheckHVParams(self, node_uuids, hv_name, new_osp)
1014

    
1015
    if self.op.default_iallocator:
1016
      alloc_script = utils.FindFile(self.op.default_iallocator,
1017
                                    constants.IALLOCATOR_SEARCH_PATH,
1018
                                    os.path.isfile)
1019
      if alloc_script is None:
1020
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1021
                                   " specified" % self.op.default_iallocator,
1022
                                   errors.ECODE_INVAL)
1023

    
1024
  def _CheckDiskTemplateConsistency(self):
1025
    """Check whether the disk templates that are going to be disabled
1026
       are still in use by some instances.
1027

1028
    """
1029
    if self.op.enabled_disk_templates:
1030
      cluster = self.cfg.GetClusterInfo()
1031
      instances = self.cfg.GetAllInstancesInfo()
1032

    
1033
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1034
        - set(self.op.enabled_disk_templates)
1035
      for instance in instances.itervalues():
1036
        if instance.disk_template in disk_templates_to_remove:
1037
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
1038
                                     " because instance '%s' is using it." %
1039
                                     (instance.disk_template, instance.name))
1040

    
1041
  def _SetVgName(self, feedback_fn):
1042
    """Determines and sets the new volume group name.
1043

1044
    """
1045
    if self.op.vg_name is not None:
1046
      if self.op.vg_name and not \
1047
           utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
1048
        feedback_fn("Note that you specified a volume group, but did not"
1049
                    " enable any lvm disk template.")
1050
      new_volume = self.op.vg_name
1051
      if not new_volume:
1052
        if utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
1053
          raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
1054
                                     " disk templates are enabled.")
1055
        new_volume = None
1056
      if new_volume != self.cfg.GetVGName():
1057
        self.cfg.SetVGName(new_volume)
1058
      else:
1059
        feedback_fn("Cluster LVM configuration already in desired"
1060
                    " state, not changing")
1061
    else:
1062
      if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
1063
          not self.cfg.GetVGName():
1064
        raise errors.OpPrereqError("Please specify a volume group when"
1065
                                   " enabling lvm-based disk-templates.")
1066

    
1067
  def _SetFileStorageDir(self, feedback_fn):
1068
    """Set the file storage directory.
1069

1070
    """
1071
    if self.op.file_storage_dir is not None:
1072
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
1073
        feedback_fn("Global file storage dir already set to value '%s'"
1074
                    % self.cluster.file_storage_dir)
1075
      else:
1076
        self.cluster.file_storage_dir = self.op.file_storage_dir
1077

    
1078
  def Exec(self, feedback_fn):
1079
    """Change the parameters of the cluster.
1080

1081
    """
1082
    if self.op.enabled_disk_templates:
1083
      self.cluster.enabled_disk_templates = \
1084
        list(set(self.op.enabled_disk_templates))
1085

    
1086
    self._SetVgName(feedback_fn)
1087
    self._SetFileStorageDir(feedback_fn)
1088

    
1089
    if self.op.drbd_helper is not None:
1090
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1091
        feedback_fn("Note that you specified a drbd user helper, but did"
1092
                    " enabled the drbd disk template.")
1093
      new_helper = self.op.drbd_helper
1094
      if not new_helper:
1095
        new_helper = None
1096
      if new_helper != self.cfg.GetDRBDHelper():
1097
        self.cfg.SetDRBDHelper(new_helper)
1098
      else:
1099
        feedback_fn("Cluster DRBD helper already in desired state,"
1100
                    " not changing")
1101
    if self.op.hvparams:
1102
      self.cluster.hvparams = self.new_hvparams
1103
    if self.op.os_hvp:
1104
      self.cluster.os_hvp = self.new_os_hvp
1105
    if self.op.enabled_hypervisors is not None:
1106
      self.cluster.hvparams = self.new_hvparams
1107
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1108
    if self.op.beparams:
1109
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1110
    if self.op.nicparams:
1111
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1112
    if self.op.ipolicy:
1113
      self.cluster.ipolicy = self.new_ipolicy
1114
    if self.op.osparams:
1115
      self.cluster.osparams = self.new_osp
1116
    if self.op.ndparams:
1117
      self.cluster.ndparams = self.new_ndparams
1118
    if self.op.diskparams:
1119
      self.cluster.diskparams = self.new_diskparams
1120
    if self.op.hv_state:
1121
      self.cluster.hv_state_static = self.new_hv_state
1122
    if self.op.disk_state:
1123
      self.cluster.disk_state_static = self.new_disk_state
1124

    
1125
    if self.op.candidate_pool_size is not None:
1126
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1127
      # we need to update the pool size here, otherwise the save will fail
1128
      AdjustCandidatePool(self, [])
1129

    
1130
    if self.op.maintain_node_health is not None:
1131
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1132
        feedback_fn("Note: CONFD was disabled at build time, node health"
1133
                    " maintenance is not useful (still enabling it)")
1134
      self.cluster.maintain_node_health = self.op.maintain_node_health
1135

    
1136
    if self.op.modify_etc_hosts is not None:
1137
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1138

    
1139
    if self.op.prealloc_wipe_disks is not None:
1140
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1141

    
1142
    if self.op.add_uids is not None:
1143
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1144

    
1145
    if self.op.remove_uids is not None:
1146
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1147

    
1148
    if self.op.uid_pool is not None:
1149
      self.cluster.uid_pool = self.op.uid_pool
1150

    
1151
    if self.op.default_iallocator is not None:
1152
      self.cluster.default_iallocator = self.op.default_iallocator
1153

    
1154
    if self.op.reserved_lvs is not None:
1155
      self.cluster.reserved_lvs = self.op.reserved_lvs
1156

    
1157
    if self.op.use_external_mip_script is not None:
1158
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
1159

    
1160
    def helper_os(aname, mods, desc):
1161
      desc += " OS list"
1162
      lst = getattr(self.cluster, aname)
1163
      for key, val in mods:
1164
        if key == constants.DDM_ADD:
1165
          if val in lst:
1166
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1167
          else:
1168
            lst.append(val)
1169
        elif key == constants.DDM_REMOVE:
1170
          if val in lst:
1171
            lst.remove(val)
1172
          else:
1173
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1174
        else:
1175
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
1176

    
1177
    if self.op.hidden_os:
1178
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1179

    
1180
    if self.op.blacklisted_os:
1181
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1182

    
1183
    if self.op.master_netdev:
1184
      master_params = self.cfg.GetMasterNetworkParameters()
1185
      ems = self.cfg.GetUseExternalMipScript()
1186
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1187
                  self.cluster.master_netdev)
1188
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1189
                                                       master_params, ems)
1190
      if not self.op.force:
1191
        result.Raise("Could not disable the master ip")
1192
      else:
1193
        if result.fail_msg:
1194
          msg = ("Could not disable the master ip (continuing anyway): %s" %
1195
                 result.fail_msg)
1196
          feedback_fn(msg)
1197
      feedback_fn("Changing master_netdev from %s to %s" %
1198
                  (master_params.netdev, self.op.master_netdev))
1199
      self.cluster.master_netdev = self.op.master_netdev
1200

    
1201
    if self.op.master_netmask:
1202
      master_params = self.cfg.GetMasterNetworkParameters()
1203
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1204
      result = self.rpc.call_node_change_master_netmask(
1205
                 master_params.uuid, master_params.netmask,
1206
                 self.op.master_netmask, master_params.ip,
1207
                 master_params.netdev)
1208
      result.Warn("Could not change the master IP netmask", feedback_fn)
1209
      self.cluster.master_netmask = self.op.master_netmask
1210

    
1211
    self.cfg.Update(self.cluster, feedback_fn)
1212

    
1213
    if self.op.master_netdev:
1214
      master_params = self.cfg.GetMasterNetworkParameters()
1215
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
1216
                  self.op.master_netdev)
1217
      ems = self.cfg.GetUseExternalMipScript()
1218
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1219
                                                     master_params, ems)
1220
      result.Warn("Could not re-enable the master ip on the master,"
1221
                  " please restart manually", self.LogWarning)
1222

    
1223

    
1224
class LUClusterVerify(NoHooksLU):
1225
  """Submits all jobs necessary to verify the cluster.
1226

1227
  """
1228
  REQ_BGL = False
1229

    
1230
  def ExpandNames(self):
1231
    self.needed_locks = {}
1232

    
1233
  def Exec(self, feedback_fn):
1234
    jobs = []
1235

    
1236
    if self.op.group_name:
1237
      groups = [self.op.group_name]
1238
      depends_fn = lambda: None
1239
    else:
1240
      groups = self.cfg.GetNodeGroupList()
1241

    
1242
      # Verify global configuration
1243
      jobs.append([
1244
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1245
        ])
1246

    
1247
      # Always depend on global verification
1248
      depends_fn = lambda: [(-len(jobs), [])]
1249

    
1250
    jobs.extend(
1251
      [opcodes.OpClusterVerifyGroup(group_name=group,
1252
                                    ignore_errors=self.op.ignore_errors,
1253
                                    depends=depends_fn())]
1254
      for group in groups)
1255

    
1256
    # Fix up all parameters
1257
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1258
      op.debug_simulate_errors = self.op.debug_simulate_errors
1259
      op.verbose = self.op.verbose
1260
      op.error_codes = self.op.error_codes
1261
      try:
1262
        op.skip_checks = self.op.skip_checks
1263
      except AttributeError:
1264
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1265

    
1266
    return ResultWithJobs(jobs)
1267

    
1268

    
1269
class _VerifyErrors(object):
1270
  """Mix-in for cluster/group verify LUs.
1271

1272
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1273
  self.op and self._feedback_fn to be available.)
1274

1275
  """
1276

    
1277
  ETYPE_FIELD = "code"
1278
  ETYPE_ERROR = "ERROR"
1279
  ETYPE_WARNING = "WARNING"
1280

    
1281
  def _Error(self, ecode, item, msg, *args, **kwargs):
1282
    """Format an error message.
1283

1284
    Based on the opcode's error_codes parameter, either format a
1285
    parseable error code, or a simpler error string.
1286

1287
    This must be called only from Exec and functions called from Exec.
1288

1289
    """
1290
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1291
    itype, etxt, _ = ecode
1292
    # If the error code is in the list of ignored errors, demote the error to a
1293
    # warning
1294
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1295
      ltype = self.ETYPE_WARNING
1296
    # first complete the msg
1297
    if args:
1298
      msg = msg % args
1299
    # then format the whole message
1300
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1301
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1302
    else:
1303
      if item:
1304
        item = " " + item
1305
      else:
1306
        item = ""
1307
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1308
    # and finally report it via the feedback_fn
1309
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1310
    # do not mark the operation as failed for WARN cases only
1311
    if ltype == self.ETYPE_ERROR:
1312
      self.bad = True
1313

    
1314
  def _ErrorIf(self, cond, *args, **kwargs):
1315
    """Log an error message if the passed condition is True.
1316

1317
    """
1318
    if (bool(cond)
1319
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1320
      self._Error(*args, **kwargs)
1321

    
1322

    
1323
def _VerifyCertificate(filename):
1324
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1325

1326
  @type filename: string
1327
  @param filename: Path to PEM file
1328

1329
  """
1330
  try:
1331
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1332
                                           utils.ReadFile(filename))
1333
  except Exception, err: # pylint: disable=W0703
1334
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1335
            "Failed to load X509 certificate %s: %s" % (filename, err))
1336

    
1337
  (errcode, msg) = \
1338
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1339
                                constants.SSL_CERT_EXPIRATION_ERROR)
1340

    
1341
  if msg:
1342
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1343
  else:
1344
    fnamemsg = None
1345

    
1346
  if errcode is None:
1347
    return (None, fnamemsg)
1348
  elif errcode == utils.CERT_WARNING:
1349
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1350
  elif errcode == utils.CERT_ERROR:
1351
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1352

    
1353
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1354

    
1355

    
1356
def _GetAllHypervisorParameters(cluster, instances):
1357
  """Compute the set of all hypervisor parameters.
1358

1359
  @type cluster: L{objects.Cluster}
1360
  @param cluster: the cluster object
1361
  @param instances: list of L{objects.Instance}
1362
  @param instances: additional instances from which to obtain parameters
1363
  @rtype: list of (origin, hypervisor, parameters)
1364
  @return: a list with all parameters found, indicating the hypervisor they
1365
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1366

1367
  """
1368
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.CONFD_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                    constants.CONFD_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(
           self.cfg.GetInstanceNames(
             dangling_instances.get(node.uuid, ["no instances"]))))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(
                           self.cfg.GetInstanceNames(no_node_instances)))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        instance = self.cfg.GetInstanceInfoByName(inst_name)
        if instance.disk_template in constants.DTS_INT_MIRROR:
          nodes.update(instance.secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_node_uuids = set(self.group_info.members)
    group_inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_node_uuids = \
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_inst_uuids = \
        group_inst_uuids.difference(
          [self.cfg.GetInstanceInfoByName(name).uuid
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])

    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_inst_uuids:
      raise errors.OpPrereqError(
        "Missing lock for instances: %s" %
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
        errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_uuids = group_node_uuids
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
                             for node_uuid in group_node_uuids)

    self.my_inst_uuids = group_inst_uuids
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
                             for inst_uuid in group_inst_uuids)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nuuid in inst.all_nodes:
          if self.all_node_info[nuuid].group != self.group_uuid:
            extra_lv_nodes.add(nuuid)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, ninfo.name,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s parameter verify failure (source %s): %s",
                      hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
                    "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
                  "Node time diverges by at least %s from master node time",
                  ntime_diff)

  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                  "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupDRBDVersion(self, node_verify_infos):
    """Check cross-node DRBD version consistency.

    @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node_verify call.

    """
    node_versions = {}
    for node_uuid, ndata in node_verify_infos.items():
      nresult = ndata.payload
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
      node_versions[node_uuid] = version

    if len(set(node_versions.values())) > 1:
      for node_uuid, version in sorted(node_versions.items()):
        msg = "DRBD version mismatch: %s" % version
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
                    code=self.ETYPE_WARNING)

  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping from node to names to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of checks
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "did not return valid bridge information")
    if not test:
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = constants.NV_NODELIST not in nresult
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
                  "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
                        "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
                        "tcp communication with node '%s': %s",
                        anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if ninfo.uuid == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)

  def _VerifyInstance(self, instance, node_image, diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the nodes are in the correct
    state.

    """
    pnode_uuid = instance.primary_node
    pnode_img = node_image[pnode_uuid]
    groupinfo = self.cfg.GetAllNodeGroupsInfo()

    node_vol_should = {}
    instance.MapLVsByNode(node_vol_should)

    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            self.group_info)
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)

    for node_uuid in node_vol_should:
      n_img = node_image[node_uuid]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node_uuid]:
        test = volume not in n_img.volumes
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
                      "volume %s missing on node %s", volume,
                      self.cfg.GetNodeName(node_uuid))

    if instance.admin_state == constants.ADMINST_UP:
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
                    "instance not running on its primary node %s",
                     self.cfg.GetNodeName(pnode_uuid))
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance is marked as running and lives on"
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))

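    # Flatten diskstatus into (node, success, status, disk index) tuples; for
    # illustration only (hypothetical values), an entry may look like
    # ("node-uuid", True, <BlockDevStatus>, 0).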
    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      self._ErrorIf(instance.disks_active and
                    not success and not bad_snode,
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
                    "couldn't retrieve status for disk/%s on %s: %s",
                    idx, self.cfg.GetNodeName(nname), bdev_status)

      if instance.disks_active and success and \
         (bdev_status.is_degraded or
          bdev_status.ldisk_status != constants.LDS_OKAY):
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
        if bdev_status.is_degraded:
          msg += " is degraded"
        if bdev_status.ldisk_status != constants.LDS_OKAY:
          msg += "; state is '%s'" % \
                 constants.LDS_NAMES[bdev_status.ldisk_status]

        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)

    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
                  "instance %s, connection to primary node failed",
                  instance.name)

    self._ErrorIf(len(instance.secondary_nodes) > 1,
                  constants.CV_EINSTANCELAYOUT, instance.name,
                  "instance has multiple secondary nodes: %s",
                  utils.CommaJoin(instance.secondary_nodes),
                  code=self.ETYPE_WARNING)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
    if any(es_flags.values()):
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
        # Disk template not compatible with exclusive_storage: no instance
        # node should have the flag set
        es_nodes = [n
                    for (n, es) in es_flags.items()
                    if es]
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
                    "instance has template %s, which is not supported on nodes"
                    " that have exclusive storage set: %s",
                    instance.disk_template,
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
      for (idx, disk) in enumerate(instance.disks):
        self._ErrorIf(disk.spindles is None,
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
                      "number of spindles not configured for disk %s while"
                      " exclusive storage is enabled, try running"
                      " gnt-cluster repair-disk-sizes", idx)

    if instance.disk_template in constants.DTS_INT_MIRROR:
      instance_nodes = utils.NiceSort(instance.all_nodes)
      instance_groups = {}

      for node_uuid in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
                                   []).append(node_uuid)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
                           groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode_uuid in nodes,
                                   reverse=True)]

      self._ErrorIf(len(instance_groups) > 1,
                    constants.CV_EINSTANCESPLITGROUPS,
                    instance.name, "instance has primary and secondary nodes in"
                    " different groups: %s", utils.CommaJoin(pretty_list),
                    code=self.ETYPE_WARNING)

    inst_nodes_offline = []
    for snode in instance.secondary_nodes:
      s_img = node_image[snode]
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
                    self.cfg.GetNodeName(snode),
                    "instance %s, connection to secondary node failed",
                    instance.name)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
                  instance.name, "instance has offline secondary node(s) %s",
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
    # ... or ghost/non-vm_capable nodes
    for node_uuid in instance.all_nodes:
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance lives on ghost node %s",
                    self.cfg.GetNodeName(node_uuid))
      self._ErrorIf(not node_image[node_uuid].vm_capable,
                    constants.CV_EINSTANCEBADNODE, instance.name,
                    "instance lives on non-vm_capable node %s",
                    self.cfg.GetNodeName(node_uuid))

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node_uuid, n_img in node_image.items():
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
          self.all_node_info[node_uuid].group != self.group_uuid):
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node_uuid not in node_vol_should or
                volume not in node_vol_should[node_uuid]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
                      self.cfg.GetNodeName(node_uuid),
                      "volume %s is unknown", volume)

  def _VerifyNPlusOneMemory(self, node_image, all_insts):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node_uuid, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline or \
         self.all_node_info[node_uuid].group != self.group_uuid:
        # we're skipping nodes marked offline and nodes in other groups from
        # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      #TODO(dynmem): also consider ballooning out other instances
      for prinode, inst_uuids in n_img.sbp.items():
        needed_mem = 0
        for inst_uuid in inst_uuids:
          bep = cluster_info.FillBE(all_insts[inst_uuid])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MINMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1,
                      self.cfg.GetNodeName(node_uuid),
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
2119
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2120

    
2121
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2122
                   (files_all, files_opt, files_mc, files_vm)):
2123
    """Verifies file checksums collected from all nodes.
2124

2125
    @param nodes: List of L{objects.Node} objects
2126
    @param master_node_uuid: UUID of master node
2127
    @param all_nvinfo: RPC results
2128

2129
    """
2130
    # Define functions determining which nodes to consider for a file
2131
    files2nodefn = [
2132
      (files_all, None),
2133
      (files_mc, lambda node: (node.master_candidate or
2134
                               node.uuid == master_node_uuid)),
2135
      (files_vm, lambda node: node.vm_capable),
2136
      ]
2137

    
2138
    # Build mapping from filename to list of nodes which should have the file
2139
    nodefiles = {}
2140
    for (files, fn) in files2nodefn:
2141
      if fn is None:
2142
        filenodes = nodes
2143
      else:
2144
        filenodes = filter(fn, nodes)
2145
      nodefiles.update((filename,
2146
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
2147
                       for filename in files)
2148

    
2149
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2150

    
2151
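    # fileinfo maps filename -> {checksum: set of node UUIDs}; purely as an
    # illustration (hypothetical path and values), an entry could look like
    #   {"/var/lib/ganeti/config.data": {"0123abcd...": set(["node-uuid-1"])}}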
    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodes:
      if node.offline:
        ignore_nodes.add(node.uuid)
        continue

      nresult = all_nvinfo[node.uuid]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
                    "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.uuid)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_uuid
                            for node_uuids in fileinfo[filename].values()
                            for node_uuid in node_uuids) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        self._ErrorIf(missing_file and missing_file != expected_nodes,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is optional, but it must exist on all or no"
                      " nodes (not found on %s)",
                      filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))
      else:
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is missing from node(s) %s", filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        self._ErrorIf(unexpected,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s should not exist on node(s) %s",
                      filename, utils.CommaJoin(
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1,
                     utils.CommaJoin(utils.NiceSort(
                       map(self.cfg.GetNodeName, node_uuids))))
                    for (idx, (checksum, node_uuids)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
                    "File %s found with %s different checksums (%s)",
                    filename, len(checksums), "; ".join(variants))

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies and the node DRBD status.
2238

2239
    @type ninfo: L{objects.Node}
2240
    @param ninfo: the node to check
2241
    @param nresult: the remote results for the node
2242
    @param instanceinfo: the dict of instances
2243
    @param drbd_helper: the configured DRBD usermode helper
2244
    @param drbd_map: the DRBD map as returned by
2245
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2246

2247
    """
2248
    if drbd_helper:
2249
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2250
      test = (helper_result is None)
2251
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2252
                    "no drbd usermode helper returned")
2253
      if helper_result:
2254
        status, payload = helper_result
2255
        test = not status
2256
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2257
                      "drbd usermode helper check unsuccessful: %s", payload)
2258
        test = status and (payload != drbd_helper)
2259
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2260
                      "wrong drbd usermode helper: %s", payload)
2261

    
2262
    # compute the DRBD minors
2263
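    # node_drbd maps a DRBD minor to (instance UUID, should-be-active flag);
    # for illustration (hypothetical UUID): {0: ("inst-uuid-1", True)}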
    node_drbd = {}
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
      test = inst_uuid not in instanceinfo
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
      if test:
        node_drbd[minor] = (inst_uuid, False)
      else:
        instance = instanceinfo[inst_uuid]
        node_drbd[minor] = (inst_uuid, instance.disks_active)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                  "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (inst_uuid, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "drbd minor %d of instance %s is not active", minor,
                    self.cfg.GetInstanceName(inst_uuid))
    for minor in used_minors:
      test = minor not in node_drbd
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                  "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
                    "Invalid OS %s (located at %s): %s",
                    os_name, f_path, f_diag)
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
                    "OS '%s' has multiple entries"
                    " (first one shadows the rest): %s",
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                    "Extra OS %s not present on reference node (%s)",
                    os_name, self.cfg.GetNodeName(base.uuid))
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
                      "OS %s for %s differs from reference node %s:"
                      " [%s] vs. [%s]", kind, os_name,
                      self.cfg.GetNodeName(base.uuid),
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
                  "OSes present on reference node %s"
                  " but missing on this node: %s",
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))

  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    cluster = self.cfg.GetClusterInfo()
    if (is_master and
        (cluster.IsFileStorageEnabled() or
         cluster.IsSharedFileStorageEnabled())):
      try:
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    cluster = self.cfg.GetClusterInfo()
    if cluster.IsFileStorageEnabled():
      self._ErrorIf(
          constants.NV_FILE_STORAGE_PATH in nresult,
          constants.CV_ENODEFILESTORAGEPATHUNUSABLE, ninfo.name,
          "The configured file storage path is unusable: %s" %
          nresult.get(constants.NV_FILE_STORAGE_PATH))

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{NodeImage})
    @param node_image: Node images
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_devonly = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams makes already copies of the disks
      devonly = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        self.cfg.SetDiskID(anno_disk, nuuid)
        devonly.append(anno_disk)

      node_disks_devonly[nuuid] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

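    # instdisk maps instance UUID -> node UUID -> list of (success, payload)
    # tuples, one per disk; for illustration (hypothetical UUIDs):
    #   {"inst-uuid": {"node-uuid": [(True, status), (False, "node offline")]}}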
    instdisk = {}
2588

    
2589
    for (nuuid, nres) in result.items():
2590
      node = self.cfg.GetNodeInfo(nuuid)
2591
      disks = node_disks[node.uuid]
2592

    
2593
      if nres.offline:
2594
        # No data from this node
2595
        data = len(disks) * [(False, "node offline")]
2596
      else:
2597
        msg = nres.fail_msg
2598
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
2599
                      "while getting disk information: %s", msg)
2600
        if msg:
2601
          # No data from this node
2602
          data = len(disks) * [(False, msg)]
2603
        else:
2604
          data = []
2605
          for idx, i in enumerate(nres.payload):
2606
            if isinstance(i, (tuple, list)) and len(i) == 2:
2607
              data.append(i)
2608
            else:
2609
              logging.warning("Invalid result from node %s, entry %d: %s",
2610
                              node.name, idx, i)
2611
              data.append((False, "Invalid result from the remote node"))
2612

    
2613
      for ((inst_uuid, _), status) in zip(disks, data):
2614
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
2615
          .append(status)
2616

    
2617
    # Add empty entries for diskless instances.
2618
    for inst_uuid in diskless_instances:
2619
      assert inst_uuid not in instdisk
2620
      instdisk[inst_uuid] = {}
2621

    
2622
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2623
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
2624
                      compat.all(isinstance(s, (tuple, list)) and
2625
                                 len(s) == 2 for s in statuses)
2626
                      for inst, nuuids in instdisk.items()
2627
                      for nuuid, statuses in nuuids.items())
2628
    if __debug__:
2629
      instdisk_keys = set(instdisk)
2630
      instanceinfo_keys = set(instanceinfo)
2631
      assert instdisk_keys == instanceinfo_keys, \
2632
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2633
         (instdisk_keys, instanceinfo_keys))
2634

    
2635
    return instdisk
2636

    
2637
  @staticmethod
2638
  def _SshNodeSelector(group_uuid, all_nodes):
2639
    """Create endless iterators for all potential SSH check hosts.
2640

2641
    """
2642
    nodes = [node for node in all_nodes
2643
             if (node.group != group_uuid and
2644
                 not node.offline)]
2645
    keyfunc = operator.attrgetter("group")
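    # Illustration only (node names are made up): with two other groups
    # containing ["node2", "node3"] and ["node4"], this returns one endless
    # iterator per group, e.g. cycle(["node2", "node3"]) and
    # cycle(["node4"]).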

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)
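    # Illustration only (node names are made up): with online nodes
    # ["node1", "node2"] in this group and a single other group containing
    # just "node9", the result is
    #   (["node1", "node2"], {"node1": ["node9"], "node2": ["node9"]})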

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run in the post phase only; if a hook fails,
    its output is logged in the verify output and the verification fails.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())
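    # Illustration only (tags and names are made up): the resulting
    # environment could look like
    #   {"CLUSTER_TAGS": "prod dc1",
    #    "NODE_TAGS_node1.example.com": "rack1 ssd"}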

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non-redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)
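    # filemap groups the ancillary files (ssconf files, certificates, ...)
    # that have to stay consistent across nodes; the NV_FILELIST entry below
    # asks every node about these files so that _VerifyFiles can compare the
    # copies later on.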

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if drbd_helper:
      node_verify_param[constants.NV_DRBDVERSION] = None
      node_verify_param[constants.NV_DRBDLIST] = None
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
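    # Illustration only (the link name is made up): with the default NIC
    # parameters in bridged mode on link "br0", bridges ends up as
    # set(["br0"]), and NV_BRIDGES below is used to have each node verify
    # that this bridge exists.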
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so
    # if it's True for at least one node, we act as if it were set for all
    # the nodes
    self._exclusive_storage = compat.any(es_flags.values())
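    # Illustration only (UUIDs are made up): with
    #   es_flags == {"node-uuid-a": False, "node-uuid-b": True}
    # the whole group is treated as having exclusive storage enabled.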
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
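    # This [nvinfo_starttime, nvinfo_endtime] window is what _VerifyNodeTime
    # uses below to decide whether each node's reported clock is acceptably
    # close to ours.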
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.name not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
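        # Illustration only (script name and output are made up): each
        # payload entry is a (script, status, output) tuple, e.g.
        #   ("50-check-foo", constants.HKR_FAIL, "foo is not mounted")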
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
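    # Illustration only (group names are made up): for groups "group1" and
    # "group2" this submits two independent jobs, each consisting of a
    # single OpGroupVerifyDisks opcode.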
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])