lib/cmdlib/cluster.py @ 3039e2dc
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with the cluster."""
23

    
24
import OpenSSL
25

    
26
import copy
27
import itertools
28
import logging
29
import operator
30
import os
31
import re
32
import time
33

    
34
from ganeti import compat
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import hypervisor
38
from ganeti import locking
39
from ganeti import masterd
40
from ganeti import netutils
41
from ganeti import objects
42
from ganeti import opcodes
43
from ganeti import pathutils
44
from ganeti import query
45
from ganeti import rpc
46
from ganeti import runtime
47
from ganeti import ssh
48
from ganeti import uidpool
49
from ganeti import utils
50
from ganeti import vcluster
51

    
52
from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53
  ResultWithJobs
54
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob
60

    
61
import ganeti.masterd.instance
62

    
63

    
64
class LUClusterActivateMasterIp(NoHooksLU):
65
  """Activate the master IP on the master node.
66

67
  """
68
  def Exec(self, feedback_fn):
69
    """Activate the master IP.
70

71
    """
72
    master_params = self.cfg.GetMasterNetworkParameters()
73
    ems = self.cfg.GetUseExternalMipScript()
74
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
75
                                                   master_params, ems)
76
    result.Raise("Could not activate the master IP")
77

    
78

    
79
class LUClusterDeactivateMasterIp(NoHooksLU):
80
  """Deactivate the master IP on the master node.
81

82
  """
83
  def Exec(self, feedback_fn):
84
    """Deactivate the master IP.
85

86
    """
87
    master_params = self.cfg.GetMasterNetworkParameters()
88
    ems = self.cfg.GetUseExternalMipScript()
89
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
90
                                                     master_params, ems)
91
    result.Raise("Could not deactivate the master IP")
92

    
93

    
94
class LUClusterConfigQuery(NoHooksLU):
95
  """Return configuration values.
96

97
  """
98
  REQ_BGL = False
99

    
100
  def CheckArguments(self):
101
    self.cq = ClusterQuery(None, self.op.output_fields, False)
102

    
103
  def ExpandNames(self):
104
    self.cq.ExpandNames(self)
105

    
106
  def DeclareLocks(self, level):
107
    self.cq.DeclareLocks(self, level)
108

    
109
  def Exec(self, feedback_fn):
110
    result = self.cq.OldStyleQuery(self)
111

    
112
    assert len(result) == 1
113

    
114
    return result[0]
115

    
116

    
117
class LUClusterDestroy(LogicalUnit):
118
  """Logical unit for destroying the cluster.
119

120
  """
121
  HPATH = "cluster-destroy"
122
  HTYPE = constants.HTYPE_CLUSTER
123

    
124
  def BuildHooksEnv(self):
125
    """Build hooks env.
126

127
    """
128
    return {
129
      "OP_TARGET": self.cfg.GetClusterName(),
130
      }
131

    
132
  def BuildHooksNodes(self):
133
    """Build hooks nodes.
134

135
    """
136
    return ([], [])
137

    
138
  def CheckPrereq(self):
139
    """Check prerequisites.
140

141
    This checks whether the cluster is empty.
142

143
    Any errors are signaled by raising errors.OpPrereqError.
144

145
    """
146
    master = self.cfg.GetMasterNode()
147

    
148
    nodelist = self.cfg.GetNodeList()
149
    if len(nodelist) != 1 or nodelist[0] != master:
150
      raise errors.OpPrereqError("There are still %d node(s) in"
151
                                 " this cluster." % (len(nodelist) - 1),
152
                                 errors.ECODE_INVAL)
153
    instancelist = self.cfg.GetInstanceList()
154
    if instancelist:
155
      raise errors.OpPrereqError("There are still %d instance(s) in"
156
                                 " this cluster." % len(instancelist),
157
                                 errors.ECODE_INVAL)
158

    
159
  def Exec(self, feedback_fn):
160
    """Destroys the cluster.
161

162
    """
163
    master_params = self.cfg.GetMasterNetworkParameters()
164

    
165
    # Run post hooks on master node before it's removed
166
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
167

    
168
    ems = self.cfg.GetUseExternalMipScript()
169
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
170
                                                     master_params, ems)
171
    result.Warn("Error disabling the master IP address", self.LogWarning)
172
    return master_params.uuid
173

    
174

    
175
class LUClusterPostInit(LogicalUnit):
176
  """Logical unit for running hooks after cluster initialization.
177

178
  """
179
  HPATH = "cluster-init"
180
  HTYPE = constants.HTYPE_CLUSTER
181

    
182
  def BuildHooksEnv(self):
183
    """Build hooks env.
184

185
    """
186
    return {
187
      "OP_TARGET": self.cfg.GetClusterName(),
188
      }
189

    
190
  def BuildHooksNodes(self):
191
    """Build hooks nodes.
192

193
    """
194
    return ([], [self.cfg.GetMasterNode()])
195

    
196
  def Exec(self, feedback_fn):
197
    """Nothing to do.
198

199
    """
200
    return True
201

    
202

    
203
class ClusterQuery(QueryBase):
204
  FIELDS = query.CLUSTER_FIELDS
205

    
206
  #: Do not sort (there is only one item)
207
  SORT_FIELD = None
208

    
209
  def ExpandNames(self, lu):
210
    lu.needed_locks = {}
211

    
212
    # The following variables interact with _QueryBase._GetNames
213
    self.wanted = locking.ALL_SET
214
    self.do_locking = self.use_locking
215

    
216
    if self.do_locking:
217
      raise errors.OpPrereqError("Can not use locking for cluster queries",
218
                                 errors.ECODE_INVAL)
219

    
220
  def DeclareLocks(self, lu, level):
221
    pass
222

    
223
  def _GetQueryData(self, lu):
224
    """Computes the list of nodes and their attributes.
225

226
    """
227
    # Locking is not used
228
    assert not (compat.any(lu.glm.is_owned(level)
229
                           for level in locking.LEVELS
230
                           if level != locking.LEVEL_CLUSTER) or
231
                self.do_locking or self.use_locking)
232

    
233
    if query.CQ_CONFIG in self.requested_data:
234
      cluster = lu.cfg.GetClusterInfo()
235
      nodes = lu.cfg.GetAllNodesInfo()
236
    else:
237
      cluster = NotImplemented
238
      nodes = NotImplemented
239

    
240
    if query.CQ_QUEUE_DRAINED in self.requested_data:
241
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
242
    else:
243
      drain_flag = NotImplemented
244

    
245
    if query.CQ_WATCHER_PAUSE in self.requested_data:
246
      master_node_uuid = lu.cfg.GetMasterNode()
247

    
248
      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
249
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
250
                   lu.cfg.GetMasterNodeName())
251

    
252
      watcher_pause = result.payload
253
    else:
254
      watcher_pause = NotImplemented
255

    
256
    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
257

    
258

    
259
class LUClusterQuery(NoHooksLU):
260
  """Query cluster configuration.
261

262
  """
263
  REQ_BGL = False
264

    
265
  def ExpandNames(self):
266
    self.needed_locks = {}
267

    
268
  def Exec(self, feedback_fn):
269
    """Return cluster config.
270

271
    """
272
    cluster = self.cfg.GetClusterInfo()
273
    os_hvp = {}
274

    
275
    # Filter just for enabled hypervisors
276
    for os_name, hv_dict in cluster.os_hvp.items():
277
      os_hvp[os_name] = {}
278
      for hv_name, hv_params in hv_dict.items():
279
        if hv_name in cluster.enabled_hypervisors:
280
          os_hvp[os_name][hv_name] = hv_params
281

    
282
    # Convert ip_family to ip_version
283
    primary_ip_version = constants.IP4_VERSION
284
    if cluster.primary_ip_family == netutils.IP6Address.family:
285
      primary_ip_version = constants.IP6_VERSION
286

    
287
    result = {
288
      "software_version": constants.RELEASE_VERSION,
289
      "protocol_version": constants.PROTOCOL_VERSION,
290
      "config_version": constants.CONFIG_VERSION,
291
      "os_api_version": max(constants.OS_API_VERSIONS),
292
      "export_version": constants.EXPORT_VERSION,
293
      "architecture": runtime.GetArchInfo(),
294
      "name": cluster.cluster_name,
295
      "master": self.cfg.GetMasterNodeName(),
296
      "default_hypervisor": cluster.primary_hypervisor,
297
      "enabled_hypervisors": cluster.enabled_hypervisors,
298
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
299
                        for hypervisor_name in cluster.enabled_hypervisors]),
300
      "os_hvp": os_hvp,
301
      "beparams": cluster.beparams,
302
      "osparams": cluster.osparams,
303
      "ipolicy": cluster.ipolicy,
304
      "nicparams": cluster.nicparams,
305
      "ndparams": cluster.ndparams,
306
      "diskparams": cluster.diskparams,
307
      "candidate_pool_size": cluster.candidate_pool_size,
308
      "master_netdev": cluster.master_netdev,
309
      "master_netmask": cluster.master_netmask,
310
      "use_external_mip_script": cluster.use_external_mip_script,
311
      "volume_group_name": cluster.volume_group_name,
312
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
313
      "file_storage_dir": cluster.file_storage_dir,
314
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
315
      "maintain_node_health": cluster.maintain_node_health,
316
      "ctime": cluster.ctime,
317
      "mtime": cluster.mtime,
318
      "uuid": cluster.uuid,
319
      "tags": list(cluster.GetTags()),
320
      "uid_pool": cluster.uid_pool,
321
      "default_iallocator": cluster.default_iallocator,
322
      "reserved_lvs": cluster.reserved_lvs,
323
      "primary_ip_version": primary_ip_version,
324
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
325
      "hidden_os": cluster.hidden_os,
326
      "blacklisted_os": cluster.blacklisted_os,
327
      "enabled_disk_templates": cluster.enabled_disk_templates,
328
      }
329

    
330
    return result
331

    
332
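# --- Illustrative sketch (not part of the original file) ---------------
# LUClusterQuery.Exec above filters cluster.os_hvp down to the enabled
# hypervisors and maps the primary IP family to an IP version.  The toy
# helper below reproduces only the os_hvp filtering on plain dicts, so the
# expected shape of the result is easy to see; the function name is made
# up for this sketch.

def _example_filter_os_hvp(os_hvp, enabled_hypervisors):
  """Keep per-OS hypervisor parameters only for enabled hypervisors."""
  return dict((os_name,
               dict((hv, params) for (hv, params) in hv_dict.items()
                    if hv in enabled_hypervisors))
              for (os_name, hv_dict) in os_hvp.items())

# _example_filter_os_hvp({"debian": {"kvm": {"acpi": True}, "xen-pvm": {}}},
#                        ["kvm"])
# -> {"debian": {"kvm": {"acpi": True}}}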

    
333
class LUClusterRedistConf(NoHooksLU):
334
  """Force the redistribution of cluster configuration.
335

336
  This is a very simple LU.
337

338
  """
339
  REQ_BGL = False
340

    
341
  def ExpandNames(self):
342
    self.needed_locks = {
343
      locking.LEVEL_NODE: locking.ALL_SET,
344
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
345
    }
346
    self.share_locks = ShareAll()
347

    
348
  def Exec(self, feedback_fn):
349
    """Redistribute the configuration.
350

351
    """
352
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
353
    RedistributeAncillaryFiles(self)
354

    
355

    
356
class LUClusterRename(LogicalUnit):
357
  """Rename the cluster.
358

359
  """
360
  HPATH = "cluster-rename"
361
  HTYPE = constants.HTYPE_CLUSTER
362

    
363
  def BuildHooksEnv(self):
364
    """Build hooks env.
365

366
    """
367
    return {
368
      "OP_TARGET": self.cfg.GetClusterName(),
369
      "NEW_NAME": self.op.name,
370
      }
371

    
372
  def BuildHooksNodes(self):
373
    """Build hooks nodes.
374

375
    """
376
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
377

    
378
  def CheckPrereq(self):
379
    """Verify that the passed name is a valid one.
380

381
    """
382
    hostname = netutils.GetHostname(name=self.op.name,
383
                                    family=self.cfg.GetPrimaryIPFamily())
384

    
385
    new_name = hostname.name
386
    self.ip = new_ip = hostname.ip
387
    old_name = self.cfg.GetClusterName()
388
    old_ip = self.cfg.GetMasterIP()
389
    if new_name == old_name and new_ip == old_ip:
390
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
391
                                 " cluster has changed",
392
                                 errors.ECODE_INVAL)
393
    if new_ip != old_ip:
394
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
395
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
396
                                   " reachable on the network" %
397
                                   new_ip, errors.ECODE_NOTUNIQUE)
398

    
399
    self.op.name = new_name
400

    
401
  def Exec(self, feedback_fn):
402
    """Rename the cluster.
403

404
    """
405
    clustername = self.op.name
406
    new_ip = self.ip
407

    
408
    # shutdown the master IP
409
    master_params = self.cfg.GetMasterNetworkParameters()
410
    ems = self.cfg.GetUseExternalMipScript()
411
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
412
                                                     master_params, ems)
413
    result.Raise("Could not disable the master role")
414

    
415
    try:
416
      cluster = self.cfg.GetClusterInfo()
417
      cluster.cluster_name = clustername
418
      cluster.master_ip = new_ip
419
      self.cfg.Update(cluster, feedback_fn)
420

    
421
      # update the known hosts file
422
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
423
      node_list = self.cfg.GetOnlineNodeList()
424
      try:
425
        node_list.remove(master_params.uuid)
426
      except ValueError:
427
        pass
428
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
429
    finally:
430
      master_params.ip = new_ip
431
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
432
                                                     master_params, ems)
433
      result.Warn("Could not re-enable the master role on the master,"
434
                  " please restart manually", self.LogWarning)
435

    
436
    return clustername
437

    
438

    
439
class LUClusterRepairDiskSizes(NoHooksLU):
440
  """Verifies the cluster disks sizes.
441

442
  """
443
  REQ_BGL = False
444

    
445
  def ExpandNames(self):
446
    if self.op.instances:
447
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
448
      # Not getting the node allocation lock as only a specific set of
449
      # instances (and their nodes) is going to be acquired
450
      self.needed_locks = {
451
        locking.LEVEL_NODE_RES: [],
452
        locking.LEVEL_INSTANCE: self.wanted_names,
453
        }
454
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
455
    else:
456
      self.wanted_names = None
457
      self.needed_locks = {
458
        locking.LEVEL_NODE_RES: locking.ALL_SET,
459
        locking.LEVEL_INSTANCE: locking.ALL_SET,
460

    
461
        # This opcode acquires the node locks for all instances
462
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
463
        }
464

    
465
    self.share_locks = {
466
      locking.LEVEL_NODE_RES: 1,
467
      locking.LEVEL_INSTANCE: 0,
468
      locking.LEVEL_NODE_ALLOC: 1,
469
      }
470

    
471
  def DeclareLocks(self, level):
472
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
473
      self._LockInstancesNodes(primary_only=True, level=level)
474

    
475
  def CheckPrereq(self):
476
    """Check prerequisites.
477

478
    This only checks the optional instance list against the existing names.
479

480
    """
481
    if self.wanted_names is None:
482
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
483

    
484
    self.wanted_instances = \
485
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
486

    
487
  def _EnsureChildSizes(self, disk):
488
    """Ensure children of the disk have the needed disk size.
489

490
    This is valid mainly for DRBD8 and fixes an issue where the
491
    children have smaller disk size.
492

493
    @param disk: an L{ganeti.objects.Disk} object
494

495
    """
496
    if disk.dev_type == constants.LD_DRBD8:
497
      assert disk.children, "Empty children for DRBD8?"
498
      fchild = disk.children[0]
499
      mismatch = fchild.size < disk.size
500
      if mismatch:
501
        self.LogInfo("Child disk has size %d, parent %d, fixing",
502
                     fchild.size, disk.size)
503
        fchild.size = disk.size
504

    
505
      # and we recurse on this child only, not on the metadev
506
      return self._EnsureChildSizes(fchild) or mismatch
507
    else:
508
      return False
509

    
510
  def Exec(self, feedback_fn):
511
    """Verify the size of cluster disks.
512

513
    """
514
    # TODO: check child disks too
515
    # TODO: check differences in size between primary/secondary nodes
516
    per_node_disks = {}
517
    for instance in self.wanted_instances:
518
      pnode = instance.primary_node
519
      if pnode not in per_node_disks:
520
        per_node_disks[pnode] = []
521
      for idx, disk in enumerate(instance.disks):
522
        per_node_disks[pnode].append((instance, idx, disk))
523

    
524
    assert not (frozenset(per_node_disks.keys()) -
525
                self.owned_locks(locking.LEVEL_NODE_RES)), \
526
      "Not owning correct locks"
527
    assert not self.owned_locks(locking.LEVEL_NODE)
528

    
529
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
530
                                               per_node_disks.keys())
531

    
532
    changed = []
533
    for node_uuid, dskl in per_node_disks.items():
534
      newl = [v[2].Copy() for v in dskl]
535
      for dsk in newl:
536
        self.cfg.SetDiskID(dsk, node_uuid)
537
      node_name = self.cfg.GetNodeName(node_uuid)
538
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
539
      if result.fail_msg:
540
        self.LogWarning("Failure in blockdev_getdimensions call to node"
541
                        " %s, ignoring", node_name)
542
        continue
543
      if len(result.payload) != len(dskl):
544
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
545
                        " result.payload=%s", node_name, len(dskl),
546
                        result.payload)
547
        self.LogWarning("Invalid result from node %s, ignoring node results",
548
                        node_name)
549
        continue
550
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
551
        if dimensions is None:
552
          self.LogWarning("Disk %d of instance %s did not return size"
553
                          " information, ignoring", idx, instance.name)
554
          continue
555
        if not isinstance(dimensions, (tuple, list)):
556
          self.LogWarning("Disk %d of instance %s did not return valid"
557
                          " dimension information, ignoring", idx,
558
                          instance.name)
559
          continue
560
        (size, spindles) = dimensions
561
        if not isinstance(size, (int, long)):
562
          self.LogWarning("Disk %d of instance %s did not return valid"
563
                          " size information, ignoring", idx, instance.name)
564
          continue
565
        size = size >> 20
566
        if size != disk.size:
567
          self.LogInfo("Disk %d of instance %s has mismatched size,"
568
                       " correcting: recorded %d, actual %d", idx,
569
                       instance.name, disk.size, size)
570
          disk.size = size
571
          self.cfg.Update(instance, feedback_fn)
572
          changed.append((instance.name, idx, "size", size))
573
        if es_flags[node_uuid]:
574
          if spindles is None:
575
            self.LogWarning("Disk %d of instance %s did not return valid"
576
                            " spindles information, ignoring", idx,
577
                            instance.name)
578
          elif disk.spindles is None or disk.spindles != spindles:
579
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
580
                         " correcting: recorded %s, actual %s",
581
                         idx, instance.name, disk.spindles, spindles)
582
            disk.spindles = spindles
583
            self.cfg.Update(instance, feedback_fn)
584
            changed.append((instance.name, idx, "spindles", disk.spindles))
585
        if self._EnsureChildSizes(disk):
586
          self.cfg.Update(instance, feedback_fn)
587
          changed.append((instance.name, idx, "size", disk.size))
588
    return changed
589

    
590
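# --- Illustrative sketch (not part of the original file) ---------------
# LUClusterRepairDiskSizes.Exec compares the size recorded in the
# configuration (in MiB) with the size reported by blockdev_getdimensions
# (in bytes); "size = size >> 20" above is the bytes-to-MiB conversion.
# The helper below shows that comparison in isolation; its name and
# arguments are invented for this sketch.

def _example_disk_size_mismatch(recorded_mib, reported_bytes):
  """Return the corrected size in MiB, or None if nothing needs fixing."""
  reported_mib = reported_bytes >> 20    # same shift as in Exec() above
  if reported_mib != recorded_mib:
    return reported_mib
  return None

# _example_disk_size_mismatch(1024, 2 * 1024 ** 3) -> 2048 (stale config)
# _example_disk_size_mismatch(2048, 2 * 1024 ** 3) -> None (already correct)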

    
591
def _ValidateNetmask(cfg, netmask):
592
  """Checks if a netmask is valid.
593

594
  @type cfg: L{config.ConfigWriter}
595
  @param cfg: The cluster configuration
596
  @type netmask: int
597
  @param netmask: the netmask to be verified
598
  @raise errors.OpPrereqError: if the validation fails
599

600
  """
601
  ip_family = cfg.GetPrimaryIPFamily()
602
  try:
603
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
604
  except errors.ProgrammerError:
605
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
606
                               ip_family, errors.ECODE_INVAL)
607
  if not ipcls.ValidateNetmask(netmask):
608
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
609
                               (netmask), errors.ECODE_INVAL)
610

    
611
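# --- Illustrative sketch (not part of the original file) ---------------
# _ValidateNetmask above delegates the actual check to netutils; the
# "netmask" is a CIDR prefix length that has to fit the cluster's primary
# IP family.  The stand-alone check below illustrates that idea without
# the Ganeti helpers; the constant and function name are assumptions made
# only for this sketch.

import socket

_EXAMPLE_MAX_PREFIX = {
  socket.AF_INET: 32,      # IPv4: prefix lengths 1..32
  socket.AF_INET6: 128,    # IPv6: prefix lengths 1..128
  }

def _example_valid_cidr_prefix(ip_family, netmask):
  """Return True if 'netmask' is a plausible prefix length for the family."""
  maximum = _EXAMPLE_MAX_PREFIX.get(ip_family)
  return maximum is not None and 0 < netmask <= maximum

# _example_valid_cidr_prefix(socket.AF_INET, 24) -> True
# _example_valid_cidr_prefix(socket.AF_INET, 64) -> False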

    
612
def CheckFileStoragePathVsEnabledDiskTemplates(
613
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
614
  """Checks whether the given file storage directory is acceptable.
615

616
  @type logging_warn_fn: function
617
  @param logging_warn_fn: function which accepts a string and logs it
618
  @type file_storage_dir: string
619
  @param file_storage_dir: the directory to be used for file-based instances
620
  @type enabled_disk_templates: list of string
621
  @param enabled_disk_templates: the list of enabled disk templates
622

623
  Note: This function is public, because it is also used in bootstrap.py.
624
  """
625
  file_storage_enabled = constants.DT_FILE in enabled_disk_templates
626
  if file_storage_dir is not None:
627
    if file_storage_dir == "":
628
      if file_storage_enabled:
629
        raise errors.OpPrereqError("Unsetting the file storage directory"
630
                                   " while having file storage enabled"
631
                                   " is not permitted.")
632
    else:
633
      if not file_storage_enabled:
634
        logging_warn_fn("Specified a file storage directory, although file"
635
                        " storage is not enabled.")
636
  else:
637
    raise errors.ProgrammerError("Received file storage dir with value"
638
                                 " 'None'.")
639

    
640
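# --- Illustrative note (not part of the original file) ------------------
# CheckFileStoragePathVsEnabledDiskTemplates above boils down to the
# following case table, spelled out here only as documentation of the
# intended behaviour:
#
#   file_storage_dir   DT_FILE enabled   outcome
#   ----------------   ---------------   -------------------------------
#   None               any               ProgrammerError (caller bug)
#   ""                 yes               OpPrereqError (cannot unset)
#   ""                 no                accepted
#   "/some/path"       no                warning only
#   "/some/path"       yes               accepted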

    
641
class LUClusterSetParams(LogicalUnit):
642
  """Change the parameters of the cluster.
643

644
  """
645
  HPATH = "cluster-modify"
646
  HTYPE = constants.HTYPE_CLUSTER
647
  REQ_BGL = False
648

    
649
  def CheckArguments(self):
650
    """Check parameters
651

652
    """
653
    if self.op.uid_pool:
654
      uidpool.CheckUidPool(self.op.uid_pool)
655

    
656
    if self.op.add_uids:
657
      uidpool.CheckUidPool(self.op.add_uids)
658

    
659
    if self.op.remove_uids:
660
      uidpool.CheckUidPool(self.op.remove_uids)
661

    
662
    if self.op.master_netmask is not None:
663
      _ValidateNetmask(self.cfg, self.op.master_netmask)
664

    
665
    if self.op.diskparams:
666
      for dt_params in self.op.diskparams.values():
667
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
668
      try:
669
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
670
      except errors.OpPrereqError, err:
671
        raise errors.OpPrereqError("While verifying diskparams: %s" % err,
672
                                   errors.ECODE_INVAL)
673

    
674
  def ExpandNames(self):
675
    # FIXME: in the future maybe other cluster params won't require checking on
676
    # all nodes to be modified.
677
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
678
    # resource locks the right thing, shouldn't it be the BGL instead?
679
    self.needed_locks = {
680
      locking.LEVEL_NODE: locking.ALL_SET,
681
      locking.LEVEL_INSTANCE: locking.ALL_SET,
682
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
683
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
684
    }
685
    self.share_locks = ShareAll()
686

    
687
  def BuildHooksEnv(self):
688
    """Build hooks env.
689

690
    """
691
    return {
692
      "OP_TARGET": self.cfg.GetClusterName(),
693
      "NEW_VG_NAME": self.op.vg_name,
694
      }
695

    
696
  def BuildHooksNodes(self):
697
    """Build hooks nodes.
698

699
    """
700
    mn = self.cfg.GetMasterNode()
701
    return ([mn], [mn])
702

    
703
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
704
                   new_enabled_disk_templates):
705
    """Check the consistency of the vg name on all nodes and in case it gets
706
       unset, whether there are instances still using it.
707

708
    """
709
    if self.op.vg_name is not None and not self.op.vg_name:
710
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
711
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
712
                                   " instances exist", errors.ECODE_INVAL)
713

    
714
    if (self.op.vg_name is not None and
715
        utils.IsLvmEnabled(enabled_disk_templates)) or \
716
           (self.cfg.GetVGName() is not None and
717
            utils.LvmGetsEnabled(enabled_disk_templates,
718
                                 new_enabled_disk_templates)):
719
      self._CheckVgNameOnNodes(node_uuids)
720

    
721
  def _CheckVgNameOnNodes(self, node_uuids):
722
    """Check the status of the volume group on each node.
723

724
    """
725
    vglist = self.rpc.call_vg_list(node_uuids)
726
    for node_uuid in node_uuids:
727
      msg = vglist[node_uuid].fail_msg
728
      if msg:
729
        # ignoring down node
730
        self.LogWarning("Error while gathering data on node %s"
731
                        " (ignoring node): %s",
732
                        self.cfg.GetNodeName(node_uuid), msg)
733
        continue
734
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
735
                                            self.op.vg_name,
736
                                            constants.MIN_VG_SIZE)
737
      if vgstatus:
738
        raise errors.OpPrereqError("Error on node '%s': %s" %
739
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
740
                                   errors.ECODE_ENVIRON)
741

    
742
  def _GetEnabledDiskTemplates(self, cluster):
743
    """Determines the enabled disk templates and the subset of disk templates
744
       that are newly enabled by this operation.
745

746
    """
747
    enabled_disk_templates = None
748
    new_enabled_disk_templates = []
749
    if self.op.enabled_disk_templates:
750
      enabled_disk_templates = self.op.enabled_disk_templates
751
      new_enabled_disk_templates = \
752
        list(set(enabled_disk_templates)
753
             - set(cluster.enabled_disk_templates))
754
    else:
755
      enabled_disk_templates = cluster.enabled_disk_templates
756
    return (enabled_disk_templates, new_enabled_disk_templates)
757

    
758
  def CheckPrereq(self):
759
    """Check prerequisites.
760

761
    This checks whether the given params don't conflict and
762
    if the given volume group is valid.
763

764
    """
765
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
766
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
767
        raise errors.OpPrereqError("Cannot disable drbd helper while"
768
                                   " drbd-based instances exist",
769
                                   errors.ECODE_INVAL)
770

    
771
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
772
    self.cluster = cluster = self.cfg.GetClusterInfo()
773

    
774
    vm_capable_node_uuids = [node.uuid
775
                             for node in self.cfg.GetAllNodesInfo().values()
776
                             if node.uuid in node_uuids and node.vm_capable]
777

    
778
    (enabled_disk_templates, new_enabled_disk_templates) = \
779
      self._GetEnabledDiskTemplates(cluster)
780

    
781
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
782
                      new_enabled_disk_templates)
783

    
784
    if self.op.file_storage_dir is not None:
785
      CheckFileStoragePathVsEnabledDiskTemplates(
786
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
787

    
788
    if self.op.drbd_helper:
789
      # checks given drbd helper on all nodes
790
      helpers = self.rpc.call_drbd_helper(node_uuids)
791
      for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
792
        if ninfo.offline:
793
          self.LogInfo("Not checking drbd helper on offline node %s",
794
                       ninfo.name)
795
          continue
796
        msg = helpers[ninfo.uuid].fail_msg
797
        if msg:
798
          raise errors.OpPrereqError("Error checking drbd helper on node"
799
                                     " '%s': %s" % (ninfo.name, msg),
800
                                     errors.ECODE_ENVIRON)
801
        node_helper = helpers[ninfo.uuid].payload
802
        if node_helper != self.op.drbd_helper:
803
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
804
                                     (ninfo.name, node_helper),
805
                                     errors.ECODE_ENVIRON)
806

    
807
    # validate params changes
808
    if self.op.beparams:
809
      objects.UpgradeBeParams(self.op.beparams)
810
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
811
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
812

    
813
    if self.op.ndparams:
814
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
815
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
816

    
817
      # TODO: we need a more general way to handle resetting
818
      # cluster-level parameters to default values
819
      if self.new_ndparams["oob_program"] == "":
820
        self.new_ndparams["oob_program"] = \
821
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
822

    
823
    if self.op.hv_state:
824
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
825
                                           self.cluster.hv_state_static)
826
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
827
                               for hv, values in new_hv_state.items())
828

    
829
    if self.op.disk_state:
830
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
831
                                               self.cluster.disk_state_static)
832
      self.new_disk_state = \
833
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
834
                            for name, values in svalues.items()))
835
             for storage, svalues in new_disk_state.items())
836

    
837
    if self.op.ipolicy:
838
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
839
                                           group_policy=False)
840

    
841
      all_instances = self.cfg.GetAllInstancesInfo().values()
842
      violations = set()
843
      for group in self.cfg.GetAllNodeGroupsInfo().values():
844
        instances = frozenset([inst for inst in all_instances
845
                               if compat.any(nuuid in group.members
846
                                             for nuuid in inst.all_nodes)])
847
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
848
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
849
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
850
                                           self.cfg)
851
        if new:
852
          violations.update(new)
853

    
854
      if violations:
855
        self.LogWarning("After the ipolicy change the following instances"
856
                        " violate them: %s",
857
                        utils.CommaJoin(utils.NiceSort(violations)))
858

    
859
    if self.op.nicparams:
860
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
861
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
862
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
863
      nic_errors = []
864

    
865
      # check all instances for consistency
866
      for instance in self.cfg.GetAllInstancesInfo().values():
867
        for nic_idx, nic in enumerate(instance.nics):
868
          params_copy = copy.deepcopy(nic.nicparams)
869
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
870

    
871
          # check parameter syntax
872
          try:
873
            objects.NIC.CheckParameterSyntax(params_filled)
874
          except errors.ConfigurationError, err:
875
            nic_errors.append("Instance %s, nic/%d: %s" %
876
                              (instance.name, nic_idx, err))
877

    
878
          # if we're moving instances to routed, check that they have an ip
879
          target_mode = params_filled[constants.NIC_MODE]
880
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
881
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
882
                              " address" % (instance.name, nic_idx))
883
      if nic_errors:
884
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
885
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
886

    
887
    # hypervisor list/parameters
888
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
889
    if self.op.hvparams:
890
      for hv_name, hv_dict in self.op.hvparams.items():
891
        if hv_name not in self.new_hvparams:
892
          self.new_hvparams[hv_name] = hv_dict
893
        else:
894
          self.new_hvparams[hv_name].update(hv_dict)
895

    
896
    # disk template parameters
897
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
898
    if self.op.diskparams:
899
      for dt_name, dt_params in self.op.diskparams.items():
900
        if dt_name not in self.new_diskparams:
901
          self.new_diskparams[dt_name] = dt_params
902
        else:
903
          self.new_diskparams[dt_name].update(dt_params)
904

    
905
    # os hypervisor parameters
906
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
907
    if self.op.os_hvp:
908
      for os_name, hvs in self.op.os_hvp.items():
909
        if os_name not in self.new_os_hvp:
910
          self.new_os_hvp[os_name] = hvs
911
        else:
912
          for hv_name, hv_dict in hvs.items():
913
            if hv_dict is None:
914
              # Delete if it exists
915
              self.new_os_hvp[os_name].pop(hv_name, None)
916
            elif hv_name not in self.new_os_hvp[os_name]:
917
              self.new_os_hvp[os_name][hv_name] = hv_dict
918
            else:
919
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
920

    
921
    # os parameters
922
    self.new_osp = objects.FillDict(cluster.osparams, {})
923
    if self.op.osparams:
924
      for os_name, osp in self.op.osparams.items():
925
        if os_name not in self.new_osp:
926
          self.new_osp[os_name] = {}
927

    
928
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
929
                                                 use_none=True)
930

    
931
        if not self.new_osp[os_name]:
932
          # we removed all parameters
933
          del self.new_osp[os_name]
934
        else:
935
          # check the parameter validity (remote check)
936
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
937
                        os_name, self.new_osp[os_name])
938

    
939
    # changes to the hypervisor list
940
    if self.op.enabled_hypervisors is not None:
941
      self.hv_list = self.op.enabled_hypervisors
942
      for hv in self.hv_list:
943
        # if the hypervisor doesn't already exist in the cluster
944
        # hvparams, we initialize it to empty, and then (in both
945
        # cases) we make sure to fill the defaults, as we might not
946
        # have a complete defaults list if the hypervisor wasn't
947
        # enabled before
948
        if hv not in new_hvp:
949
          new_hvp[hv] = {}
950
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
951
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
952
    else:
953
      self.hv_list = cluster.enabled_hypervisors
954

    
955
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
956
      # either the enabled list has changed, or the parameters have, validate
957
      for hv_name, hv_params in self.new_hvparams.items():
958
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
959
            (self.op.enabled_hypervisors and
960
             hv_name in self.op.enabled_hypervisors)):
961
          # either this is a new hypervisor, or its parameters have changed
962
          hv_class = hypervisor.GetHypervisorClass(hv_name)
963
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
964
          hv_class.CheckParameterSyntax(hv_params)
965
          CheckHVParams(self, node_uuids, hv_name, hv_params)
966

    
967
    self._CheckDiskTemplateConsistency()
968

    
969
    if self.op.os_hvp:
970
      # no need to check any newly-enabled hypervisors, since the
971
      # defaults have already been checked in the above code-block
972
      for os_name, os_hvp in self.new_os_hvp.items():
973
        for hv_name, hv_params in os_hvp.items():
974
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
975
          # we need to fill in the new os_hvp on top of the actual hv_p
976
          cluster_defaults = self.new_hvparams.get(hv_name, {})
977
          new_osp = objects.FillDict(cluster_defaults, hv_params)
978
          hv_class = hypervisor.GetHypervisorClass(hv_name)
979
          hv_class.CheckParameterSyntax(new_osp)
980
          CheckHVParams(self, node_uuids, hv_name, new_osp)
981

    
982
    if self.op.default_iallocator:
983
      alloc_script = utils.FindFile(self.op.default_iallocator,
984
                                    constants.IALLOCATOR_SEARCH_PATH,
985
                                    os.path.isfile)
986
      if alloc_script is None:
987
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
988
                                   " specified" % self.op.default_iallocator,
989
                                   errors.ECODE_INVAL)
990

    
991
  def _CheckDiskTemplateConsistency(self):
992
    """Check whether the disk templates that are going to be disabled
993
       are still in use by some instances.
994

995
    """
996
    if self.op.enabled_disk_templates:
997
      cluster = self.cfg.GetClusterInfo()
998
      instances = self.cfg.GetAllInstancesInfo()
999

    
1000
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1001
        - set(self.op.enabled_disk_templates)
1002
      for instance in instances.itervalues():
1003
        if instance.disk_template in disk_templates_to_remove:
1004
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
1005
                                     " because instance '%s' is using it." %
1006
                                     (instance.disk_template, instance.name))
1007

    
1008
  def _SetVgName(self, feedback_fn):
1009
    """Determines and sets the new volume group name.
1010

1011
    """
1012
    if self.op.vg_name is not None:
1013
      if self.op.vg_name and not \
1014
           utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
1015
        feedback_fn("Note that you specified a volume group, but did not"
1016
                    " enable any lvm disk template.")
1017
      new_volume = self.op.vg_name
1018
      if not new_volume:
1019
        if utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
1020
          raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
1021
                                     " disk templates are enabled.")
1022
        new_volume = None
1023
      if new_volume != self.cfg.GetVGName():
1024
        self.cfg.SetVGName(new_volume)
1025
      else:
1026
        feedback_fn("Cluster LVM configuration already in desired"
1027
                    " state, not changing")
1028
    else:
1029
      if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
1030
          not self.cfg.GetVGName():
1031
        raise errors.OpPrereqError("Please specify a volume group when"
1032
                                   " enabling lvm-based disk-templates.")
1033

    
1034
  def _SetFileStorageDir(self, feedback_fn):
1035
    """Set the file storage directory.
1036

1037
    """
1038
    if self.op.file_storage_dir is not None:
1039
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
1040
        feedback_fn("Global file storage dir already set to value '%s'"
1041
                    % self.cluster.file_storage_dir)
1042
      else:
1043
        self.cluster.file_storage_dir = self.op.file_storage_dir
1044

    
1045
  def Exec(self, feedback_fn):
1046
    """Change the parameters of the cluster.
1047

1048
    """
1049
    if self.op.enabled_disk_templates:
1050
      self.cluster.enabled_disk_templates = \
1051
        list(set(self.op.enabled_disk_templates))
1052

    
1053
    self._SetVgName(feedback_fn)
1054
    self._SetFileStorageDir(feedback_fn)
1055

    
1056
    if self.op.drbd_helper is not None:
1057
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1058
        feedback_fn("Note that you specified a drbd user helper, but did"
1059
                    " enabled the drbd disk template.")
1060
      new_helper = self.op.drbd_helper
1061
      if not new_helper:
1062
        new_helper = None
1063
      if new_helper != self.cfg.GetDRBDHelper():
1064
        self.cfg.SetDRBDHelper(new_helper)
1065
      else:
1066
        feedback_fn("Cluster DRBD helper already in desired state,"
1067
                    " not changing")
1068
    if self.op.hvparams:
1069
      self.cluster.hvparams = self.new_hvparams
1070
    if self.op.os_hvp:
1071
      self.cluster.os_hvp = self.new_os_hvp
1072
    if self.op.enabled_hypervisors is not None:
1073
      self.cluster.hvparams = self.new_hvparams
1074
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1075
    if self.op.beparams:
1076
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1077
    if self.op.nicparams:
1078
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1079
    if self.op.ipolicy:
1080
      self.cluster.ipolicy = self.new_ipolicy
1081
    if self.op.osparams:
1082
      self.cluster.osparams = self.new_osp
1083
    if self.op.ndparams:
1084
      self.cluster.ndparams = self.new_ndparams
1085
    if self.op.diskparams:
1086
      self.cluster.diskparams = self.new_diskparams
1087
    if self.op.hv_state:
1088
      self.cluster.hv_state_static = self.new_hv_state
1089
    if self.op.disk_state:
1090
      self.cluster.disk_state_static = self.new_disk_state
1091

    
1092
    if self.op.candidate_pool_size is not None:
1093
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1094
      # we need to update the pool size here, otherwise the save will fail
1095
      AdjustCandidatePool(self, [])
1096

    
1097
    if self.op.maintain_node_health is not None:
1098
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1099
        feedback_fn("Note: CONFD was disabled at build time, node health"
1100
                    " maintenance is not useful (still enabling it)")
1101
      self.cluster.maintain_node_health = self.op.maintain_node_health
1102

    
1103
    if self.op.modify_etc_hosts is not None:
1104
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1105

    
1106
    if self.op.prealloc_wipe_disks is not None:
1107
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1108

    
1109
    if self.op.add_uids is not None:
1110
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1111

    
1112
    if self.op.remove_uids is not None:
1113
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1114

    
1115
    if self.op.uid_pool is not None:
1116
      self.cluster.uid_pool = self.op.uid_pool
1117

    
1118
    if self.op.default_iallocator is not None:
1119
      self.cluster.default_iallocator = self.op.default_iallocator
1120

    
1121
    if self.op.reserved_lvs is not None:
1122
      self.cluster.reserved_lvs = self.op.reserved_lvs
1123

    
1124
    if self.op.use_external_mip_script is not None:
1125
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
1126

    
1127
    def helper_os(aname, mods, desc):
1128
      desc += " OS list"
1129
      lst = getattr(self.cluster, aname)
1130
      for key, val in mods:
1131
        if key == constants.DDM_ADD:
1132
          if val in lst:
1133
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1134
          else:
1135
            lst.append(val)
1136
        elif key == constants.DDM_REMOVE:
1137
          if val in lst:
1138
            lst.remove(val)
1139
          else:
1140
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1141
        else:
1142
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
1143

    
1144
    if self.op.hidden_os:
1145
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1146

    
1147
    if self.op.blacklisted_os:
1148
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1149

    
1150
    if self.op.master_netdev:
1151
      master_params = self.cfg.GetMasterNetworkParameters()
1152
      ems = self.cfg.GetUseExternalMipScript()
1153
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1154
                  self.cluster.master_netdev)
1155
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1156
                                                       master_params, ems)
1157
      if not self.op.force:
1158
        result.Raise("Could not disable the master ip")
1159
      else:
1160
        if result.fail_msg:
1161
          msg = ("Could not disable the master ip (continuing anyway): %s" %
1162
                 result.fail_msg)
1163
          feedback_fn(msg)
1164
      feedback_fn("Changing master_netdev from %s to %s" %
1165
                  (master_params.netdev, self.op.master_netdev))
1166
      self.cluster.master_netdev = self.op.master_netdev
1167

    
1168
    if self.op.master_netmask:
1169
      master_params = self.cfg.GetMasterNetworkParameters()
1170
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1171
      result = self.rpc.call_node_change_master_netmask(
1172
                 master_params.uuid, master_params.netmask,
1173
                 self.op.master_netmask, master_params.ip,
1174
                 master_params.netdev)
1175
      result.Warn("Could not change the master IP netmask", feedback_fn)
1176
      self.cluster.master_netmask = self.op.master_netmask
1177

    
1178
    self.cfg.Update(self.cluster, feedback_fn)
1179

    
1180
    if self.op.master_netdev:
1181
      master_params = self.cfg.GetMasterNetworkParameters()
1182
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
1183
                  self.op.master_netdev)
1184
      ems = self.cfg.GetUseExternalMipScript()
1185
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1186
                                                     master_params, ems)
1187
      result.Warn("Could not re-enable the master ip on the master,"
1188
                  " please restart manually", self.LogWarning)
1189

    
1190
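# --- Illustrative sketch (not part of the original file) ---------------
# LUClusterSetParams.CheckPrereq builds the new hvparams/os_hvp/diskparams
# by overlaying the values from the opcode on top of the existing cluster
# dictionaries (objects.FillDict plus per-key .update() calls).  The toy
# function below approximates that two-level merge on plain dicts; it is
# not Ganeti code and its name is made up for this sketch.

def _example_merge_params(current, changes):
  """Overlay per-key parameter dicts from 'changes' onto 'current'."""
  merged = dict((key, dict(values)) for (key, values) in current.items())
  for key, values in changes.items():
    merged.setdefault(key, {}).update(values)
  return merged

# _example_merge_params({"kvm": {"acpi": True}},
#                       {"kvm": {"kernel_path": ""}, "lxc": {}})
# -> {"kvm": {"acpi": True, "kernel_path": ""}, "lxc": {}}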

    
1191
class LUClusterVerify(NoHooksLU):
1192
  """Submits all jobs necessary to verify the cluster.
1193

1194
  """
1195
  REQ_BGL = False
1196

    
1197
  def ExpandNames(self):
1198
    self.needed_locks = {}
1199

    
1200
  def Exec(self, feedback_fn):
1201
    jobs = []
1202

    
1203
    if self.op.group_name:
1204
      groups = [self.op.group_name]
1205
      depends_fn = lambda: None
1206
    else:
1207
      groups = self.cfg.GetNodeGroupList()
1208

    
1209
      # Verify global configuration
1210
      jobs.append([
1211
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1212
        ])
1213

    
1214
      # Always depend on global verification
1215
      depends_fn = lambda: [(-len(jobs), [])]
1216

    
1217
    jobs.extend(
1218
      [opcodes.OpClusterVerifyGroup(group_name=group,
1219
                                    ignore_errors=self.op.ignore_errors,
1220
                                    depends=depends_fn())]
1221
      for group in groups)
1222

    
1223
    # Fix up all parameters
1224
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1225
      op.debug_simulate_errors = self.op.debug_simulate_errors
1226
      op.verbose = self.op.verbose
1227
      op.error_codes = self.op.error_codes
1228
      try:
1229
        op.skip_checks = self.op.skip_checks
1230
      except AttributeError:
1231
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1232

    
1233
    return ResultWithJobs(jobs)
1234

    
1235
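# --- Illustrative sketch (not part of the original file) ---------------
# LUClusterVerify.Exec submits one OpClusterVerifyGroup job per node group
# and, when the whole cluster is verified, makes each of them depend on
# the OpClusterVerifyConfig job via "depends_fn = lambda: [(-len(jobs), [])]".
# A negative value in "depends" refers to a job submitted earlier in the
# same submission.  The snippet below only shows how that offset comes
# out, using plain lists instead of opcodes.

def _example_relative_depends():
  jobs = [["verify-config"]]        # job 0: global configuration check
  depends = [(-len(jobs), [])]      # -1: the job submitted just before
  jobs.append(["verify-group", depends])
  return depends                    # -> [(-1, [])]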

    
1236
class _VerifyErrors(object):
1237
  """Mix-in for cluster/group verify LUs.
1238

1239
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1240
  self.op and self._feedback_fn to be available.)
1241

1242
  """
1243

    
1244
  ETYPE_FIELD = "code"
1245
  ETYPE_ERROR = "ERROR"
1246
  ETYPE_WARNING = "WARNING"
1247

    
1248
  def _Error(self, ecode, item, msg, *args, **kwargs):
1249
    """Format an error message.
1250

1251
    Based on the opcode's error_codes parameter, either format a
1252
    parseable error code, or a simpler error string.
1253

1254
    This must be called only from Exec and functions called from Exec.
1255

1256
    """
1257
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1258
    itype, etxt, _ = ecode
1259
    # If the error code is in the list of ignored errors, demote the error to a
1260
    # warning
1261
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1262
      ltype = self.ETYPE_WARNING
1263
    # first complete the msg
1264
    if args:
1265
      msg = msg % args
1266
    # then format the whole message
1267
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1268
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1269
    else:
1270
      if item:
1271
        item = " " + item
1272
      else:
1273
        item = ""
1274
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1275
    # and finally report it via the feedback_fn
1276
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1277
    # do not mark the operation as failed for WARN cases only
1278
    if ltype == self.ETYPE_ERROR:
1279
      self.bad = True
1280

    
1281
  def _ErrorIf(self, cond, *args, **kwargs):
1282
    """Log an error message if the passed condition is True.
1283

1284
    """
1285
    if (bool(cond)
1286
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1287
      self._Error(*args, **kwargs)
1288

    
1289
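# --- Illustrative sketch (not part of the original file) ---------------
# _VerifyErrors._Error emits either a machine-parseable line or a
# human-readable one, depending on the opcode's error_codes flag.  The
# stand-alone helper below rebuilds both formats for a concrete input so
# the difference is visible; it is an approximation, not the mix-in itself.

def _example_format_verify_error(ltype, etxt, itype, item, msg, error_codes):
  if error_codes:
    return "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
  item = " " + item if item else ""
  return "%s: %s%s: %s" % (ltype, itype, item, msg)

# _example_format_verify_error("ERROR", "ECLUSTERCERT", "cluster", "",
#                              "certificate expired", True)
# -> "ERROR:ECLUSTERCERT:cluster::certificate expired"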

    
1290
def _VerifyCertificate(filename):
1291
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1292

1293
  @type filename: string
1294
  @param filename: Path to PEM file
1295

1296
  """
1297
  try:
1298
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1299
                                           utils.ReadFile(filename))
1300
  except Exception, err: # pylint: disable=W0703
1301
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1302
            "Failed to load X509 certificate %s: %s" % (filename, err))
1303

    
1304
  (errcode, msg) = \
1305
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1306
                                constants.SSL_CERT_EXPIRATION_ERROR)
1307

    
1308
  if msg:
1309
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1310
  else:
1311
    fnamemsg = None
1312

    
1313
  if errcode is None:
1314
    return (None, fnamemsg)
1315
  elif errcode == utils.CERT_WARNING:
1316
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1317
  elif errcode == utils.CERT_ERROR:
1318
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1319

    
1320
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1321

    
1322
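# --- Illustrative sketch (not part of the original file) ---------------
# _VerifyCertificate above loads a PEM certificate and classifies it as
# ok, warning (close to expiry) or error via utils.VerifyX509Certificate.
# The minimal stand-alone check below covers only the hard-expiry part
# using pyOpenSSL directly; thresholds and warning handling are left out
# and the function name is invented for this sketch.

def _example_cert_expired(pem_path):
  """Return True if the certificate in 'pem_path' has already expired."""
  cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                         utils.ReadFile(pem_path))
  return cert.has_expired()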

    
1323
def _GetAllHypervisorParameters(cluster, instances):
1324
  """Compute the set of all hypervisor parameters.
1325

1326
  @type cluster: L{objects.Cluster}
1327
  @param cluster: the cluster object
1328
  @param instances: list of L{objects.Instance}
1329
  @param instances: additional instances from which to obtain parameters
1330
  @rtype: list of (origin, hypervisor, parameters)
1331
  @return: a list with all parameters found, indicating the hypervisor they
1332
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1333

1334
  """
1335
  hvp_data = []
1336

    
1337
  for hv_name in cluster.enabled_hypervisors:
1338
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1339

    
1340
  for os_name, os_hvp in cluster.os_hvp.items():
1341
    for hv_name, hv_params in os_hvp.items():
1342
      if hv_params:
1343
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1344
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1345

    
1346
  # TODO: collapse identical parameter values in a single one
1347
  for instance in instances:
1348
    if instance.hvparams:
1349
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1350
                       cluster.FillHV(instance)))
1351

    
1352
  return hvp_data
1353

    
1354
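# --- Illustrative note (not part of the original file) ------------------
# The list returned by _GetAllHypervisorParameters has one
# (origin, hypervisor, parameters) tuple per parameter source, e.g.
# (values invented for illustration):
#
#   [("cluster",         "kvm", {"kernel_path": "/boot/vmlinuz", ...}),
#    ("os debian-image", "kvm", {"kernel_path": "", ...}),
#    ("instance web1",   "kvm", {"boot_order": "cdrom", ...})]
#
# LUClusterVerifyConfig._VerifyHVP then syntax-checks every entry with the
# matching hypervisor class.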

    
1355
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1356
  """Verifies the cluster config.
1357

1358
  """
1359
  REQ_BGL = False
1360

    
1361
  def _VerifyHVP(self, hvp_data):
1362
    """Verifies locally the syntax of the hypervisor parameters.
1363

1364
    """
1365
    for item, hv_name, hv_params in hvp_data:
1366
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1367
             (item, hv_name))
1368
      try:
1369
        hv_class = hypervisor.GetHypervisorClass(hv_name)
1370
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1371
        hv_class.CheckParameterSyntax(hv_params)
1372
      except errors.GenericError, err:
1373
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1374

    
1375
  def ExpandNames(self):
1376
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1377
    self.share_locks = ShareAll()
1378

    
1379
  def CheckPrereq(self):
1380
    """Check prerequisites.
1381

1382
    """
1383
    # Retrieve all information
1384
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1385
    self.all_node_info = self.cfg.GetAllNodesInfo()
1386
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1387

    
1388
  def Exec(self, feedback_fn):
1389
    """Verify integrity of cluster, performing various test on nodes.
1390

1391
    """
1392
    self.bad = False
1393
    self._feedback_fn = feedback_fn
1394

    
1395
    feedback_fn("* Verifying cluster config")
1396

    
1397
    for msg in self.cfg.VerifyConfig():
1398
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1399

    
1400
    feedback_fn("* Verifying cluster certificate files")
1401

    
1402
    for cert_filename in pathutils.ALL_CERT_FILES:
1403
      (errcode, msg) = _VerifyCertificate(cert_filename)
1404
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1405

    
1406
    self._ErrorIf(not utils.CanRead(constants.CONFD_USER,
1407
                                    pathutils.NODED_CERT_FILE),
1408
                  constants.CV_ECLUSTERCERT,
1409
                  None,
1410
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
1411
                    constants.CONFD_USER + " user")
1412

    
1413
    feedback_fn("* Verifying hypervisor parameters")
1414

    
1415
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1416
                                                self.all_inst_info.values()))
1417

    
1418
    feedback_fn("* Verifying all nodes belong to an existing group")
1419

    
1420
    # We do this verification here because, should this bogus circumstance
1421
    # occur, it would never be caught by VerifyGroup, which only acts on
1422
    # nodes/instances reachable from existing node groups.
1423

    
1424
    dangling_nodes = set(node for node in self.all_node_info.values()
1425
                         if node.group not in self.all_group_info)
1426

    
1427
    dangling_instances = {}
1428
    no_node_instances = []
1429

    
1430
    for inst in self.all_inst_info.values():
1431
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
1432
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
1433
      elif inst.primary_node not in self.all_node_info:
1434
        no_node_instances.append(inst)
1435

    
1436
    pretty_dangling = [
1437
        "%s (%s)" %
1438
        (node.name,
1439
         utils.CommaJoin(
1440
           self.cfg.GetInstanceNames(
1441
             dangling_instances.get(node.uuid, ["no instances"]))))
1442
        for node in dangling_nodes]
1443

    
1444
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1445
                  None,
1446
                  "the following nodes (and their instances) belong to a non"
1447
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1448

    
1449
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1450
                  None,
1451
                  "the following instances have a non-existing primary-node:"
1452
                  " %s", utils.CommaJoin(
1453
                           self.cfg.GetInstanceNames(no_node_instances)))
1454

    
1455
    return not self.bad
1456

    
1457

    
1458
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1459
  """Verifies the status of a node group.
1460

1461
  """
1462
  HPATH = "cluster-verify"
1463
  HTYPE = constants.HTYPE_CLUSTER
1464
  REQ_BGL = False
1465

    
1466
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1467

    
1468
  class NodeImage(object):
1469
    """A class representing the logical and physical status of a node.
1470

1471
    @type uuid: string
1472
    @ivar uuid: the node UUID to which this object refers
1473
    @ivar volumes: a structure as returned from
1474
        L{ganeti.backend.GetVolumeList} (runtime)
1475
    @ivar instances: a list of running instances (runtime)
1476
    @ivar pinst: list of configured primary instances (config)
1477
    @ivar sinst: list of configured secondary instances (config)
1478
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1479
        instances for which this node is secondary (config)
1480
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1481
    @ivar dfree: free disk, as reported by the node (runtime)
1482
    @ivar offline: the offline status (config)
1483
    @type rpc_fail: boolean
1484
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1485
        not whether the individual keys were correct) (runtime)
1486
    @type lvm_fail: boolean
1487
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1488
    @type hyp_fail: boolean
1489
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1490
    @type ghost: boolean
1491
    @ivar ghost: whether this is a known node or not (config)
1492
    @type os_fail: boolean
1493
    @ivar os_fail: whether the RPC call didn't return valid OS data
1494
    @type oslist: dict
1495
    @ivar oslist: dict of OSes as diagnosed by DiagnoseOS, keyed by OS name
1496
    @type vm_capable: boolean
1497
    @ivar vm_capable: whether the node can host instances
1498
    @type pv_min: float
1499
    @ivar pv_min: size in MiB of the smallest PVs
1500
    @type pv_max: float
1501
    @ivar pv_max: size in MiB of the biggest PVs
1502

1503
    """
1504
    def __init__(self, offline=False, uuid=None, vm_capable=True):
1505
      self.uuid = uuid
1506
      self.volumes = {}
1507
      self.instances = []
1508
      self.pinst = []
1509
      self.sinst = []
1510
      self.sbp = {}
1511
      self.mfree = 0
1512
      self.dfree = 0
1513
      self.offline = offline
1514
      self.vm_capable = vm_capable
1515
      self.rpc_fail = False
1516
      self.lvm_fail = False
1517
      self.hyp_fail = False
1518
      self.ghost = False
1519
      self.os_fail = False
1520
      self.oslist = {}
1521
      self.pv_min = None
1522
      self.pv_max = None
1523

    
1524
  def ExpandNames(self):
1525
    # This raises errors.OpPrereqError on its own:
1526
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1527

    
1528
    # Get instances in node group; this is unsafe and needs verification later
1529
    inst_uuids = \
1530
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1531

    
1532
    self.needed_locks = {
1533
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1534
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1535
      locking.LEVEL_NODE: [],
1536

    
1537
      # This opcode is run by watcher every five minutes and acquires all nodes
1538
      # for a group. It doesn't run for a long time, so it's better to acquire
1539
      # the node allocation lock as well.
1540
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1541
      }
1542

    
1543
    self.share_locks = ShareAll()
1544

    
1545
  def DeclareLocks(self, level):
1546
    if level == locking.LEVEL_NODE:
1547
      # Get members of node group; this is unsafe and needs verification later
1548
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1549

    
1550
      # In Exec(), we warn about mirrored instances that have primary and
1551
      # secondary living in separate node groups. To fully verify that
1552
      # volumes for these instances are healthy, we will need to do an
1553
      # extra call to their secondaries. We ensure here those nodes will
1554
      # be locked.
1555
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1556
        # Important: access only the instances whose lock is owned
1557
        instance = self.cfg.GetInstanceInfoByName(inst_name)
1558
        if instance.disk_template in constants.DTS_INT_MIRROR:
1559
          nodes.update(instance.secondary_nodes)
1560

    
1561
      self.needed_locks[locking.LEVEL_NODE] = nodes
1562

    
1563
  def CheckPrereq(self):
1564
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1565
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1566

    
1567
    group_node_uuids = set(self.group_info.members)
1568
    group_inst_uuids = \
1569
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1570

    
1571
    unlocked_node_uuids = \
1572
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1573

    
1574
    unlocked_inst_uuids = \
1575
        group_inst_uuids.difference(
1576
          [self.cfg.GetInstanceInfoByName(name).uuid
1577
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1578

    
1579
    if unlocked_node_uuids:
1580
      raise errors.OpPrereqError(
1581
        "Missing lock for nodes: %s" %
1582
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1583
        errors.ECODE_STATE)
1584

    
1585
    if unlocked_inst_uuids:
1586
      raise errors.OpPrereqError(
1587
        "Missing lock for instances: %s" %
1588
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1589
        errors.ECODE_STATE)
1590

    
1591
    self.all_node_info = self.cfg.GetAllNodesInfo()
1592
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1593

    
1594
    self.my_node_uuids = group_node_uuids
1595
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1596
                             for node_uuid in group_node_uuids)
1597

    
1598
    self.my_inst_uuids = group_inst_uuids
1599
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1600
                             for inst_uuid in group_inst_uuids)
1601

    
1602
    # We detect here the nodes that will need the extra RPC calls for verifying
1603
    # split LV volumes; they should be locked.
1604
    extra_lv_nodes = set()
1605

    
1606
    for inst in self.my_inst_info.values():
1607
      if inst.disk_template in constants.DTS_INT_MIRROR:
1608
        for nuuid in inst.all_nodes:
1609
          if self.all_node_info[nuuid].group != self.group_uuid:
1610
            extra_lv_nodes.add(nuuid)
1611

    
1612
    unlocked_lv_nodes = \
1613
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1614

    
1615
    if unlocked_lv_nodes:
1616
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1617
                                 utils.CommaJoin(unlocked_lv_nodes),
1618
                                 errors.ECODE_STATE)
1619
    self.extra_lv_nodes = list(extra_lv_nodes)
1620

    
1621
  def _VerifyNode(self, ninfo, nresult):
1622
    """Perform some basic validation on data returned from a node.
1623

1624
      - check the result data structure is well formed and has all the
1625
        mandatory fields
1626
      - check ganeti version
1627

1628
    @type ninfo: L{objects.Node}
1629
    @param ninfo: the node to check
1630
    @param nresult: the results from the node
1631
    @rtype: boolean
1632
    @return: whether overall this call was successful (and we can expect
1633
         reasonable values in the response)
1634

1635
    """
1636
    # main result, nresult should be a non-empty dict
1637
    test = not nresult or not isinstance(nresult, dict)
1638
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1639
                  "unable to verify node: no data returned")
1640
    if test:
1641
      return False
1642

    
1643
    # compares ganeti version
1644
    local_version = constants.PROTOCOL_VERSION
1645
    remote_version = nresult.get("version", None)
1646
    test = not (remote_version and
1647
                isinstance(remote_version, (list, tuple)) and
1648
                len(remote_version) == 2)
1649
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1650
                  "connection to node returned invalid data")
1651
    if test:
1652
      return False
1653

    
1654
    test = local_version != remote_version[0]
1655
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1656
                  "incompatible protocol versions: master %s,"
1657
                  " node %s", local_version, remote_version[0])
1658
    if test:
1659
      return False
1660

    
1661
    # node seems compatible, we can actually try to look into its results
1662

    
1663
    # full package version
1664
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1665
                  constants.CV_ENODEVERSION, ninfo.name,
1666
                  "software version mismatch: master %s, node %s",
1667
                  constants.RELEASE_VERSION, remote_version[1],
1668
                  code=self.ETYPE_WARNING)
1669

    
1670
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1671
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1672
      for hv_name, hv_result in hyp_result.iteritems():
1673
        test = hv_result is not None
1674
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1675
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1676

    
1677
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1678
    if ninfo.vm_capable and isinstance(hvp_result, list):
1679
      for item, hv_name, hv_result in hvp_result:
1680
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1681
                      "hypervisor %s parameter verify failure (source %s): %s",
1682
                      hv_name, item, hv_result)
1683

    
1684
    test = nresult.get(constants.NV_NODESETUP,
1685
                       ["Missing NODESETUP results"])
1686
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1687
                  "node setup error: %s", "; ".join(test))
1688

    
1689
    return True
1690

    
1691
  def _VerifyNodeTime(self, ninfo, nresult,
1692
                      nvinfo_starttime, nvinfo_endtime):
1693
    """Check the node time.
1694

1695
    @type ninfo: L{objects.Node}
1696
    @param ninfo: the node to check
1697
    @param nresult: the remote results for the node
1698
    @param nvinfo_starttime: the start time of the RPC call
1699
    @param nvinfo_endtime: the end time of the RPC call
1700

1701
    """
1702
    ntime = nresult.get(constants.NV_TIME, None)
1703
    try:
1704
      ntime_merged = utils.MergeTime(ntime)
1705
    except (ValueError, TypeError):
1706
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1707
                    "Node returned invalid time")
1708
      return
1709

    
1710
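    # The node's time is accepted if it lies within the interval spanned by
    # the verify RPC call, extended by NODE_MAX_CLOCK_SKEW on both sides;
    # otherwise the measured divergence is reported below.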
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1711
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1712
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1713
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1714
    else:
1715
      ntime_diff = None
1716

    
1717
    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1718
                  "Node time diverges by at least %s from master node time",
1719
                  ntime_diff)
1720

    
1721
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1722
    """Check the node LVM results and update info for cross-node checks.
1723

1724
    @type ninfo: L{objects.Node}
1725
    @param ninfo: the node to check
1726
    @param nresult: the remote results for the node
1727
    @param vg_name: the configured VG name
1728
    @type nimg: L{NodeImage}
1729
    @param nimg: node image
1730

1731
    """
1732
    if vg_name is None:
1733
      return
1734

    
1735
    # checks vg existence and size > 20G
1736
    vglist = nresult.get(constants.NV_VGLIST, None)
1737
    test = not vglist
1738
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1739
                  "unable to check volume groups")
1740
    if not test:
1741
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1742
                                            constants.MIN_VG_SIZE)
1743
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1744

    
1745
    # Check PVs
1746
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1747
    for em in errmsgs:
1748
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
1749
    if pvminmax is not None:
1750
      (nimg.pv_min, nimg.pv_max) = pvminmax
1751

    
1752
  def _VerifyGroupDRBDVersion(self, node_verify_infos):
1753
    """Check cross-node DRBD version consistency.
1754

1755
    @type node_verify_infos: dict
1756
    @param node_verify_infos: infos about nodes as returned from the
1757
      node_verify call.
1758

1759
    """
1760
    node_versions = {}
1761
    for node_uuid, ndata in node_verify_infos.items():
1762
      nresult = ndata.payload
1763
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1764
      node_versions[node_uuid] = version
1765

    
1766
    if len(set(node_versions.values())) > 1:
1767
      for node_uuid, version in sorted(node_versions.items()):
1768
        msg = "DRBD version mismatch: %s" % version
1769
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1770
                    code=self.ETYPE_WARNING)
1771

    
1772
  def _VerifyGroupLVM(self, node_image, vg_name):
1773
    """Check cross-node consistency in LVM.
1774

1775
    @type node_image: dict
1776
    @param node_image: info about nodes, mapping from node to names to
1777
      L{NodeImage} objects
1778
    @param vg_name: the configured VG name
1779

1780
    """
1781
    if vg_name is None:
1782
      return
1783

    
1784
    # Only exclusive storage needs this kind of checks
1785
    if not self._exclusive_storage:
1786
      return
1787

    
1788
    # exclusive_storage wants all PVs to have the same size (approximately),
1789
    # if the smallest and the biggest ones are okay, everything is fine.
1790
    # pv_min is None iff pv_max is None
1791
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1792
    if not vals:
1793
      return
1794
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1795
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1796
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1797
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1798
                  "PV sizes differ too much in the group; smallest (%s MB) is"
1799
                  " on %s, biggest (%s MB) is on %s",
1800
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
1801
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))
1802

    
1803
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1804
    """Check the node bridges.
1805

1806
    @type ninfo: L{objects.Node}
1807
    @param ninfo: the node to check
1808
    @param nresult: the remote results for the node
1809
    @param bridges: the expected list of bridges
1810

1811
    """
1812
    if not bridges:
1813
      return
1814

    
1815
    missing = nresult.get(constants.NV_BRIDGES, None)
1816
    test = not isinstance(missing, list)
1817
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1818
                  "did not return valid bridge information")
1819
    if not test:
1820
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1821
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1822

    
1823
  def _VerifyNodeUserScripts(self, ninfo, nresult):
1824
    """Check the results of user scripts presence and executability on the node
1825

1826
    @type ninfo: L{objects.Node}
1827
    @param ninfo: the node to check
1828
    @param nresult: the remote results for the node
1829

1830
    """
1831
    test = constants.NV_USERSCRIPTS not in nresult
1832
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1833
                  "did not return user scripts information")
1834

    
1835
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1836
    if not test:
1837
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1838
                    "user scripts not present or not executable: %s" %
1839
                    utils.CommaJoin(sorted(broken_scripts)))
1840

    
1841
  def _VerifyNodeNetwork(self, ninfo, nresult):
1842
    """Check the node network connectivity results.
1843

1844
    @type ninfo: L{objects.Node}
1845
    @param ninfo: the node to check
1846
    @param nresult: the remote results for the node
1847

1848
    """
1849
    test = constants.NV_NODELIST not in nresult
1850
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
1851
                  "node hasn't returned node ssh connectivity data")
1852
    if not test:
1853
      if nresult[constants.NV_NODELIST]:
1854
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1855
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
1856
                        "ssh communication with node '%s': %s", a_node, a_msg)
1857

    
1858
    test = constants.NV_NODENETTEST not in nresult
1859
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1860
                  "node hasn't returned node tcp connectivity data")
1861
    if not test:
1862
      if nresult[constants.NV_NODENETTEST]:
1863
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1864
        for anode in nlist:
1865
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
1866
                        "tcp communication with node '%s': %s",
1867
                        anode, nresult[constants.NV_NODENETTEST][anode])
1868

    
1869
    test = constants.NV_MASTERIP not in nresult
1870
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1871
                  "node hasn't returned node master IP reachability data")
1872
    if not test:
1873
      if not nresult[constants.NV_MASTERIP]:
1874
        if ninfo.uuid == self.master_node:
1875
          msg = "the master node cannot reach the master IP (not configured?)"
1876
        else:
1877
          msg = "cannot reach the master IP"
1878
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
1879

    
1880
  def _VerifyInstance(self, instance, node_image, diskstatus):
1881
    """Verify an instance.
1882

1883
    This function checks to see if the required block devices are
1884
    available on the instance's node, and that the nodes are in the correct
1885
    state.
1886

1887
    """
1888
    pnode_uuid = instance.primary_node
1889
    pnode_img = node_image[pnode_uuid]
1890
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
1891

    
1892
    node_vol_should = {}
1893
    instance.MapLVsByNode(node_vol_should)
1894

    
1895
    cluster = self.cfg.GetClusterInfo()
1896
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1897
                                                            self.group_info)
1898
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
1899
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
1900
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)
1901

    
1902
    for node_uuid in node_vol_should:
1903
      n_img = node_image[node_uuid]
1904
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1905
        # ignore missing volumes on offline or broken nodes
1906
        continue
1907
      for volume in node_vol_should[node_uuid]:
1908
        test = volume not in n_img.volumes
1909
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
1910
                      "volume %s missing on node %s", volume,
1911
                      self.cfg.GetNodeName(node_uuid))
1912

    
1913
    if instance.admin_state == constants.ADMINST_UP:
1914
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
1915
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
1916
                    "instance not running on its primary node %s",
1917
                     self.cfg.GetNodeName(pnode_uuid))
1918
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
1919
                    instance.name, "instance is marked as running and lives on"
1920
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))
1921

    
1922
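    # Flatten the per-node disk status into (node, success, status, disk
    # index) tuples so that each disk can be checked individually below.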
    diskdata = [(nname, success, status, idx)
1923
                for (nname, disks) in diskstatus.items()
1924
                for idx, (success, status) in enumerate(disks)]
1925

    
1926
    for nname, success, bdev_status, idx in diskdata:
1927
      # the 'ghost node' construction in Exec() ensures that we have a
1928
      # node here
1929
      snode = node_image[nname]
1930
      bad_snode = snode.ghost or snode.offline
1931
      self._ErrorIf(instance.disks_active and
1932
                    not success and not bad_snode,
1933
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
1934
                    "couldn't retrieve status for disk/%s on %s: %s",
1935
                    idx, self.cfg.GetNodeName(nname), bdev_status)
1936

    
1937
      if instance.disks_active and success and \
1938
         (bdev_status.is_degraded or
1939
          bdev_status.ldisk_status != constants.LDS_OKAY):
1940
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
1941
        if bdev_status.is_degraded:
1942
          msg += " is degraded"
1943
        if bdev_status.ldisk_status != constants.LDS_OKAY:
1944
          msg += "; state is '%s'" % \
1945
                 constants.LDS_NAMES[bdev_status.ldisk_status]
1946

    
1947
        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
1948

    
1949
    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1950
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
1951
                  "instance %s, connection to primary node failed",
1952
                  instance.name)
1953

    
1954
    self._ErrorIf(len(instance.secondary_nodes) > 1,
1955
                  constants.CV_EINSTANCELAYOUT, instance.name,
1956
                  "instance has multiple secondary nodes: %s",
1957
                  utils.CommaJoin(instance.secondary_nodes),
1958
                  code=self.ETYPE_WARNING)
1959

    
1960
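    # Exclusive storage checks: if any node of the instance has the flag set,
    # the disk template must support exclusive storage and every disk needs a
    # configured number of spindles.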
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
1961
    if any(es_flags.values()):
1962
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
1963
        # Disk template not compatible with exclusive_storage: no instance
1964
        # node should have the flag set
1965
        es_nodes = [n
1966
                    for (n, es) in es_flags.items()
1967
                    if es]
1968
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
1969
                    "instance has template %s, which is not supported on nodes"
1970
                    " that have exclusive storage set: %s",
1971
                    instance.disk_template,
1972
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
1973
      for (idx, disk) in enumerate(instance.disks):
1974
        self._ErrorIf(disk.spindles is None,
1975
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
1976
                      "number of spindles not configured for disk %s while"
1977
                      " exclusive storage is enabled, try running"
1978
                      " gnt-cluster repair-disk-sizes", idx)
1979

    
1980
    if instance.disk_template in constants.DTS_INT_MIRROR:
1981
      instance_nodes = utils.NiceSort(instance.all_nodes)
1982
      instance_groups = {}
1983

    
1984
      for node_uuid in instance_nodes:
1985
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
1986
                                   []).append(node_uuid)
1987

    
1988
      pretty_list = [
1989
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
1990
                           groupinfo[group].name)
1991
        # Sort so that we always list the primary node first.
1992
        for group, nodes in sorted(instance_groups.items(),
1993
                                   key=lambda (_, nodes): pnode_uuid in nodes,
1994
                                   reverse=True)]
1995

    
1996
      self._ErrorIf(len(instance_groups) > 1,
1997
                    constants.CV_EINSTANCESPLITGROUPS,
1998
                    instance.name, "instance has primary and secondary nodes in"
1999
                    " different groups: %s", utils.CommaJoin(pretty_list),
2000
                    code=self.ETYPE_WARNING)
2001

    
2002
    inst_nodes_offline = []
2003
    for snode in instance.secondary_nodes:
2004
      s_img = node_image[snode]
2005
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2006
                    self.cfg.GetNodeName(snode),
2007
                    "instance %s, connection to secondary node failed",
2008
                    instance.name)
2009

    
2010
      if s_img.offline:
2011
        inst_nodes_offline.append(snode)
2012

    
2013
    # warn that the instance lives on offline nodes
2014
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2015
                  instance.name, "instance has offline secondary node(s) %s",
2016
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2017
    # ... or ghost/non-vm_capable nodes
2018
    for node_uuid in instance.all_nodes:
2019
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2020
                    instance.name, "instance lives on ghost node %s",
2021
                    self.cfg.GetNodeName(node_uuid))
2022
      self._ErrorIf(not node_image[node_uuid].vm_capable,
2023
                    constants.CV_EINSTANCEBADNODE, instance.name,
2024
                    "instance lives on non-vm_capable node %s",
2025
                    self.cfg.GetNodeName(node_uuid))
2026

    
2027
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2028
    """Verify if there are any unknown volumes in the cluster.
2029

2030
    The .os, .swap and backup volumes are ignored. All other volumes are
2031
    reported as unknown.
2032

2033
    @type reserved: L{ganeti.utils.FieldSet}
2034
    @param reserved: a FieldSet of reserved volume names
2035

2036
    """
2037
    for node_uuid, n_img in node_image.items():
2038
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2039
          self.all_node_info[node_uuid].group != self.group_uuid):
2040
        # skip non-healthy nodes
2041
        continue
2042
      for volume in n_img.volumes:
2043
        test = ((node_uuid not in node_vol_should or
2044
                volume not in node_vol_should[node_uuid]) and
2045
                not reserved.Matches(volume))
2046
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2047
                      self.cfg.GetNodeName(node_uuid),
2048
                      "volume %s is unknown", volume)
2049

    
2050
  def _VerifyNPlusOneMemory(self, node_image, all_insts):
2051
    """Verify N+1 Memory Resilience.
2052

2053
    Check that if one single node dies we can still start all the
2054
    instances it was primary for.
2055

2056
    """
2057
    cluster_info = self.cfg.GetClusterInfo()
2058
    for node_uuid, n_img in node_image.items():
2059
      # This code checks that every node which is now listed as
2060
      # secondary has enough memory to host all instances it is
2061
      # supposed to should a single other node in the cluster fail.
2062
      # FIXME: not ready for failover to an arbitrary node
2063
      # FIXME: does not support file-backed instances
2064
      # WARNING: we currently take into account down instances as well
2065
      # as up ones, considering that even if they're down someone
2066
      # might want to start them even in the event of a node failure.
2067
      if n_img.offline or \
2068
         self.all_node_info[node_uuid].group != self.group_uuid:
2069
        # we're skipping nodes marked offline and nodes in other groups from
2070
        # the N+1 warning, since most likely we don't have good memory
2071
        # information from them; we already list instances living on such
2072
        # nodes, and that's enough warning
2073
        continue
2074
      #TODO(dynmem): also consider ballooning out other instances
2075
      for prinode, inst_uuids in n_img.sbp.items():
2076
        needed_mem = 0
2077
        for inst_uuid in inst_uuids:
2078
          bep = cluster_info.FillBE(all_insts[inst_uuid])
2079
          if bep[constants.BE_AUTO_BALANCE]:
2080
            needed_mem += bep[constants.BE_MINMEM]
2081
        test = n_img.mfree < needed_mem
2082
        self._ErrorIf(test, constants.CV_ENODEN1,
2083
                      self.cfg.GetNodeName(node_uuid),
2084
                      "not enough memory to accomodate instance failovers"
2085
                      " should node %s fail (%dMiB needed, %dMiB available)",
2086
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2087

    
2088
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2089
                   (files_all, files_opt, files_mc, files_vm)):
2090
    """Verifies file checksums collected from all nodes.
2091

2092
    @param nodes: List of L{objects.Node} objects
2093
    @param master_node_uuid: UUID of master node
2094
    @param all_nvinfo: RPC results
2095

2096
    """
2097
    # Define functions determining which nodes to consider for a file
2098
    files2nodefn = [
2099
      (files_all, None),
2100
      (files_mc, lambda node: (node.master_candidate or
2101
                               node.uuid == master_node_uuid)),
2102
      (files_vm, lambda node: node.vm_capable),
2103
      ]
2104

    
2105
    # Build mapping from filename to list of nodes which should have the file
2106
    nodefiles = {}
2107
    for (files, fn) in files2nodefn:
2108
      if fn is None:
2109
        filenodes = nodes
2110
      else:
2111
        filenodes = filter(fn, nodes)
2112
      nodefiles.update((filename,
2113
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
2114
                       for filename in files)
2115

    
2116
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2117

    
2118
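    # fileinfo maps each filename to a dict of checksum -> set of node UUIDs
    # reporting that checksum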
    fileinfo = dict((filename, {}) for filename in nodefiles)
2119
    ignore_nodes = set()
2120

    
2121
    for node in nodes:
2122
      if node.offline:
2123
        ignore_nodes.add(node.uuid)
2124
        continue
2125

    
2126
      nresult = all_nvinfo[node.uuid]
2127

    
2128
      if nresult.fail_msg or not nresult.payload:
2129
        node_files = None
2130
      else:
2131
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2132
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2133
                          for (key, value) in fingerprints.items())
2134
        del fingerprints
2135

    
2136
      test = not (node_files and isinstance(node_files, dict))
2137
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2138
                    "Node did not return file checksum data")
2139
      if test:
2140
        ignore_nodes.add(node.uuid)
2141
        continue
2142

    
2143
      # Build per-checksum mapping from filename to nodes having it
2144
      for (filename, checksum) in node_files.items():
2145
        assert filename in nodefiles
2146
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2147

    
2148
    for (filename, checksums) in fileinfo.items():
2149
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2150

    
2151
      # Nodes having the file
2152
      with_file = frozenset(node_uuid
2153
                            for node_uuids in fileinfo[filename].values()
2154
                            for node_uuid in node_uuids) - ignore_nodes
2155

    
2156
      expected_nodes = nodefiles[filename] - ignore_nodes
2157

    
2158
      # Nodes missing file
2159
      missing_file = expected_nodes - with_file
2160

    
2161
      if filename in files_opt:
2162
        # All or no nodes
2163
        self._ErrorIf(missing_file and missing_file != expected_nodes,
2164
                      constants.CV_ECLUSTERFILECHECK, None,
2165
                      "File %s is optional, but it must exist on all or no"
2166
                      " nodes (not found on %s)",
2167
                      filename,
2168
                      utils.CommaJoin(
2169
                        utils.NiceSort(
2170
                          map(self.cfg.GetNodeName, missing_file))))
2171
      else:
2172
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2173
                      "File %s is missing from node(s) %s", filename,
2174
                      utils.CommaJoin(
2175
                        utils.NiceSort(
2176
                          map(self.cfg.GetNodeName, missing_file))))
2177

    
2178
        # Warn if a node has a file it shouldn't
2179
        unexpected = with_file - expected_nodes
2180
        self._ErrorIf(unexpected,
2181
                      constants.CV_ECLUSTERFILECHECK, None,
2182
                      "File %s should not exist on node(s) %s",
2183
                      filename, utils.CommaJoin(
2184
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2185

    
2186
      # See if there are multiple versions of the file
2187
      test = len(checksums) > 1
2188
      if test:
2189
        variants = ["variant %s on %s" %
2190
                    (idx + 1,
2191
                     utils.CommaJoin(utils.NiceSort(
2192
                       map(self.cfg.GetNodeName, node_uuids))))
2193
                    for (idx, (checksum, node_uuids)) in
2194
                      enumerate(sorted(checksums.items()))]
2195
      else:
2196
        variants = []
2197

    
2198
      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2199
                    "File %s found with %s different checksums (%s)",
2200
                    filename, len(checksums), "; ".join(variants))
2201

    
2202
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2203
                      drbd_map):
2204
    """Verifies and the node DRBD status.
2205

2206
    @type ninfo: L{objects.Node}
2207
    @param ninfo: the node to check
2208
    @param nresult: the remote results for the node
2209
    @param instanceinfo: the dict of instances
2210
    @param drbd_helper: the configured DRBD usermode helper
2211
    @param drbd_map: the DRBD map as returned by
2212
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2213

2214
    """
2215
    if drbd_helper:
2216
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2217
      test = (helper_result is None)
2218
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2219
                    "no drbd usermode helper returned")
2220
      if helper_result:
2221
        status, payload = helper_result
2222
        test = not status
2223
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2224
                      "drbd usermode helper check unsuccessful: %s", payload)
2225
        test = status and (payload != drbd_helper)
2226
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2227
                      "wrong drbd usermode helper: %s", payload)
2228

    
2229
    # compute the DRBD minors
2230
    node_drbd = {}
2231
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2232
      test = inst_uuid not in instanceinfo
2233
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2234
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
2235
        # ghost instance should not be running, but otherwise we
2236
        # don't give double warnings (both ghost instance and
2237
        # unallocated minor in use)
2238
      if test:
2239
        node_drbd[minor] = (inst_uuid, False)
2240
      else:
2241
        instance = instanceinfo[inst_uuid]
2242
        node_drbd[minor] = (inst_uuid, instance.disks_active)
2243

    
2244
    # and now check them
2245
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2246
    test = not isinstance(used_minors, (tuple, list))
2247
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2248
                  "cannot parse drbd status file: %s", str(used_minors))
2249
    if test:
2250
      # we cannot check drbd status
2251
      return
2252

    
2253
    for minor, (inst_uuid, must_exist) in node_drbd.items():
2254
      test = minor not in used_minors and must_exist
2255
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2256
                    "drbd minor %d of instance %s is not active", minor,
2257
                    self.cfg.GetInstanceName(inst_uuid))
2258
    for minor in used_minors:
2259
      test = minor not in node_drbd
2260
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2261
                    "unallocated drbd minor %d is in use", minor)
2262

    
2263
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2264
    """Builds the node OS structures.
2265

2266
    @type ninfo: L{objects.Node}
2267
    @param ninfo: the node to check
2268
    @param nresult: the remote results for the node
2269
    @param nimg: the node image object
2270

2271
    """
2272
    remote_os = nresult.get(constants.NV_OSLIST, None)
2273
    test = (not isinstance(remote_os, list) or
2274
            not compat.all(isinstance(v, list) and len(v) == 7
2275
                           for v in remote_os))
2276

    
2277
    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2278
                  "node hasn't returned valid OS data")
2279

    
2280
    nimg.os_fail = test
2281

    
2282
    if test:
2283
      return
2284

    
2285
    os_dict = {}
2286

    
2287
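    # Each OS entry is a 7-element list (name, path, status, diagnose message,
    # variants, parameters, API versions); group the entries by OS name.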
    for (name, os_path, status, diagnose,
2288
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2289

    
2290
      if name not in os_dict:
2291
        os_dict[name] = []
2292

    
2293
      # parameters is a list of lists instead of list of tuples due to
2294
      # JSON lacking a real tuple type, fix it:
2295
      parameters = [tuple(v) for v in parameters]
2296
      os_dict[name].append((os_path, status, diagnose,
2297
                            set(variants), set(parameters), set(api_ver)))
2298

    
2299
    nimg.oslist = os_dict
2300

    
2301
  def _VerifyNodeOS(self, ninfo, nimg, base):
2302
    """Verifies the node OS list.
2303

2304
    @type ninfo: L{objects.Node}
2305
    @param ninfo: the node to check
2306
    @param nimg: the node image object
2307
    @param base: the 'template' node we match against (e.g. from the master)
2308

2309
    """
2310
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2311

    
2312
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2313
    for os_name, os_data in nimg.oslist.items():
2314
      assert os_data, "Empty OS status for OS %s?!" % os_name
2315
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2316
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2317
                    "Invalid OS %s (located at %s): %s",
2318
                    os_name, f_path, f_diag)
2319
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2320
                    "OS '%s' has multiple entries"
2321
                    " (first one shadows the rest): %s",
2322
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
2323
      # comparisons with the 'base' image
2324
      test = os_name not in base.oslist
2325
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2326
                    "Extra OS %s not present on reference node (%s)",
2327
                    os_name, self.cfg.GetNodeName(base.uuid))
2328
      if test:
2329
        continue
2330
      assert base.oslist[os_name], "Base node has empty OS status?"
2331
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2332
      if not b_status:
2333
        # base OS is invalid, skipping
2334
        continue
2335
      for kind, a, b in [("API version", f_api, b_api),
2336
                         ("variants list", f_var, b_var),
2337
                         ("parameters", beautify_params(f_param),
2338
                          beautify_params(b_param))]:
2339
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2340
                      "OS %s for %s differs from reference node %s:"
2341
                      " [%s] vs. [%s]", kind, os_name,
2342
                      self.cfg.GetNodeName(base.uuid),
2343
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2344

    
2345
    # check any missing OSes
2346
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2347
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2348
                  "OSes present on reference node %s"
2349
                  " but missing on this node: %s",
2350
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2351

    
2352
  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2353
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2354

2355
    @type ninfo: L{objects.Node}
2356
    @param ninfo: the node to check
2357
    @param nresult: the remote results for the node
2358
    @type is_master: bool
2359
    @param is_master: Whether node is the master node
2360

2361
    """
2362
    cluster = self.cfg.GetClusterInfo()
2363
    if (is_master and
2364
        (cluster.IsFileStorageEnabled() or
2365
         cluster.IsSharedFileStorageEnabled())):
2366
      try:
2367
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2368
      except KeyError:
2369
        # This should never happen
2370
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2371
                      "Node did not return forbidden file storage paths")
2372
      else:
2373
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2374
                      "Found forbidden file storage paths: %s",
2375
                      utils.CommaJoin(fspaths))
2376
    else:
2377
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2378
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2379
                    "Node should not have returned forbidden file storage"
2380
                    " paths")
2381

    
2382
  def _VerifyStoragePaths(self, ninfo, nresult):
2383
    """Verifies (file) storage paths.
2384

2385
    @type ninfo: L{objects.Node}
2386
    @param ninfo: the node to check
2387
    @param nresult: the remote results for the node
2388

2389
    """
2390
    cluster = self.cfg.GetClusterInfo()
2391
    if cluster.IsFileStorageEnabled():
2392
      self._ErrorIf(
2393
          constants.NV_FILE_STORAGE_PATH in nresult,
2394
          constants.CV_ENODEFILESTORAGEPATHUNUSABLE, ninfo.name,
2395
          "The configured file storage path is unusable: %s" %
2396
          nresult.get(constants.NV_FILE_STORAGE_PATH))
2397

    
2398
  def _VerifyOob(self, ninfo, nresult):
2399
    """Verifies out of band functionality of a node.
2400

2401
    @type ninfo: L{objects.Node}
2402
    @param ninfo: the node to check
2403
    @param nresult: the remote results for the node
2404

2405
    """
2406
    # We just have to verify the paths on master and/or master candidates
2407
    # as the oob helper is invoked on the master
2408
    if ((ninfo.master_candidate or ninfo.master_capable) and
2409
        constants.NV_OOB_PATHS in nresult):
2410
      for path_result in nresult[constants.NV_OOB_PATHS]:
2411
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2412
                      ninfo.name, path_result)
2413

    
2414
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2415
    """Verifies and updates the node volume data.
2416

2417
    This function will update a L{NodeImage}'s internal structures
2418
    with data from the remote call.
2419

2420
    @type ninfo: L{objects.Node}
2421
    @param ninfo: the node to check
2422
    @param nresult: the remote results for the node
2423
    @param nimg: the node image object
2424
    @param vg_name: the configured VG name
2425

2426
    """
2427
    nimg.lvm_fail = True
2428
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2429
    if vg_name is None:
2430
      pass
2431
    elif isinstance(lvdata, basestring):
2432
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2433
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
2434
    elif not isinstance(lvdata, dict):
2435
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2436
                    "rpc call to node failed (lvlist)")
2437
    else:
2438
      nimg.volumes = lvdata
2439
      nimg.lvm_fail = False
2440

    
2441
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2442
    """Verifies and updates the node instance list.
2443

2444
    If the listing was successful, then updates this node's instance
2445
    list. Otherwise, it marks the RPC call as failed for the instance
2446
    list key.
2447

2448
    @type ninfo: L{objects.Node}
2449
    @param ninfo: the node to check
2450
    @param nresult: the remote results for the node
2451
    @param nimg: the node image object
2452

2453
    """
2454
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2455
    test = not isinstance(idata, list)
2456
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2457
                  "rpc call to node failed (instancelist): %s",
2458
                  utils.SafeEncode(str(idata)))
2459
    if test:
2460
      nimg.hyp_fail = True
2461
    else:
2462
      nimg.instances = [inst.uuid for (_, inst) in
2463
                        self.cfg.GetMultiInstanceInfoByName(idata)]
2464

    
2465
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2466
    """Verifies and computes a node information map
2467

2468
    @type ninfo: L{objects.Node}
2469
    @param ninfo: the node to check
2470
    @param nresult: the remote results for the node
2471
    @param nimg: the node image object
2472
    @param vg_name: the configured VG name
2473

2474
    """
2475
    # try to read free memory (from the hypervisor)
2476
    hv_info = nresult.get(constants.NV_HVINFO, None)
2477
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2478
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2479
                  "rpc call to node failed (hvinfo)")
2480
    if not test:
2481
      try:
2482
        nimg.mfree = int(hv_info["memory_free"])
2483
      except (ValueError, TypeError):
2484
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2485
                      "node returned invalid nodeinfo, check hypervisor")
2486

    
2487
    # FIXME: devise a free space model for file based instances as well
2488
    if vg_name is not None:
2489
      test = (constants.NV_VGLIST not in nresult or
2490
              vg_name not in nresult[constants.NV_VGLIST])
2491
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2492
                    "node didn't return data for the volume group '%s'"
2493
                    " - it is either missing or broken", vg_name)
2494
      if not test:
2495
        try:
2496
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2497
        except (ValueError, TypeError):
2498
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2499
                        "node returned invalid LVM info, check LVM status")
2500

    
2501
  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2502
    """Gets per-disk status information for all instances.
2503

2504
    @type node_uuids: list of strings
2505
    @param node_uuids: Node UUIDs
2506
    @type node_image: dict of (UUID, L{NodeImage})
2507
    @param node_image: Node image objects
2508
    @type instanceinfo: dict of (UUID, L{objects.Instance})
2509
    @param instanceinfo: Instance objects
2510
    @rtype: {instance: {node: [(success, payload)]}}
2511
    @return: a dictionary of per-instance dictionaries with nodes as
2512
        keys and disk information as values; the disk information is a
2513
        list of tuples (success, payload)
2514

2515
    """
2516
    node_disks = {}
2517
    node_disks_devonly = {}
2518
    diskless_instances = set()
2519
    diskless = constants.DT_DISKLESS
2520

    
2521
    for nuuid in node_uuids:
2522
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
2523
                                             node_image[nuuid].sinst))
2524
      diskless_instances.update(uuid for uuid in node_inst_uuids
2525
                                if instanceinfo[uuid].disk_template == diskless)
2526
      disks = [(inst_uuid, disk)
2527
               for inst_uuid in node_inst_uuids
2528
               for disk in instanceinfo[inst_uuid].disks]
2529

    
2530
      if not disks:
2531
        # No need to collect data
2532
        continue
2533

    
2534
      node_disks[nuuid] = disks
2535

    
2536
      # _AnnotateDiskParams already makes copies of the disks
2537
      devonly = []
2538
      for (inst_uuid, dev) in disks:
2539
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
2540
                                          self.cfg)
2541
        self.cfg.SetDiskID(anno_disk, nuuid)
2542
        devonly.append(anno_disk)
2543

    
2544
      node_disks_devonly[nuuid] = devonly
2545

    
2546
    assert len(node_disks) == len(node_disks_devonly)
2547

    
2548
    # Collect data from all nodes with disks
2549
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2550
                                                          node_disks_devonly)
2551

    
2552
    assert len(result) == len(node_disks)
2553

    
2554
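    # instdisk maps instance UUID -> node UUID -> list of (success, payload)
    # tuples, one per disk, in the same order as the instance's disks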
    instdisk = {}
2555

    
2556
    for (nuuid, nres) in result.items():
2557
      node = self.cfg.GetNodeInfo(nuuid)
2558
      disks = node_disks[node.uuid]
2559

    
2560
      if nres.offline:
2561
        # No data from this node
2562
        data = len(disks) * [(False, "node offline")]
2563
      else:
2564
        msg = nres.fail_msg
2565
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
2566
                      "while getting disk information: %s", msg)
2567
        if msg:
2568
          # No data from this node
2569
          data = len(disks) * [(False, msg)]
2570
        else:
2571
          data = []
2572
          for idx, i in enumerate(nres.payload):
2573
            if isinstance(i, (tuple, list)) and len(i) == 2:
2574
              data.append(i)
2575
            else:
2576
              logging.warning("Invalid result from node %s, entry %d: %s",
2577
                              node.name, idx, i)
2578
              data.append((False, "Invalid result from the remote node"))
2579

    
2580
      for ((inst_uuid, _), status) in zip(disks, data):
2581
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
2582
          .append(status)
2583

    
2584
    # Add empty entries for diskless instances.
2585
    for inst_uuid in diskless_instances:
2586
      assert inst_uuid not in instdisk
2587
      instdisk[inst_uuid] = {}
2588

    
2589
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2590
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
2591
                      compat.all(isinstance(s, (tuple, list)) and
2592
                                 len(s) == 2 for s in statuses)
2593
                      for inst, nuuids in instdisk.items()
2594
                      for nuuid, statuses in nuuids.items())
2595
    if __debug__:
2596
      instdisk_keys = set(instdisk)
2597
      instanceinfo_keys = set(instanceinfo)
2598
      assert instdisk_keys == instanceinfo_keys, \
2599
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2600
         (instdisk_keys, instanceinfo_keys))
2601

    
2602
    return instdisk
2603

    
2604
  @staticmethod
2605
  def _SshNodeSelector(group_uuid, all_nodes):
2606
    """Create endless iterators for all potential SSH check hosts.
2607

2608
    """
2609
    nodes = [node for node in all_nodes
2610
             if (node.group != group_uuid and
2611
                 not node.offline)]
2612
    keyfunc = operator.attrgetter("group")
2613

    
2614
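    # Group the remaining nodes (all nodes outside this group that are not
    # offline) by their node group and wrap each sorted list of names in an
    # endless itertools.cycle; e.g. for hypothetical foreign groups with
    # members [nodeA, nodeB] and [nodeC] this yields cycles over those lists.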
    return map(itertools.cycle,
2615
               [sorted(map(operator.attrgetter("name"), names))
2616
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2617
                                                  keyfunc)])
2618

    
2619
  @classmethod
2620
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2621
    """Choose which nodes should talk to which other nodes.
2622

2623
    We will make nodes contact all nodes in their group, and one node from
2624
    every other group.
2625

2626
    @warning: This algorithm has a known issue if one node group is much
2627
      smaller than others (e.g. just one node). In such a case all other
2628
      nodes will talk to the single node.
2629

2630
    """
2631
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2632
    sel = cls._SshNodeSelector(group_uuid, all_nodes)
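    # _SshNodeSelector returns one round-robin iterator per foreign node
    # group; drawing one name from each iterator below gives every online
    # node an SSH check target in each other group, rotating through that
    # group's members. Illustrative example (hypothetical names): foreign
    # groups [a, b] and [c] with online_nodes [n1, n2] yield roughly
    # {n1: [a, c], n2: [b, c]}.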

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}
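    # node_vol_should is filled below via instance.MapLVsByNode(); it maps
    # each node to the logical volumes expected there and is checked against
    # the volumes actually reported in _VerifyOrphanVolumes further down.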

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }
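    # node_verify_param is the payload of the node_verify RPC issued below:
    # each NV_* key selects a check to run on the nodes and its value carries
    # the check-specific arguments (None where no arguments are needed).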

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if drbd_helper:
      node_verify_param[constants.NV_DRBDVERSION] = None
      node_verify_param[constants.NV_DRBDLIST] = None
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)
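    # node_image holds the *expected* per-node state (NodeImage objects keyed
    # by node UUID); the loops below fill in primary/secondary instance data,
    # which is later compared with the runtime information gathered via RPC.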

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)
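    # Every node image now lists its primary instances (pinst) and secondary
    # instances (sinst); sbp maps a primary node to the instances that keep
    # their secondary copy here, e.g. {"pnode-uuid": ["inst-uuid", ...]}.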

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so
    # if it's True for at least one node, we act as if it were set for all
    # the nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams)
    nvinfo_endtime = time.time()
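    # The [nvinfo_starttime, nvinfo_endtime] window is handed to
    # _VerifyNodeTime below, so a node's reported clock only needs to fall
    # within this interval (plus the allowed skew) to be accepted.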

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()
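    # vf_node_info/vf_nvinfo now cover every node whose ancillary files have
    # to agree: this group's nodes plus, when they were not checked already,
    # the master node and one extra online vm_capable node.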

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.uuid not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
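    # _VerifyOrphanVolumes flags volumes reported by the nodes that are
    # neither expected according to node_vol_should nor matched by the
    # reserved_lvs patterns collected above.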

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"
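      # Each payload is expected to be a list of (script, status, output)
      # tuples; status is compared against constants.HKR_FAIL below.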

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])