root / lib / cmdlib / cluster.py @ 29625e5a

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
  CheckDiskAccessModeConsistency

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


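# The logical units in this module are not called directly; they are driven
# by the job queue from a matching opcode.  A minimal sketch of how a client
# could trigger the LU above, assuming a luxi client obtained through
# cli.GetClient() (hypothetical wiring, for illustration only):
#
#   from ganeti import cli, opcodes
#   cl = cli.GetClient()
#   job_id = cl.SubmitJob([opcodes.OpClusterActivateMasterIp()])
#
# The same operation is normally reached from the command line via
# "gnt-cluster activate-master-ip" (and its deactivate counterpart below).
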
class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def CheckArguments(self):
    self.master_uuid = self.cfg.GetMasterNode()
    self.master_ndparams = self.cfg.GetNdParams(self.cfg.GetMasterNodeInfo())

    # TODO: When Issue 584 is solved, and None is properly parsed when used
    # as a default value, ndparams.get(.., None) can be changed to
    # ndparams[..] to access the values directly

    # OpenvSwitch: Warn user if link is missing
    if (self.master_ndparams[constants.ND_OVS] and not
        self.master_ndparams.get(constants.ND_OVS_LINK, None)):
      self.LogInfo("No physical interface for OpenvSwitch was given."
                   " OpenvSwitch will not have an outside connection. This"
                   " might not be what you want.")

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Create and configure Open vSwitch

    """
    if self.master_ndparams[constants.ND_OVS]:
      result = self.rpc.call_node_configure_ovs(
                 self.master_uuid,
                 self.master_ndparams[constants.ND_OVS_NAME],
                 self.master_ndparams.get(constants.ND_OVS_LINK, None))
      result.Raise("Could not successfully configure Open vSwitch")
    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "vcs_version": constants.VCS_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disk sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.DT_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

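  # Illustration of the helper above (hypothetical values, not taken from a
  # real cluster): for a DRBD8 disk whose data child was created smaller than
  # its parent, e.g. parent.size == 1024 MiB and children[0].size == 1000 MiB,
  # _EnsureChildSizes() grows the child to 1024 and returns True so the
  # caller knows the configuration must be written back; the metadata child
  # (children[1]) is intentionally left untouched.
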
  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      if not dskl:
        # no disks on the node
        continue

      newl = [([v[2].Copy()], v[0]) for v in dskl]
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
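        # The payload reports the block device size in bytes, whereas
        # disk.size in the configuration is kept in MiB, hence the
        # right-shift by 20 (division by 2**20) before comparing.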
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)

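# Example of the check above (hypothetical values, for illustration only):
# the "netmask" is a CIDR prefix length, so on an IPv4 cluster a call such as
# _ValidateNetmask(cfg, 24) passes while _ValidateNetmask(cfg, 33) raises
# errors.OpPrereqError; IPv6 clusters accept correspondingly longer prefixes.
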

def CheckFileBasedStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
    file_disk_template):
  """Checks whether the given file-based storage directory is acceptable.

  Note: This function is public, because it is also used in bootstrap.py.

  @type logging_warn_fn: function
  @param logging_warn_fn: function which accepts a string and logs it
  @type file_storage_dir: string
  @param file_storage_dir: the directory to be used for file-based instances
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of enabled disk templates
  @type file_disk_template: string
  @param file_disk_template: the file-based disk template for which the
      path should be checked

  """
  assert (file_disk_template in
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_dir is not None:
    if file_storage_dir == "":
      if file_storage_enabled:
        raise errors.OpPrereqError(
            "Unsetting the '%s' storage directory while having '%s' storage"
            " enabled is not permitted." %
            (file_disk_template, file_disk_template))
    else:
      if not file_storage_enabled:
        logging_warn_fn(
            "Specified a %s storage directory, although %s storage is not"
            " enabled." % (file_disk_template, file_disk_template))
  else:
    raise errors.ProgrammerError("Received %s storage dir with value"
                                 " 'None'." % file_disk_template)


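# In short, the function above errors out when the directory is unset ("")
# while the corresponding template is still enabled, only warns when a
# directory is given although the template is disabled, and treats a None
# directory as a programming error.  For instance (hypothetical call),
# passing file_storage_dir="" with DT_FILE enabled raises OpPrereqError,
# whereas passing "/srv/ganeti/file-storage" with DT_FILE disabled merely
# logs a warning.
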
def CheckFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_FILE)


def CheckSharedFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given shared file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_SHARED_FILE)


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
        CheckDiskAccessModeValidity(self.op.diskparams)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s"
                                   % err, errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def _CheckVgName(self, node_uuids, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the vg name on all nodes and, in case it gets
       unset, whether there are instances still using it.

    """
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
                                            new_enabled_disk_templates)
    current_vg_name = self.cfg.GetVGName()

    if self.op.vg_name == '':
      if lvm_is_enabled:
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                   " disk templates are or get enabled.")

    if self.op.vg_name is None:
      if current_vg_name is None and lvm_is_enabled:
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and lvm_is_enabled) or \
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
      self._CheckVgNameOnNodes(node_uuids)

  def _CheckVgNameOnNodes(self, node_uuids):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_uuids)
    for node_uuid in node_uuids:
      msg = vglist[node_uuid].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
                                   errors.ECODE_ENVIRON)

  @staticmethod
  def _GetDiskTemplateSetsInner(op_enabled_disk_templates,
                                old_enabled_disk_templates):
    """Computes three sets of disk templates.

    @see: C{_GetDiskTemplateSets} for more details.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    disabled_disk_templates = []
    if op_enabled_disk_templates:
      enabled_disk_templates = op_enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(old_enabled_disk_templates))
      disabled_disk_templates = \
        list(set(old_enabled_disk_templates)
             - set(enabled_disk_templates))
    else:
      enabled_disk_templates = old_enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates,
            disabled_disk_templates)

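  # Worked example for the helper above (hypothetical values): with
  # old_enabled_disk_templates = ["plain", "drbd"] and
  # op_enabled_disk_templates = ["plain", "file"], the result is
  # (["plain", "file"], ["file"], ["drbd"]) up to ordering, i.e. the full new
  # set, the templates that get switched on, and the ones switched off.  If
  # the opcode does not request a change, the old set is returned unchanged
  # and both difference lists stay empty.
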
  def _GetDiskTemplateSets(self, cluster):
    """Computes three sets of disk templates.

    The three sets are:
      - disk templates that will be enabled after this operation (no matter if
        they were enabled before or not)
      - disk templates that get enabled by this operation (thus haven't been
        enabled before.)
      - disk templates that get disabled by this operation

    """
    return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates,
                                          cluster.enabled_disk_templates)

  def _CheckIpolicy(self, cluster, enabled_disk_templates):
    """Checks the ipolicy.

    @type cluster: C{objects.Cluster}
    @param cluster: the cluster's configuration
    @type enabled_disk_templates: list of string
    @param enabled_disk_templates: list of (possibly newly) enabled disk
      templates

    """
    # FIXME: write unit tests for this
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
                                  enabled_disk_templates)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(nuuid in group.members
                                             for nuuid in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
                                           self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))
    else:
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
                                  enabled_disk_templates)

  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
    """Checks whether the set DRBD helper actually exists on the nodes.

    @type drbd_helper: string
    @param drbd_helper: path of the drbd usermode helper binary
    @type node_uuids: list of strings
    @param node_uuids: list of node UUIDs to check for the helper

    """
    # checks given drbd helper on all nodes
    helpers = self.rpc.call_drbd_helper(node_uuids)
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
      if ninfo.offline:
        self.LogInfo("Not checking drbd helper on offline node %s",
                     ninfo.name)
        continue
      msg = helpers[ninfo.uuid].fail_msg
      if msg:
        raise errors.OpPrereqError("Error checking drbd helper on node"
                                   " '%s': %s" % (ninfo.name, msg),
                                   errors.ECODE_ENVIRON)
      node_helper = helpers[ninfo.uuid].payload
      if node_helper != drbd_helper:
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                   (ninfo.name, node_helper),
                                   errors.ECODE_ENVIRON)

  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
    """Check the DRBD usermode helper.

    @type node_uuids: list of strings
    @param node_uuids: a list of nodes' UUIDs
    @type drbd_enabled: boolean
    @param drbd_enabled: whether DRBD will be enabled after this operation
      (no matter if it was disabled before or not)
    @type drbd_gets_enabled: boolean
    @param drbd_gets_enabled: true if DRBD was disabled before this
      operation, but will be enabled afterwards

    """
    if self.op.drbd_helper == '':
      if drbd_enabled:
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " DRBD is enabled.")
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    else:
      if self.op.drbd_helper is not None and drbd_enabled:
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
      else:
        if drbd_gets_enabled:
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
          if current_drbd_helper is not None:
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
          else:
            raise errors.OpPrereqError("Cannot enable DRBD without a"
                                       " DRBD usermode helper set.")

  def _CheckInstancesOfDisabledDiskTemplates(
      self, disabled_disk_templates):
    """Check whether we try to disable a disk template that is in use.

    @type disabled_disk_templates: list of string
    @param disabled_disk_templates: list of disk templates that are going to
      be disabled by this operation

    """
    for disk_template in disabled_disk_templates:
      if self.cfg.HasAnyDiskOfType(disk_template):
        raise errors.OpPrereqError(
            "Cannot disable disk template '%s', because there is at least one"
            " instance using it." % disk_template)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    self.cluster = cluster = self.cfg.GetClusterInfo()

    vm_capable_node_uuids = [node.uuid
                             for node in self.cfg.GetAllNodesInfo().values()
                             if node.uuid in node_uuids and node.vm_capable]

    (enabled_disk_templates, new_enabled_disk_templates,
      disabled_disk_templates) = self._GetDiskTemplateSets(cluster)
    self._CheckInstancesOfDisabledDiskTemplates(disabled_disk_templates)

    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
                      new_enabled_disk_templates)

    if self.op.file_storage_dir is not None:
      CheckFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)

    if self.op.shared_file_storage_dir is not None:
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.shared_file_storage_dir,
          enabled_disk_templates)

    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
    self._CheckDrbdHelper(vm_capable_node_uuids,
                          drbd_enabled, drbd_gets_enabled)

    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    self._CheckIpolicy(cluster, enabled_disk_templates)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_uuids, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_uuids, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def _SetVgName(self, feedback_fn):
    """Determines and sets the new volume group name.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")

  def _SetFileStorageDir(self, feedback_fn):
    """Set the file storage directory.

    """
    if self.op.file_storage_dir is not None:
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
        feedback_fn("Global file storage dir already set to value '%s'"
                    % self.cluster.file_storage_dir)
      else:
        self.cluster.file_storage_dir = self.op.file_storage_dir

  def _SetSharedFileStorageDir(self, feedback_fn):
    """Set the shared file storage directory.

    """
    if self.op.shared_file_storage_dir is not None:
      if self.cluster.shared_file_storage_dir == \
          self.op.shared_file_storage_dir:
        feedback_fn("Global shared file storage dir already set to value '%s'"
                    % self.cluster.shared_file_storage_dir)
      else:
        self.cluster.shared_file_storage_dir = self.op.shared_file_storage_dir

  def _SetDrbdHelper(self, feedback_fn):
    """Set the DRBD usermode helper.

    """
    if self.op.drbd_helper is not None:
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did not"
                    " enable the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(self.op.enabled_disk_templates)

    self._SetVgName(feedback_fn)
    self._SetFileStorageDir(feedback_fn)
    self._SetSharedFileStorageDir(feedback_fn)
    self._SetDrbdHelper(feedback_fn)

    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

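    # helper_os() expects "mods" in the format the opcode delivers it: a list
    # of (modification, OS name) pairs, e.g. (hypothetical value)
    # [(constants.DDM_ADD, "debian-image"), (constants.DDM_REMOVE, "lenny")],
    # and mutates the corresponding cluster-level OS list in place.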
1336
    if self.op.hidden_os:
1337
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1338

    
1339
    if self.op.blacklisted_os:
1340
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1341

    
1342
    if self.op.master_netdev:
1343
      master_params = self.cfg.GetMasterNetworkParameters()
1344
      ems = self.cfg.GetUseExternalMipScript()
1345
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1346
                  self.cluster.master_netdev)
1347
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1348
                                                       master_params, ems)
1349
      if not self.op.force:
1350
        result.Raise("Could not disable the master ip")
1351
      else:
1352
        if result.fail_msg:
1353
          msg = ("Could not disable the master ip (continuing anyway): %s" %
1354
                 result.fail_msg)
1355
          feedback_fn(msg)
1356
      feedback_fn("Changing master_netdev from %s to %s" %
1357
                  (master_params.netdev, self.op.master_netdev))
1358
      self.cluster.master_netdev = self.op.master_netdev
1359

    
1360
    if self.op.master_netmask:
1361
      master_params = self.cfg.GetMasterNetworkParameters()
1362
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1363
      result = self.rpc.call_node_change_master_netmask(
1364
                 master_params.uuid, master_params.netmask,
1365
                 self.op.master_netmask, master_params.ip,
1366
                 master_params.netdev)
1367
      result.Warn("Could not change the master IP netmask", feedback_fn)
1368
      self.cluster.master_netmask = self.op.master_netmask
1369

    
1370
    self.cfg.Update(self.cluster, feedback_fn)
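    # The master IP was shut down on the old netdev further up; it is only
    # brought back (on the new netdev) below, after the modified configuration
    # has been written, and a failure there is reported as a warning asking
    # for a manual restart rather than being rolled back.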

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]
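      # Illustrative: (-len(jobs), []) is a dependency on a job submitted
      # len(jobs) positions earlier in this same submission, i.e. the
      # config-verification job appended above; the second element is the
      # list of required end statuses (left empty here).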

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)
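    # Schematically, for a two-group cluster the job list now looks like
    #   [[OpClusterVerifyConfig],
    #    [OpClusterVerifyGroup(group1)], [OpClusterVerifyGroup(group2)]]
    # where each inner list is submitted as a separate job (group names are
    # placeholders).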

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
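    # Illustrative renderings of the two layouts (identifiers made up):
    #   with error_codes:    "ERROR:ENODESSH:node:node2.example.com:details"
    #   without error_codes: "ERROR: node node2.example.com: details"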
1467
    # and finally report it via the feedback_fn
1468
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1469
    # do not mark the operation as failed for WARN cases only
1470
    if ltype == self.ETYPE_ERROR:
1471
      self.bad = True
1472

    
1473
  def _ErrorIf(self, cond, *args, **kwargs):
1474
    """Log an error message if the passed condition is True.
1475

1476
    """
1477
    if (bool(cond)
1478
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1479
      self._Error(*args, **kwargs)
1480

    
1481

    
1482
def _VerifyCertificate(filename):
1483
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1484

1485
  @type filename: string
1486
  @param filename: Path to PEM file
1487

1488
  """
1489
  try:
1490
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1491
                                           utils.ReadFile(filename))
1492
  except Exception, err: # pylint: disable=W0703
1493
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1494
            "Failed to load X509 certificate %s: %s" % (filename, err))
1495

    
1496
  (errcode, msg) = \
1497
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1498
                                constants.SSL_CERT_EXPIRATION_ERROR)
1499

    
1500
  if msg:
1501
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1502
  else:
1503
    fnamemsg = None
1504

    
1505
  if errcode is None:
1506
    return (None, fnamemsg)
1507
  elif errcode == utils.CERT_WARNING:
1508
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1509
  elif errcode == utils.CERT_ERROR:
1510
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1511

    
1512
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1513

    
1514

    
1515
def _GetAllHypervisorParameters(cluster, instances):
1516
  """Compute the set of all hypervisor parameters.
1517

1518
  @type cluster: L{objects.Cluster}
1519
  @param cluster: the cluster object
1520
  @param instances: list of L{objects.Instance}
1521
  @param instances: additional instances from which to obtain parameters
1522
  @rtype: list of (origin, hypervisor, parameters)
1523
  @return: a list with all parameters found, indicating the hypervisor they
1524
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1525

1526
  """
1527
  hvp_data = []
1528

    
1529
  for hv_name in cluster.enabled_hypervisors:
1530
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1531

    
1532
  for os_name, os_hvp in cluster.os_hvp.items():
1533
    for hv_name, hv_params in os_hvp.items():
1534
      if hv_params:
1535
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1536
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1537

    
1538
  # TODO: collapse identical parameter values in a single one
1539
  for instance in instances:
1540
    if instance.hvparams:
1541
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1542
                       cluster.FillHV(instance)))
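      # each entry appended here has the shape
      #   ("instance web1", "kvm", {<fully filled hvparams dict>})
      # (the instance and hypervisor names above are purely illustrative)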

  return hvp_data


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (hv_name, item))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()
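    # All locks are taken in shared mode: this LU only reads the cluster
    # state, so concurrent jobs are not blocked while the config is verified.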

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                    constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(inst.name for
                         inst in dangling_instances.get(node.uuid, [])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(inst.name for
                                         inst in no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        instance = self.cfg.GetInstanceInfoByName(inst_name)
        if instance.disk_template in constants.DTS_INT_MIRROR:
          nodes.update(instance.secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_node_uuids = set(self.group_info.members)
    group_inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_node_uuids = \
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_inst_uuids = \
        group_inst_uuids.difference(
          [self.cfg.GetInstanceInfoByName(name).uuid
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])

    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_inst_uuids:
      raise errors.OpPrereqError(
        "Missing lock for instances: %s" %
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
        errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_uuids = group_node_uuids
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
                             for node_uuid in group_node_uuids)

    self.my_inst_uuids = group_inst_uuids
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
                             for inst_uuid in group_inst_uuids)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nuuid in inst.all_nodes:
          if self.all_node_info[nuuid].group != self.group_uuid:
            extra_lv_nodes.add(nuuid)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, ninfo.name,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s parameter verify failure (source %s): %s",
                      hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
                    "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None
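    # The node clock is accepted while it stays within
    #   [nvinfo_starttime - NODE_MAX_CLOCK_SKEW,
    #    nvinfo_endtime + NODE_MAX_CLOCK_SKEW];
    # e.g. (illustrative numbers) with, say, a 150s allowance, a node
    # reporting 10:00:00 while the RPC ran 10:02:40-10:02:45 would be flagged.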

    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
                  "Node time diverges by at least %s from master node time",
                  ntime_diff)

  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                  "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupDRBDVersion(self, node_verify_infos):
    """Check cross-node DRBD version consistency.

    @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node_verify call.

    """
    node_versions = {}
    for node_uuid, ndata in node_verify_infos.items():
      nresult = ndata.payload
      if nresult:
        version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
        node_versions[node_uuid] = version

    if len(set(node_versions.values())) > 1:
      for node_uuid, version in sorted(node_versions.items()):
        msg = "DRBD version mismatch: %s" % version
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
                    code=self.ETYPE_WARNING)

  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping from node to names to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of checks
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
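    # The (size, uuid) tuples compare by size first, so min()/max() also
    # identify which node holds the smallest/biggest PV for the message below.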
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "did not return valid bridge information")
    if not test:
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = constants.NV_NODELIST not in nresult
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
                  "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
                        "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
                        "tcp communication with node '%s': %s",
                        anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if ninfo.uuid == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)

  def _VerifyInstance(self, instance, node_image, diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the nodes are in the correct
    state.

    """
    pnode_uuid = instance.primary_node
    pnode_img = node_image[pnode_uuid]
    groupinfo = self.cfg.GetAllNodeGroupsInfo()

    node_vol_should = {}
    instance.MapLVsByNode(node_vol_should)

    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            self.group_info)
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)

    for node_uuid in node_vol_should:
      n_img = node_image[node_uuid]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node_uuid]:
        test = volume not in n_img.volumes
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
                      "volume %s missing on node %s", volume,
                      self.cfg.GetNodeName(node_uuid))

    if instance.admin_state == constants.ADMINST_UP:
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
                    "instance not running on its primary node %s",
                     self.cfg.GetNodeName(pnode_uuid))
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance is marked as running and lives on"
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      self._ErrorIf(instance.disks_active and
                    not success and not bad_snode,
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
                    "couldn't retrieve status for disk/%s on %s: %s",
                    idx, self.cfg.GetNodeName(nname), bdev_status)

      if instance.disks_active and success and \
         (bdev_status.is_degraded or
          bdev_status.ldisk_status != constants.LDS_OKAY):
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
        if bdev_status.is_degraded:
          msg += " is degraded"
        if bdev_status.ldisk_status != constants.LDS_OKAY:
          msg += "; state is '%s'" % \
                 constants.LDS_NAMES[bdev_status.ldisk_status]

        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)

    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
                  "instance %s, connection to primary node failed",
                  instance.name)

    self._ErrorIf(len(instance.secondary_nodes) > 1,
                  constants.CV_EINSTANCELAYOUT, instance.name,
                  "instance has multiple secondary nodes: %s",
                  utils.CommaJoin(instance.secondary_nodes),
                  code=self.ETYPE_WARNING)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
    if any(es_flags.values()):
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
        # Disk template not compatible with exclusive_storage: no instance
        # node should have the flag set
        es_nodes = [n
                    for (n, es) in es_flags.items()
                    if es]
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
                    "instance has template %s, which is not supported on nodes"
                    " that have exclusive storage set: %s",
                    instance.disk_template,
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
      for (idx, disk) in enumerate(instance.disks):
        self._ErrorIf(disk.spindles is None,
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
                      "number of spindles not configured for disk %s while"
                      " exclusive storage is enabled, try running"
                      " gnt-cluster repair-disk-sizes", idx)

    if instance.disk_template in constants.DTS_INT_MIRROR:
      instance_nodes = utils.NiceSort(instance.all_nodes)
      instance_groups = {}

      for node_uuid in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
                                   []).append(node_uuid)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
                           groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode_uuid in nodes,
                                   reverse=True)]

      self._ErrorIf(len(instance_groups) > 1,
                    constants.CV_EINSTANCESPLITGROUPS,
                    instance.name, "instance has primary and secondary nodes in"
                    " different groups: %s", utils.CommaJoin(pretty_list),
                    code=self.ETYPE_WARNING)

    inst_nodes_offline = []
    for snode in instance.secondary_nodes:
      s_img = node_image[snode]
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
                    self.cfg.GetNodeName(snode),
                    "instance %s, connection to secondary node failed",
                    instance.name)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
                  instance.name, "instance has offline secondary node(s) %s",
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
    # ... or ghost/non-vm_capable nodes
    for node_uuid in instance.all_nodes:
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance lives on ghost node %s",
                    self.cfg.GetNodeName(node_uuid))
      self._ErrorIf(not node_image[node_uuid].vm_capable,
                    constants.CV_EINSTANCEBADNODE, instance.name,
                    "instance lives on non-vm_capable node %s",
                    self.cfg.GetNodeName(node_uuid))

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node_uuid, n_img in node_image.items():
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
          self.all_node_info[node_uuid].group != self.group_uuid):
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node_uuid not in node_vol_should or
                volume not in node_vol_should[node_uuid]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
                      self.cfg.GetNodeName(node_uuid),
                      "volume %s is unknown", volume)

  def _VerifyNPlusOneMemory(self, node_image, all_insts):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node_uuid, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline or \
         self.all_node_info[node_uuid].group != self.group_uuid:
        # we're skipping nodes marked offline and nodes in other groups from
        # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      #TODO(dynmem): also consider ballooning out other instances
      for prinode, inst_uuids in n_img.sbp.items():
        needed_mem = 0
        for inst_uuid in inst_uuids:
          bep = cluster_info.FillBE(all_insts[inst_uuid])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MINMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1,
                      self.cfg.GetNodeName(node_uuid),
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)

  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param nodes: List of L{objects.Node} objects
    @param master_node_uuid: UUID of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.uuid == master_node_uuid)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodes
      else:
        filenodes = filter(fn, nodes)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
                       for filename in files)
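    # nodefiles now maps each tracked file to the frozenset of node UUIDs
    # expected to have it, e.g. {"/path/to/some/config": frozenset([...])}
    # (the path shown is a placeholder).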

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodes:
      if node.offline:
        ignore_nodes.add(node.uuid)
        continue

      nresult = all_nvinfo[node.uuid]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
                    "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.uuid)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_uuid
                            for node_uuids in fileinfo[filename].values()
                            for node_uuid in node_uuids) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        self._ErrorIf(missing_file and missing_file != expected_nodes,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is optional, but it must exist on all or no"
                      " nodes (not found on %s)",
                      filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))
      else:
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is missing from node(s) %s", filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        self._ErrorIf(unexpected,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s should not exist on node(s) %s",
                      filename, utils.CommaJoin(
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1,
                     utils.CommaJoin(utils.NiceSort(
                       map(self.cfg.GetNodeName, node_uuids))))
                    for (idx, (checksum, node_uuids)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
                    "File %s found with %s different checksums (%s)",
                    filename, len(checksums), "; ".join(variants))

  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
    """Verify the drbd helper.

    """
    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                    "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "wrong drbd usermode helper: %s", payload)

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)

    # compute the DRBD minors
    node_drbd = {}
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
      test = inst_uuid not in instanceinfo
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
      if test:
        node_drbd[minor] = (inst_uuid, False)
      else:
        instance = instanceinfo[inst_uuid]
        node_drbd[minor] = (inst_uuid, instance.disks_active)
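    # node_drbd now maps every minor the configuration expects on this node
    # to (instance uuid, should-be-active), e.g. {0: ("b1f5...", True)}
    # (the uuid shown is a placeholder).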

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                  "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (inst_uuid, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "drbd minor %d of instance %s is not active", minor,
                    self.cfg.GetInstanceName(inst_uuid))
    for minor in used_minors:
      test = minor not in node_drbd
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                  "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))
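      # os_dict therefore ends up as
      #   name -> [(path, status, diagnose, variants, params, api_versions)]
      # with one entry per location the OS was found in on this node.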
2496

    
2497
    nimg.oslist = os_dict
2498

    
2499
  def _VerifyNodeOS(self, ninfo, nimg, base):
2500
    """Verifies the node OS list.
2501

2502
    @type ninfo: L{objects.Node}
2503
    @param ninfo: the node to check
2504
    @param nimg: the node image object
2505
    @param base: the 'template' node we match against (e.g. from the master)
2506

2507
    """
2508
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2509

    
2510
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2511
    for os_name, os_data in nimg.oslist.items():
2512
      assert os_data, "Empty OS status for OS %s?!" % os_name
2513
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2514
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2515
                    "Invalid OS %s (located at %s): %s",
2516
                    os_name, f_path, f_diag)
2517
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2518
                    "OS '%s' has multiple entries"
2519
                    " (first one shadows the rest): %s",
2520
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
2521
      # comparisons with the 'base' image
2522
      test = os_name not in base.oslist
2523
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2524
                    "Extra OS %s not present on reference node (%s)",
2525
                    os_name, self.cfg.GetNodeName(base.uuid))
2526
      if test:
2527
        continue
2528
      assert base.oslist[os_name], "Base node has empty OS status?"
2529
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2530
      if not b_status:
2531
        # base OS is invalid, skipping
2532
        continue
2533
      for kind, a, b in [("API version", f_api, b_api),
2534
                         ("variants list", f_var, b_var),
2535
                         ("parameters", beautify_params(f_param),
2536
                          beautify_params(b_param))]:
2537
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2538
                      "OS %s for %s differs from reference node %s:"
2539
                      " [%s] vs. [%s]", kind, os_name,
2540
                      self.cfg.GetNodeName(base.uuid),
2541
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2542

    
2543
    # check any missing OSes
2544
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2545
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2546
                  "OSes present on reference node %s"
2547
                  " but missing on this node: %s",
2548
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2549

    
2550
  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2551
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2552

2553
    @type ninfo: L{objects.Node}
2554
    @param ninfo: the node to check
2555
    @param nresult: the remote results for the node
2556
    @type is_master: bool
2557
    @param is_master: Whether node is the master node
2558

2559
    """
2560
    cluster = self.cfg.GetClusterInfo()
2561
    if (is_master and
2562
        (cluster.IsFileStorageEnabled() or
2563
         cluster.IsSharedFileStorageEnabled())):
2564
      try:
2565
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2566
      except KeyError:
2567
        # This should never happen
2568
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2569
                      "Node did not return forbidden file storage paths")
2570
      else:
2571
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2572
                      "Found forbidden file storage paths: %s",
2573
                      utils.CommaJoin(fspaths))
2574
    else:
2575
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2576
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2577
                    "Node should not have returned forbidden file storage"
2578
                    " paths")
2579

    
2580
  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2581
                          verify_key, error_key):
2582
    """Verifies (file) storage paths.
2583

2584
    @type ninfo: L{objects.Node}
2585
    @param ninfo: the node to check
2586
    @param nresult: the remote results for the node
2587
    @type file_disk_template: string
2588
    @param file_disk_template: file-based disk template, whose directory
2589
        is supposed to be verified
2590
    @type verify_key: string
2591
    @param verify_key: key for the verification map of this file
2592
        verification step
2593
    @param error_key: error key to be added to the verification results
2594
        in case something goes wrong in this verification step
2595

2596
    """
2597
    assert (file_disk_template in
2598
            utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2599
    cluster = self.cfg.GetClusterInfo()
2600
    if cluster.IsDiskTemplateEnabled(file_disk_template):
2601
      self._ErrorIf(
2602
          verify_key in nresult,
2603
          error_key, ninfo.name,
2604
          "The configured %s storage path is unusable: %s" %
2605
          (file_disk_template, nresult.get(verify_key)))
2606

    
2607
  def _VerifyFileStoragePaths(self, ninfo, nresult):
2608
    """Verifies (file) storage paths.
2609

2610
    @see: C{_VerifyStoragePaths}
2611

2612
    """
2613
    self._VerifyStoragePaths(
2614
        ninfo, nresult, constants.DT_FILE,
2615
        constants.NV_FILE_STORAGE_PATH,
2616
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2617

    
2618
  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_SHARED_FILE,
        constants.NV_SHARED_FILE_STORAGE_PATH,
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

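  # The NV_LVLIST payload is expected to be a dict of logical-volume data on
  # success, or a plain string carrying an error message from the node;
  # _UpdateNodeVolumes maps these cases onto nimg.volumes and nimg.lvm_fail.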
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

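  # NV_INSTANCELIST is the list of instance names running on the node
  # according to the hypervisor; it is translated back to instance UUIDs
  # through the cluster configuration below.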
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

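  # _UpdateNodeInfo fills in per-node resource data: nimg.mfree from the
  # hypervisor's reported free memory (later used by the N+1 memory check)
  # and nimg.dfree from the free space of the configured volume group.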
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

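  # Illustrative shape of the mapping returned by _CollectDiskInfo:
  #   {inst_uuid: {node_uuid: [(success, payload), ...]}}
  # with one (success, payload) tuple per disk of that instance on that node.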
  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_dev_inst_only = {}
    diskless_instances = set()
    nodisk_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        nodisk_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template != diskless)
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      dev_inst_only = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))

      node_disks_dev_inst_only[nuuid] = dev_inst_only

    assert len(node_disks) == len(node_disks_dev_inst_only)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
               node_disks.keys(), node_disks_dev_inst_only)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}
    # ...and disk-full instances that happen to have no disks
    for inst_uuid in nodisk_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

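  # The selection below gives every online node of this group one peer drawn
  # round-robin from each other node group's sorted, non-offline node names,
  # e.g. (illustrative) {"node1.example.com": ["node7.example.com"], ...}.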
  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

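    # Each NV_* key below selects one check for the node-verify RPC to run on
    # the node and carries the data that check needs (None for checks that
    # take no arguments).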
    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_NONVMNODES: self.cfg.GetNonVmCapableNodeNameList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
      if drbd_helper:
        node_verify_param[constants.NV_DRBDVERSION] = None
        node_verify_param[constants.NV_DRBDLIST] = None
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir
      if cluster.IsSharedFileStorageEnabled():
        node_verify_param[constants.NV_SHARED_FILE_STORAGE_PATH] = \
          cluster.shared_file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for inst_uuid in self.my_inst_info.values():
      for nic in inst_uuid.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least one node, we act as if it were set for all the nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

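    # Per-node phase: walk every node of the group, skip offline ones, and
    # run the individual _Verify*/_Update* helpers against that node's
    # verification payload.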
    feedback_fn("* Verifying node status")
3114

    
3115
    refos_img = None
3116

    
3117
    for node_i in node_data_list:
3118
      nimg = node_image[node_i.uuid]
3119

    
3120
      if node_i.offline:
3121
        if verbose:
3122
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
3123
        n_offline += 1
3124
        continue
3125

    
3126
      if node_i.uuid == master_node_uuid:
3127
        ntype = "master"
3128
      elif node_i.master_candidate:
3129
        ntype = "master candidate"
3130
      elif node_i.drained:
3131
        ntype = "drained"
3132
        n_drained += 1
3133
      else:
3134
        ntype = "regular"
3135
      if verbose:
3136
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))
3137

    
3138
      msg = all_nvinfo[node_i.uuid].fail_msg
3139
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
3140
                    "while contacting node: %s", msg)
3141
      if msg:
3142
        nimg.rpc_fail = True
3143
        continue
3144

    
3145
      nresult = all_nvinfo[node_i.uuid].payload
3146

    
3147
      nimg.call_ok = self._VerifyNode(node_i, nresult)
3148
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
3149
      self._VerifyNodeNetwork(node_i, nresult)
3150
      self._VerifyNodeUserScripts(node_i, nresult)
3151
      self._VerifyOob(node_i, nresult)
3152
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
3153
                                           node_i.uuid == master_node_uuid)
3154
      self._VerifyFileStoragePaths(node_i, nresult)
3155
      self._VerifySharedFileStoragePaths(node_i, nresult)
3156

    
3157
      if nimg.vm_capable:
3158
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
3159
        if constants.DT_DRBD8 in cluster.enabled_disk_templates:
3160
          self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
3161
                               all_drbd_map)
3162

    
3163
        if (constants.DT_PLAIN in cluster.enabled_disk_templates) or \
3164
            (constants.DT_DRBD8 in cluster.enabled_disk_templates):
3165
          self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
3166
        self._UpdateNodeInstances(node_i, nresult, nimg)
3167
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
3168
        self._UpdateNodeOS(node_i, nresult, nimg)
3169

    
3170
        if not nimg.os_fail:
3171
          if refos_img is None:
3172
            refos_img = nimg
3173
          self._VerifyNodeOS(node_i, nimg, refos_img)
3174
        self._VerifyNodeBridges(node_i, nresult, bridges)
3175

    
3176
        # Check whether all running instances are primary for the node. (This
3177
        # can no longer be done from _VerifyInstance below, since some of the
3178
        # wrong instances could be from other node groups.)
3179
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)
3180

    
3181
        for inst_uuid in non_primary_inst_uuids:
3182
          test = inst_uuid in self.all_inst_info
3183
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
3184
                        self.cfg.GetInstanceName(inst_uuid),
3185
                        "instance should not run on node %s", node_i.name)
3186
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
3187
                        "node is running unknown instance %s", inst_uuid)
3188

    
3189
    self._VerifyGroupDRBDVersion(all_nvinfo)
3190
    self._VerifyGroupLVM(node_image, vg_name)
3191

    
3192
    for node_uuid, result in extra_lv_nvinfo.items():
3193
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
3194
                              node_image[node_uuid], vg_name)
3195

    
3196
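    # Per-instance phase: verify each instance's disk status and collect the
    # non-redundant and non-auto-balanced instances reported under "Other
    # Notes" below.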
    feedback_fn("* Verifying instance status")
3197
    for inst_uuid in self.my_inst_uuids:
3198
      instance = self.my_inst_info[inst_uuid]
3199
      if verbose:
3200
        feedback_fn("* Verifying instance %s" % instance.name)
3201
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])
3202

    
3203
      # If the instance is non-redundant we cannot survive losing its primary
3204
      # node, so we are not N+1 compliant.
3205
      if instance.disk_template not in constants.DTS_MIRRORED:
3206
        i_non_redundant.append(instance)
3207

    
3208
      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
3209
        i_non_a_balanced.append(instance)
3210

    
3211
    feedback_fn("* Verifying orphan volumes")
3212
    reserved = utils.FieldSet(*cluster.reserved_lvs)
3213

    
3214
    # We will get spurious "unknown volume" warnings if any node of this group
3215
    # is secondary for an instance whose primary is in another group. To avoid
3216
    # them, we find these instances and add their volumes to node_vol_should.
3217
    for instance in self.all_inst_info.values():
3218
      for secondary in instance.secondary_nodes:
3219
        if (secondary in self.my_node_info
3220
            and instance.name not in self.my_inst_info):
3221
          instance.MapLVsByNode(node_vol_should)
3222
          break
3223

    
3224
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
3225

    
3226
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
3227
      feedback_fn("* Verifying N+1 Memory redundancy")
3228
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
3229

    
3230
    feedback_fn("* Other Notes")
3231
    if i_non_redundant:
3232
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
3233
                  % len(i_non_redundant))
3234

    
3235
    if i_non_a_balanced:
3236
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
3237
                  % len(i_non_a_balanced))
3238

    
3239
    if i_offline:
3240
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
3241

    
3242
    if n_offline:
3243
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
3244

    
3245
    if n_drained:
3246
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
3247

    
3248
    return not self.bad
3249

    
3250
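  # Hook results are folded into the verification outcome below: a
  # communication failure or a failed hook script on a reachable node turns
  # the overall result to False.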
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyzes the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if test:
          lu_result = False
          continue
        if res.offline:
          # No need to investigate payload if node is offline
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])