#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
  CheckDiskAccessModeConsistency

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def CheckArguments(self):
    self.master_uuid = self.cfg.GetMasterNode()
    self.master_ndparams = self.cfg.GetNdParams(self.cfg.GetMasterNodeInfo())

    # TODO: When Issue 584 is solved, and None is properly parsed when used
    # as a default value, ndparams.get(.., None) can be changed to
    # ndparams[..] to access the values directly

    # OpenvSwitch: Warn user if link is missing
    if (self.master_ndparams[constants.ND_OVS] and not
        self.master_ndparams.get(constants.ND_OVS_LINK, None)):
      self.LogInfo("No physical interface for OpenvSwitch was given."
                   " OpenvSwitch will not have an outside connection. This"
                   " might not be what you want.")

    # OpenvSwitch: Warn if parameters are given, but OVS is not enabled.
    if (not self.master_ndparams[constants.ND_OVS] and
        (self.master_ndparams[constants.ND_OVS_NAME] or
         self.master_ndparams.get(constants.ND_OVS_LINK, None))):
      self.LogInfo("OpenvSwitch name or link were given, but"
                   " OpenvSwitch is not enabled. Please enable"
                   " OpenvSwitch with 'ovs=true' or create it manually")

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Create and configure Open vSwitch

    """
    if self.master_ndparams[constants.ND_OVS]:
      result = self.rpc.call_node_configure_ovs(
                 self.master_uuid,
                 self.master_ndparams[constants.ND_OVS_NAME],
                 self.master_ndparams.get(constants.ND_OVS_LINK, None))
      result.Raise("Could not successfully configure Open vSwitch")
    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

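    # NotImplemented below marks the data items that were not requested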
    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "vcs_version": constants.VCS_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disk sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.DT_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      if not dskl:
        # no disks on the node
        continue

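      # build one ([disk], owning instance) pair per disk, as expected by
      # the blockdev_getdimensions RPC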
      newl = [([v[2].Copy()], v[0]) for v in dskl]
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
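        # the node reports sizes in bytes; shift by 20 bits to get MiB, the
        # unit used for disk sizes in the configuration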
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)


def CheckFileBasedStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
    file_disk_template):
  """Checks whether the given file-based storage directory is acceptable.

  Note: This function is public, because it is also used in bootstrap.py.

  @type logging_warn_fn: function
  @param logging_warn_fn: function which accepts a string and logs it
  @type file_storage_dir: string
  @param file_storage_dir: the directory to be used for file-based instances
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of enabled disk templates
  @type file_disk_template: string
  @param file_disk_template: the file-based disk template for which the
      path should be checked

  """
  assert (file_disk_template in
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_dir is not None:
    if file_storage_dir == "":
      if file_storage_enabled:
        raise errors.OpPrereqError(
            "Unsetting the '%s' storage directory while having '%s' storage"
            " enabled is not permitted." %
            (file_disk_template, file_disk_template))
    else:
      if not file_storage_enabled:
        logging_warn_fn(
            "Specified a %s storage directory, although %s storage is not"
            " enabled." % (file_disk_template, file_disk_template))
  else:
    raise errors.ProgrammerError("Received %s storage dir with value"
                                 " 'None'." % file_disk_template)


def CheckFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_FILE)


def CheckSharedFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given shared file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_SHARED_FILE)


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
        CheckDiskAccessModeValidity(self.op.diskparams)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" %
                                   err, errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def _CheckVgName(self, node_uuids, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the vg name on all nodes and, in case it gets
       unset, whether there are instances still using it.

    """
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
                                            new_enabled_disk_templates)
    current_vg_name = self.cfg.GetVGName()

    if self.op.vg_name == '':
      if lvm_is_enabled:
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                   " disk templates are or get enabled.")

    if self.op.vg_name is None:
      if current_vg_name is None and lvm_is_enabled:
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and lvm_is_enabled) or \
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
      self._CheckVgNameOnNodes(node_uuids)

  def _CheckVgNameOnNodes(self, node_uuids):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_uuids)
    for node_uuid in node_uuids:
      msg = vglist[node_uuid].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
                                   errors.ECODE_ENVIRON)

  @staticmethod
  def _GetDiskTemplateSetsInner(op_enabled_disk_templates,
                                old_enabled_disk_templates):
    """Computes three sets of disk templates.

    @see: C{_GetDiskTemplateSets} for more details.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    disabled_disk_templates = []
    if op_enabled_disk_templates:
      enabled_disk_templates = op_enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(old_enabled_disk_templates))
      disabled_disk_templates = \
        list(set(old_enabled_disk_templates)
             - set(enabled_disk_templates))
    else:
      enabled_disk_templates = old_enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates,
            disabled_disk_templates)

  def _GetDiskTemplateSets(self, cluster):
    """Computes three sets of disk templates.

    The three sets are:
      - disk templates that will be enabled after this operation (no matter if
        they were enabled before or not)
      - disk templates that get enabled by this operation (thus hadn't been
        enabled before)
      - disk templates that get disabled by this operation

    """
    return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates,
                                          cluster.enabled_disk_templates)

  def _CheckIpolicy(self, cluster, enabled_disk_templates):
    """Checks the ipolicy.

    @type cluster: C{objects.Cluster}
    @param cluster: the cluster's configuration
    @type enabled_disk_templates: list of string
    @param enabled_disk_templates: list of (possibly newly) enabled disk
      templates

    """
    # FIXME: write unit tests for this
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
                                  enabled_disk_templates)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(nuuid in group.members
                                             for nuuid in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
                                           self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))
    else:
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
                                  enabled_disk_templates)

  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
    """Checks whether the set DRBD helper actually exists on the nodes.

    @type drbd_helper: string
    @param drbd_helper: path of the drbd usermode helper binary
    @type node_uuids: list of strings
    @param node_uuids: list of node UUIDs to check for the helper

    """
    # checks given drbd helper on all nodes
    helpers = self.rpc.call_drbd_helper(node_uuids)
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
      if ninfo.offline:
        self.LogInfo("Not checking drbd helper on offline node %s",
                     ninfo.name)
        continue
      msg = helpers[ninfo.uuid].fail_msg
      if msg:
        raise errors.OpPrereqError("Error checking drbd helper on node"
                                   " '%s': %s" % (ninfo.name, msg),
                                   errors.ECODE_ENVIRON)
      node_helper = helpers[ninfo.uuid].payload
      if node_helper != drbd_helper:
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                   (ninfo.name, node_helper),
                                   errors.ECODE_ENVIRON)

  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
    """Check the DRBD usermode helper.

    @type node_uuids: list of strings
    @param node_uuids: a list of nodes' UUIDs
    @type drbd_enabled: boolean
    @param drbd_enabled: whether DRBD will be enabled after this operation
      (no matter if it was disabled before or not)
    @type drbd_gets_enabled: boolean
    @param drbd_gets_enabled: true if DRBD was disabled before this
      operation, but will be enabled afterwards

    """
    if self.op.drbd_helper == '':
      if drbd_enabled:
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " DRBD is enabled.")
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    else:
      if self.op.drbd_helper is not None and drbd_enabled:
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
      else:
        if drbd_gets_enabled:
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
          if current_drbd_helper is not None:
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
          else:
            raise errors.OpPrereqError("Cannot enable DRBD without a"
                                       " DRBD usermode helper set.")

  def _CheckInstancesOfDisabledDiskTemplates(
      self, disabled_disk_templates):
    """Check whether we try to disable a disk template that is in use.

    @type disabled_disk_templates: list of string
    @param disabled_disk_templates: list of disk templates that are going to
      be disabled by this operation

    """
    for disk_template in disabled_disk_templates:
      if self.cfg.HasAnyDiskOfType(disk_template):
        raise errors.OpPrereqError(
            "Cannot disable disk template '%s', because there is at least one"
            " instance using it." % disk_template)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given parameters don't conflict and
    whether the given volume group is valid.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    self.cluster = cluster = self.cfg.GetClusterInfo()

    vm_capable_node_uuids = [node.uuid
                             for node in self.cfg.GetAllNodesInfo().values()
                             if node.uuid in node_uuids and node.vm_capable]

    (enabled_disk_templates, new_enabled_disk_templates,
      disabled_disk_templates) = self._GetDiskTemplateSets(cluster)
    self._CheckInstancesOfDisabledDiskTemplates(disabled_disk_templates)

    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
                      new_enabled_disk_templates)

    if self.op.file_storage_dir is not None:
      CheckFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)

    if self.op.shared_file_storage_dir is not None:
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.shared_file_storage_dir,
          enabled_disk_templates)

    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)

    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    self._CheckIpolicy(cluster, enabled_disk_templates)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_uuids, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_uuids, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def _SetVgName(self, feedback_fn):
    """Determines and sets the new volume group name.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")

  def _SetFileStorageDir(self, feedback_fn):
    """Set the file storage directory.

    """
    if self.op.file_storage_dir is not None:
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
        feedback_fn("Global file storage dir already set to value '%s'"
                    % self.cluster.file_storage_dir)
      else:
        self.cluster.file_storage_dir = self.op.file_storage_dir

  def _SetDrbdHelper(self, feedback_fn):
    """Set the DRBD usermode helper.

    """
    if self.op.drbd_helper is not None:
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did not"
                    " enable the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(self.op.enabled_disk_templates)

    self._SetVgName(feedback_fn)
    self._SetFileStorageDir(feedback_fn)
    self._SetDrbdHelper(feedback_fn)

    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
                 master_params.uuid, master_params.netmask,
                 self.op.master_netmask, master_params.ip,
                 master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


1368
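# Illustrative note (an assumption added for clarity, not from the original
# source): the hidden_os/blacklisted_os modification lists consumed by
# helper_os() in LUClusterSetParams.Exec above are sequences of
# (action, os_name) pairs, e.g. a hypothetical
#   [(constants.DDM_ADD, "debootstrap+default"),
#    (constants.DDM_REMOVE, "lenny-image")]
# would add one OS to the list and drop another; duplicate additions and
# missing removals are only reported through feedback_fn, never raised.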
class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)

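# Illustrative sketch (an assumption, not part of the original source): for a
# cluster with node groups "group1" and "group2" and no group_name filter,
# Exec() above returns ResultWithJobs over roughly
#   [[OpClusterVerifyConfig(...)],
#    [OpClusterVerifyGroup(group_name="group1", depends=[(-1, [])], ...)],
#    [OpClusterVerifyGroup(group_name="group2", depends=[(-1, [])], ...)]]
# i.e. one single-opcode job per group, each depending (via a relative job
# id) on the global configuration verification job submitted first.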
class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # only ERROR-level messages mark the whole operation as failed;
    # warnings do not set self.bad
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)

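# Illustrative output (an assumption, not from the original source): for an
# ENODEVERSION problem on a node named "node2.example.com", _Error() above
# emits roughly
#   - ERROR:ENODEVERSION:node:node2.example.com:software version mismatch ...
# when the opcode sets error_codes, and
#   - ERROR: node node2.example.com: software version mismatch ...
# otherwise; error codes listed in ignore_errors are demoted to WARNING.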
def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)

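# Illustrative return values of _VerifyCertificate() (an assumption, not from
# the original source):
#   (None, None)                                        healthy certificate
#   (LUClusterVerifyConfig.ETYPE_WARNING,
#    "While verifying /path/to/cert.pem: ...")          expiry approaching
#   (LUClusterVerifyConfig.ETYPE_ERROR,
#    "Failed to load X509 certificate ...")             unreadable/corrupt PEM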
def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data

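# Example of the structure returned by _GetAllHypervisorParameters() (the
# OS and instance names below are made up for illustration):
#   [("cluster", "kvm", {...cluster-level defaults...}),
#    ("os debian-image", "kvm", {...defaults merged with per-OS overrides...}),
#    ("instance web1", "kvm", {...fully filled parameters of that instance...})]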
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                    constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(inst.name for
                         inst in dangling_instances.get(node.uuid, [])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(inst.name for
                                         inst in no_node_instances))

    return not self.bad

class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
1722
    if level == locking.LEVEL_NODE:
1723
      # Get members of node group; this is unsafe and needs verification later
1724
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1725

    
1726
      # In Exec(), we warn about mirrored instances that have primary and
1727
      # secondary living in separate node groups. To fully verify that
1728
      # volumes for these instances are healthy, we will need to do an
1729
      # extra call to their secondaries. We ensure here those nodes will
1730
      # be locked.
1731
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1732
        # Important: access only the instances whose lock is owned
1733
        instance = self.cfg.GetInstanceInfoByName(inst_name)
1734
        if instance.disk_template in constants.DTS_INT_MIRROR:
1735
          nodes.update(instance.secondary_nodes)
1736

    
1737
      self.needed_locks[locking.LEVEL_NODE] = nodes
1738

    
1739
  def CheckPrereq(self):
1740
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1741
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1742

    
1743
    group_node_uuids = set(self.group_info.members)
1744
    group_inst_uuids = \
1745
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1746

    
1747
    unlocked_node_uuids = \
1748
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1749

    
1750
    unlocked_inst_uuids = \
1751
        group_inst_uuids.difference(
1752
          [self.cfg.GetInstanceInfoByName(name).uuid
1753
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1754

    
1755
    if unlocked_node_uuids:
1756
      raise errors.OpPrereqError(
1757
        "Missing lock for nodes: %s" %
1758
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1759
        errors.ECODE_STATE)
1760

    
1761
    if unlocked_inst_uuids:
1762
      raise errors.OpPrereqError(
1763
        "Missing lock for instances: %s" %
1764
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1765
        errors.ECODE_STATE)
1766

    
1767
    self.all_node_info = self.cfg.GetAllNodesInfo()
1768
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1769

    
1770
    self.my_node_uuids = group_node_uuids
1771
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1772
                             for node_uuid in group_node_uuids)
1773

    
1774
    self.my_inst_uuids = group_inst_uuids
1775
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1776
                             for inst_uuid in group_inst_uuids)
1777

    
1778
    # We detect here the nodes that will need the extra RPC calls for verifying
1779
    # split LV volumes; they should be locked.
1780
    extra_lv_nodes = set()
1781

    
1782
    for inst in self.my_inst_info.values():
1783
      if inst.disk_template in constants.DTS_INT_MIRROR:
1784
        for nuuid in inst.all_nodes:
1785
          if self.all_node_info[nuuid].group != self.group_uuid:
1786
            extra_lv_nodes.add(nuuid)
1787

    
1788
    unlocked_lv_nodes = \
1789
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1790

    
1791
    if unlocked_lv_nodes:
1792
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1793
                                 utils.CommaJoin(unlocked_lv_nodes),
1794
                                 errors.ECODE_STATE)
1795
    self.extra_lv_nodes = list(extra_lv_nodes)
1796

    
1797
  def _VerifyNode(self, ninfo, nresult):
1798
    """Perform some basic validation on data returned from a node.
1799

1800
      - check the result data structure is well formed and has all the
1801
        mandatory fields
1802
      - check ganeti version
1803

1804
    @type ninfo: L{objects.Node}
1805
    @param ninfo: the node to check
1806
    @param nresult: the results from the node
1807
    @rtype: boolean
1808
    @return: whether overall this call was successful (and we can expect
1809
         reasonable values in the response)
1810

1811
    """
1812
    # main result, nresult should be a non-empty dict
1813
    test = not nresult or not isinstance(nresult, dict)
1814
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1815
                  "unable to verify node: no data returned")
1816
    if test:
1817
      return False
1818

    
1819
    # compares ganeti version
1820
    local_version = constants.PROTOCOL_VERSION
1821
    remote_version = nresult.get("version", None)
1822
    test = not (remote_version and
1823
                isinstance(remote_version, (list, tuple)) and
1824
                len(remote_version) == 2)
1825
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1826
                  "connection to node returned invalid data")
1827
    if test:
1828
      return False
1829

    
1830
    test = local_version != remote_version[0]
1831
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1832
                  "incompatible protocol versions: master %s,"
1833
                  " node %s", local_version, remote_version[0])
1834
    if test:
1835
      return False
1836

    
1837
    # node seems compatible, we can actually try to look into its results
1838

    
1839
    # full package version
1840
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1841
                  constants.CV_ENODEVERSION, ninfo.name,
1842
                  "software version mismatch: master %s, node %s",
1843
                  constants.RELEASE_VERSION, remote_version[1],
1844
                  code=self.ETYPE_WARNING)
1845

    
1846
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1847
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1848
      for hv_name, hv_result in hyp_result.iteritems():
1849
        test = hv_result is not None
1850
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1851
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1852

    
1853
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1854
    if ninfo.vm_capable and isinstance(hvp_result, list):
1855
      for item, hv_name, hv_result in hvp_result:
1856
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1857
                      "hypervisor %s parameter verify failure (source %s): %s",
1858
                      hv_name, item, hv_result)
1859

    
1860
    test = nresult.get(constants.NV_NODESETUP,
1861
                       ["Missing NODESETUP results"])
1862
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1863
                  "node setup error: %s", "; ".join(test))
1864

    
1865
    return True
1866

    
1867
  def _VerifyNodeTime(self, ninfo, nresult,
1868
                      nvinfo_starttime, nvinfo_endtime):
1869
    """Check the node time.
1870

1871
    @type ninfo: L{objects.Node}
1872
    @param ninfo: the node to check
1873
    @param nresult: the remote results for the node
1874
    @param nvinfo_starttime: the start time of the RPC call
1875
    @param nvinfo_endtime: the end time of the RPC call
1876

1877
    """
1878
    ntime = nresult.get(constants.NV_TIME, None)
1879
    try:
1880
      ntime_merged = utils.MergeTime(ntime)
1881
    except (ValueError, TypeError):
1882
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1883
                    "Node returned invalid time")
1884
      return
1885

    
1886
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1887
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1888
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1889
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1890
    else:
1891
      ntime_diff = None
1892

    
1893
    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1894
                  "Node time diverges by at least %s from master node time",
1895
                  ntime_diff)
1896

    
1897
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1898
    """Check the node LVM results and update info for cross-node checks.
1899

1900
    @type ninfo: L{objects.Node}
1901
    @param ninfo: the node to check
1902
    @param nresult: the remote results for the node
1903
    @param vg_name: the configured VG name
1904
    @type nimg: L{NodeImage}
1905
    @param nimg: node image
1906

1907
    """
1908
    if vg_name is None:
1909
      return
1910

    
1911
    # checks vg existence and size > 20G
1912
    vglist = nresult.get(constants.NV_VGLIST, None)
1913
    test = not vglist
1914
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1915
                  "unable to check volume groups")
1916
    if not test:
1917
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1918
                                            constants.MIN_VG_SIZE)
1919
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1920

    
1921
    # Check PVs
1922
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1923
    for em in errmsgs:
1924
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
1925
    if pvminmax is not None:
1926
      (nimg.pv_min, nimg.pv_max) = pvminmax
1927

    
1928
  def _VerifyGroupDRBDVersion(self, node_verify_infos):
1929
    """Check cross-node DRBD version consistency.
1930

1931
    @type node_verify_infos: dict
1932
    @param node_verify_infos: infos about nodes as returned from the
1933
      node_verify call.
1934

1935
    """
1936
    node_versions = {}
1937
    for node_uuid, ndata in node_verify_infos.items():
1938
      nresult = ndata.payload
1939
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1940
      node_versions[node_uuid] = version
1941

    
1942
    if len(set(node_versions.values())) > 1:
1943
      for node_uuid, version in sorted(node_versions.items()):
1944
        msg = "DRBD version mismatch: %s" % version
1945
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1946
                    code=self.ETYPE_WARNING)
1947

    
1948
  def _VerifyGroupLVM(self, node_image, vg_name):
1949
    """Check cross-node consistency in LVM.
1950

1951
    @type node_image: dict
1952
    @param node_image: info about nodes, mapping from node to names to
1953
      L{NodeImage} objects
1954
    @param vg_name: the configured VG name
1955

1956
    """
1957
    if vg_name is None:
1958
      return
1959

    
1960
    # Only exclusive storage needs this kind of checks
1961
    if not self._exclusive_storage:
1962
      return
1963

    
1964
    # exclusive_storage wants all PVs to have the same size (approximately),
1965
    # if the smallest and the biggest ones are okay, everything is fine.
1966
    # pv_min is None iff pv_max is None
1967
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1968
    if not vals:
1969
      return
1970
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1971
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1972
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1973
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1974
                  "PV sizes differ too much in the group; smallest (%s MB) is"
1975
                  " on %s, biggest (%s MB) is on %s",
1976
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
1977
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))
1978

    
1979
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1980
    """Check the node bridges.
1981

1982
    @type ninfo: L{objects.Node}
1983
    @param ninfo: the node to check
1984
    @param nresult: the remote results for the node
1985
    @param bridges: the expected list of bridges
1986

1987
    """
1988
    if not bridges:
1989
      return
1990

    
1991
    missing = nresult.get(constants.NV_BRIDGES, None)
1992
    test = not isinstance(missing, list)
1993
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1994
                  "did not return valid bridge information")
1995
    if not test:
1996
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1997
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1998

    
1999
  def _VerifyNodeUserScripts(self, ninfo, nresult):
2000
    """Check the results of user scripts presence and executability on the node
2001

2002
    @type ninfo: L{objects.Node}
2003
    @param ninfo: the node to check
2004
    @param nresult: the remote results for the node
2005

2006
    """
2007
    test = not constants.NV_USERSCRIPTS in nresult
2008
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
2009
                  "did not return user scripts information")
2010

    
2011
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
2012
    if not test:
2013
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
2014
                    "user scripts not present or not executable: %s" %
2015
                    utils.CommaJoin(sorted(broken_scripts)))
2016

    
2017
  def _VerifyNodeNetwork(self, ninfo, nresult):
2018
    """Check the node network connectivity results.
2019

2020
    @type ninfo: L{objects.Node}
2021
    @param ninfo: the node to check
2022
    @param nresult: the remote results for the node
2023

2024
    """
2025
    test = constants.NV_NODELIST not in nresult
2026
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
2027
                  "node hasn't returned node ssh connectivity data")
2028
    if not test:
2029
      if nresult[constants.NV_NODELIST]:
2030
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
2031
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
2032
                        "ssh communication with node '%s': %s", a_node, a_msg)
2033

    
2034
    test = constants.NV_NODENETTEST not in nresult
2035
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2036
                  "node hasn't returned node tcp connectivity data")
2037
    if not test:
2038
      if nresult[constants.NV_NODENETTEST]:
2039
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
2040
        for anode in nlist:
2041
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
2042
                        "tcp communication with node '%s': %s",
2043
                        anode, nresult[constants.NV_NODENETTEST][anode])
2044

    
2045
    test = constants.NV_MASTERIP not in nresult
2046
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2047
                  "node hasn't returned node master IP reachability data")
2048
    if not test:
2049
      if not nresult[constants.NV_MASTERIP]:
2050
        if ninfo.uuid == self.master_node:
2051
          msg = "the master node cannot reach the master IP (not configured?)"
2052
        else:
2053
          msg = "cannot reach the master IP"
2054
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
2055

    
2056
  def _VerifyInstance(self, instance, node_image, diskstatus):
2057
    """Verify an instance.
2058

2059
    This function checks to see if the required block devices are
2060
    available on the instance's node, and that the nodes are in the correct
2061
    state.
2062

2063
    """
2064
    pnode_uuid = instance.primary_node
2065
    pnode_img = node_image[pnode_uuid]
2066
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2067

    
2068
    node_vol_should = {}
2069
    instance.MapLVsByNode(node_vol_should)
2070

    
2071
    cluster = self.cfg.GetClusterInfo()
2072
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2073
                                                            self.group_info)
2074
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
2075
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
2076
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)
2077

    
2078
    for node_uuid in node_vol_should:
2079
      n_img = node_image[node_uuid]
2080
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2081
        # ignore missing volumes on offline or broken nodes
2082
        continue
2083
      for volume in node_vol_should[node_uuid]:
2084
        test = volume not in n_img.volumes
2085
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
2086
                      "volume %s missing on node %s", volume,
2087
                      self.cfg.GetNodeName(node_uuid))
2088

    
2089
    if instance.admin_state == constants.ADMINST_UP:
2090
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
2091
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
2092
                    "instance not running on its primary node %s",
2093
                     self.cfg.GetNodeName(pnode_uuid))
2094
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
2095
                    instance.name, "instance is marked as running and lives on"
2096
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))
2097

    
2098
    diskdata = [(nname, success, status, idx)
2099
                for (nname, disks) in diskstatus.items()
2100
                for idx, (success, status) in enumerate(disks)]
2101

    
2102
    for nname, success, bdev_status, idx in diskdata:
2103
      # the 'ghost node' construction in Exec() ensures that we have a
2104
      # node here
2105
      snode = node_image[nname]
2106
      bad_snode = snode.ghost or snode.offline
2107
      self._ErrorIf(instance.disks_active and
2108
                    not success and not bad_snode,
2109
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
2110
                    "couldn't retrieve status for disk/%s on %s: %s",
2111
                    idx, self.cfg.GetNodeName(nname), bdev_status)
2112

    
2113
      if instance.disks_active and success and \
2114
         (bdev_status.is_degraded or
2115
          bdev_status.ldisk_status != constants.LDS_OKAY):
2116
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2117
        if bdev_status.is_degraded:
2118
          msg += " is degraded"
2119
        if bdev_status.ldisk_status != constants.LDS_OKAY:
2120
          msg += "; state is '%s'" % \
2121
                 constants.LDS_NAMES[bdev_status.ldisk_status]
2122

    
2123
        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2124

    
2125
    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2126
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2127
                  "instance %s, connection to primary node failed",
2128
                  instance.name)
2129

    
2130
    self._ErrorIf(len(instance.secondary_nodes) > 1,
2131
                  constants.CV_EINSTANCELAYOUT, instance.name,
2132
                  "instance has multiple secondary nodes: %s",
2133
                  utils.CommaJoin(instance.secondary_nodes),
2134
                  code=self.ETYPE_WARNING)
2135

    
2136
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2137
    if any(es_flags.values()):
2138
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2139
        # Disk template not compatible with exclusive_storage: no instance
2140
        # node should have the flag set
2141
        es_nodes = [n
2142
                    for (n, es) in es_flags.items()
2143
                    if es]
2144
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2145
                    "instance has template %s, which is not supported on nodes"
2146
                    " that have exclusive storage set: %s",
2147
                    instance.disk_template,
2148
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2149
      for (idx, disk) in enumerate(instance.disks):
2150
        self._ErrorIf(disk.spindles is None,
2151
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2152
                      "number of spindles not configured for disk %s while"
2153
                      " exclusive storage is enabled, try running"
2154
                      " gnt-cluster repair-disk-sizes", idx)
2155

    
2156
    if instance.disk_template in constants.DTS_INT_MIRROR:
2157
      instance_nodes = utils.NiceSort(instance.all_nodes)
2158
      instance_groups = {}
2159

    
2160
      for node_uuid in instance_nodes:
2161
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
2162
                                   []).append(node_uuid)
2163

    
2164
      pretty_list = [
2165
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2166
                           groupinfo[group].name)
2167
        # Sort so that we always list the primary node first.
2168
        for group, nodes in sorted(instance_groups.items(),
2169
                                   key=lambda (_, nodes): pnode_uuid in nodes,
2170
                                   reverse=True)]
2171

    
2172
      self._ErrorIf(len(instance_groups) > 1,
2173
                    constants.CV_EINSTANCESPLITGROUPS,
2174
                    instance.name, "instance has primary and secondary nodes in"
2175
                    " different groups: %s", utils.CommaJoin(pretty_list),
2176
                    code=self.ETYPE_WARNING)
2177

    
2178
    inst_nodes_offline = []
2179
    for snode in instance.secondary_nodes:
2180
      s_img = node_image[snode]
2181
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2182
                    self.cfg.GetNodeName(snode),
2183
                    "instance %s, connection to secondary node failed",
2184
                    instance.name)
2185

    
2186
      if s_img.offline:
2187
        inst_nodes_offline.append(snode)
2188

    
2189
    # warn that the instance lives on offline nodes
2190
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2191
                  instance.name, "instance has offline secondary node(s) %s",
2192
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2193
    # ... or ghost/non-vm_capable nodes
2194
    for node_uuid in instance.all_nodes:
2195
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2196
                    instance.name, "instance lives on ghost node %s",
2197
                    self.cfg.GetNodeName(node_uuid))
2198
      self._ErrorIf(not node_image[node_uuid].vm_capable,
2199
                    constants.CV_EINSTANCEBADNODE, instance.name,
2200
                    "instance lives on non-vm_capable node %s",
2201
                    self.cfg.GetNodeName(node_uuid))
2202

    
2203
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2204
    """Verify if there are any unknown volumes in the cluster.
2205

2206
    The .os, .swap and backup volumes are ignored. All other volumes are
2207
    reported as unknown.
2208

2209
    @type reserved: L{ganeti.utils.FieldSet}
2210
    @param reserved: a FieldSet of reserved volume names
2211

2212
    """
2213
    for node_uuid, n_img in node_image.items():
2214
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2215
          self.all_node_info[node_uuid].group != self.group_uuid):
2216
        # skip non-healthy nodes
2217
        continue
2218
      for volume in n_img.volumes:
2219
        test = ((node_uuid not in node_vol_should or
2220
                volume not in node_vol_should[node_uuid]) and
2221
                not reserved.Matches(volume))
2222
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2223
                      self.cfg.GetNodeName(node_uuid),
2224
                      "volume %s is unknown", volume)
2225

    
2226
  def _VerifyNPlusOneMemory(self, node_image, all_insts):
2227
    """Verify N+1 Memory Resilience.
2228

2229
    Check that if one single node dies we can still start all the
2230
    instances it was primary for.
2231

2232
    """
2233
    cluster_info = self.cfg.GetClusterInfo()
2234
    for node_uuid, n_img in node_image.items():
2235
      # This code checks that every node which is now listed as
2236
      # secondary has enough memory to host all instances it is
2237
      # supposed to should a single other node in the cluster fail.
2238
      # FIXME: not ready for failover to an arbitrary node
2239
      # FIXME: does not support file-backed instances
2240
      # WARNING: we currently take into account down instances as well
2241
      # as up ones, considering that even if they're down someone
2242
      # might want to start them even in the event of a node failure.
2243
      if n_img.offline or \
2244
         self.all_node_info[node_uuid].group != self.group_uuid:
2245
        # we're skipping nodes marked offline and nodes in other groups from
2246
        # the N+1 warning, since most likely we don't have good memory
2247
        # information from them; we already list instances living on such
2248
        # nodes, and that's enough warning
2249
        continue
2250
      #TODO(dynmem): also consider ballooning out other instances
2251
      for prinode, inst_uuids in n_img.sbp.items():
2252
        needed_mem = 0
2253
        for inst_uuid in inst_uuids:
2254
          bep = cluster_info.FillBE(all_insts[inst_uuid])
2255
          if bep[constants.BE_AUTO_BALANCE]:
2256
            needed_mem += bep[constants.BE_MINMEM]
2257
        test = n_img.mfree < needed_mem
2258
        self._ErrorIf(test, constants.CV_ENODEN1,
2259
                      self.cfg.GetNodeName(node_uuid),
2260
                      "not enough memory to accomodate instance failovers"
2261
                      " should node %s fail (%dMiB needed, %dMiB available)",
2262
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
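  # Worked example (an assumption for illustration, not from the original
  # source): if node A is listed as secondary for two auto-balanced instances
  # whose BE_MINMEM values are 4096 and 2048 MiB and which share the same
  # primary node P, then needed_mem above is 6144 and an ENODEN1 error is
  # reported for A whenever its reported mfree drops below 6144 MiB.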
2263

    
2264
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2265
                   (files_all, files_opt, files_mc, files_vm)):
2266
    """Verifies file checksums collected from all nodes.
2267

2268
    @param nodes: List of L{objects.Node} objects
2269
    @param master_node_uuid: UUID of master node
2270
    @param all_nvinfo: RPC results
2271

2272
    """
2273
    # Define functions determining which nodes to consider for a file
2274
    files2nodefn = [
2275
      (files_all, None),
2276
      (files_mc, lambda node: (node.master_candidate or
2277
                               node.uuid == master_node_uuid)),
2278
      (files_vm, lambda node: node.vm_capable),
2279
      ]
2280

    
2281
    # Build mapping from filename to list of nodes which should have the file
2282
    nodefiles = {}
2283
    for (files, fn) in files2nodefn:
2284
      if fn is None:
2285
        filenodes = nodes
2286
      else:
2287
        filenodes = filter(fn, nodes)
2288
      nodefiles.update((filename,
2289
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
2290
                       for filename in files)
2291

    
2292
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2293

    
2294
    fileinfo = dict((filename, {}) for filename in nodefiles)
2295
    ignore_nodes = set()
2296

    
2297
    for node in nodes:
2298
      if node.offline:
2299
        ignore_nodes.add(node.uuid)
2300
        continue
2301

    
2302
      nresult = all_nvinfo[node.uuid]
2303

    
2304
      if nresult.fail_msg or not nresult.payload:
2305
        node_files = None
2306
      else:
2307
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2308
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2309
                          for (key, value) in fingerprints.items())
2310
        del fingerprints
2311

    
2312
      test = not (node_files and isinstance(node_files, dict))
2313
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2314
                    "Node did not return file checksum data")
2315
      if test:
2316
        ignore_nodes.add(node.uuid)
2317
        continue
2318

    
2319
      # Build per-checksum mapping from filename to nodes having it
2320
      for (filename, checksum) in node_files.items():
2321
        assert filename in nodefiles
2322
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2323

    
2324
    for (filename, checksums) in fileinfo.items():
2325
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2326

    
2327
      # Nodes having the file
2328
      with_file = frozenset(node_uuid
2329
                            for node_uuids in fileinfo[filename].values()
2330
                            for node_uuid in node_uuids) - ignore_nodes
2331

    
2332
      expected_nodes = nodefiles[filename] - ignore_nodes
2333

    
2334
      # Nodes missing file
2335
      missing_file = expected_nodes - with_file
2336

    
2337
      if filename in files_opt:
2338
        # All or no nodes
2339
        self._ErrorIf(missing_file and missing_file != expected_nodes,
2340
                      constants.CV_ECLUSTERFILECHECK, None,
2341
                      "File %s is optional, but it must exist on all or no"
2342
                      " nodes (not found on %s)",
2343
                      filename,
2344
                      utils.CommaJoin(
2345
                        utils.NiceSort(
2346
                          map(self.cfg.GetNodeName, missing_file))))
2347
      else:
2348
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2349
                      "File %s is missing from node(s) %s", filename,
2350
                      utils.CommaJoin(
2351
                        utils.NiceSort(
2352
                          map(self.cfg.GetNodeName, missing_file))))
2353

    
2354
        # Warn if a node has a file it shouldn't
2355
        unexpected = with_file - expected_nodes
2356
        self._ErrorIf(unexpected,
2357
                      constants.CV_ECLUSTERFILECHECK, None,
2358
                      "File %s should not exist on node(s) %s",
2359
                      filename, utils.CommaJoin(
2360
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2361

    
2362
      # See if there are multiple versions of the file
2363
      test = len(checksums) > 1
2364
      if test:
2365
        variants = ["variant %s on %s" %
2366
                    (idx + 1,
2367
                     utils.CommaJoin(utils.NiceSort(
2368
                       map(self.cfg.GetNodeName, node_uuids))))
2369
                    for (idx, (checksum, node_uuids)) in
2370
                      enumerate(sorted(checksums.items()))]
2371
      else:
2372
        variants = []
2373

    
2374
      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2375
                    "File %s found with %s different checksums (%s)",
2376
                    filename, len(checksums), "; ".join(variants))
2377

    
2378
  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
2379
    """Verify the drbd helper.
2380

2381
    """
2382
    if drbd_helper:
2383
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2384
      test = (helper_result is None)
2385
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2386
                    "no drbd usermode helper returned")
2387
      if helper_result:
2388
        status, payload = helper_result
2389
        test = not status
2390
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2391
                      "drbd usermode helper check unsuccessful: %s", payload)
2392
        test = status and (payload != drbd_helper)
2393
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2394
                      "wrong drbd usermode helper: %s", payload)
2395

    
2396
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2397
                      drbd_map):
2398
    """Verifies and the node DRBD status.
2399

2400
    @type ninfo: L{objects.Node}
2401
    @param ninfo: the node to check
2402
    @param nresult: the remote results for the node
2403
    @param instanceinfo: the dict of instances
2404
    @param drbd_helper: the configured DRBD usermode helper
2405
    @param drbd_map: the DRBD map as returned by
2406
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2407

2408
    """
2409
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)
2410

    
2411
    # compute the DRBD minors
2412
    node_drbd = {}
2413
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2414
      test = inst_uuid not in instanceinfo
2415
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2416
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
2417
        # ghost instance should not be running, but otherwise we
2418
        # don't give double warnings (both ghost instance and
2419
        # unallocated minor in use)
2420
      if test:
2421
        node_drbd[minor] = (inst_uuid, False)
2422
      else:
2423
        instance = instanceinfo[inst_uuid]
2424
        node_drbd[minor] = (inst_uuid, instance.disks_active)
2425

    
2426
    # and now check them
2427
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2428
    test = not isinstance(used_minors, (tuple, list))
2429
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2430
                  "cannot parse drbd status file: %s", str(used_minors))
2431
    if test:
2432
      # we cannot check drbd status
2433
      return
2434

    
2435
    for minor, (inst_uuid, must_exist) in node_drbd.items():
2436
      test = minor not in used_minors and must_exist
2437
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2438
                    "drbd minor %d of instance %s is not active", minor,
2439
                    self.cfg.GetInstanceName(inst_uuid))
2440
    for minor in used_minors:
2441
      test = minor not in node_drbd
2442
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2443
                    "unallocated drbd minor %d is in use", minor)
2444

    
2445
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2446
    """Builds the node OS structures.
2447

2448
    @type ninfo: L{objects.Node}
2449
    @param ninfo: the node to check
2450
    @param nresult: the remote results for the node
2451
    @param nimg: the node image object
2452

2453
    """
2454
    remote_os = nresult.get(constants.NV_OSLIST, None)
2455
    test = (not isinstance(remote_os, list) or
2456
            not compat.all(isinstance(v, list) and len(v) == 7
2457
                           for v in remote_os))
2458

    
2459
    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2460
                  "node hasn't returned valid OS data")
2461

    
2462
    nimg.os_fail = test
2463

    
2464
    if test:
2465
      return
2466

    
2467
    os_dict = {}
2468

    
2469
    for (name, os_path, status, diagnose,
2470
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2471

    
2472
      if name not in os_dict:
2473
        os_dict[name] = []
2474

    
2475
      # parameters is a list of lists instead of list of tuples due to
2476
      # JSON lacking a real tuple type, fix it:
2477
      parameters = [tuple(v) for v in parameters]
2478
      os_dict[name].append((os_path, status, diagnose,
2479
                            set(variants), set(parameters), set(api_ver)))
2480

    
2481
    nimg.oslist = os_dict
2482

    
2483
  def _VerifyNodeOS(self, ninfo, nimg, base):
2484
    """Verifies the node OS list.
2485

2486
    @type ninfo: L{objects.Node}
2487
    @param ninfo: the node to check
2488
    @param nimg: the node image object
2489
    @param base: the 'template' node we match against (e.g. from the master)
2490

2491
    """
2492
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2493

    
2494
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2495
    for os_name, os_data in nimg.oslist.items():
2496
      assert os_data, "Empty OS status for OS %s?!" % os_name
2497
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2498
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2499
                    "Invalid OS %s (located at %s): %s",
2500
                    os_name, f_path, f_diag)
2501
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2502
                    "OS '%s' has multiple entries"
2503
                    " (first one shadows the rest): %s",
2504
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
2505
      # comparisons with the 'base' image
2506
      test = os_name not in base.oslist
2507
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2508
                    "Extra OS %s not present on reference node (%s)",
2509
                    os_name, self.cfg.GetNodeName(base.uuid))
2510
      if test:
2511
        continue
2512
      assert base.oslist[os_name], "Base node has empty OS status?"
2513
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2514
      if not b_status:
2515
        # base OS is invalid, skipping
2516
        continue
2517
      for kind, a, b in [("API version", f_api, b_api),
2518
                         ("variants list", f_var, b_var),
2519
                         ("parameters", beautify_params(f_param),
2520
                          beautify_params(b_param))]:
2521
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2522
                      "OS %s for %s differs from reference node %s:"
2523
                      " [%s] vs. [%s]", kind, os_name,
2524
                      self.cfg.GetNodeName(base.uuid),
2525
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2526

    
2527
    # check any missing OSes
2528
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2529
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2530
                  "OSes present on reference node %s"
2531
                  " but missing on this node: %s",
2532
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2533

    
2534
  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2535
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2536

2537
    @type ninfo: L{objects.Node}
2538
    @param ninfo: the node to check
2539
    @param nresult: the remote results for the node
2540
    @type is_master: bool
2541
    @param is_master: Whether node is the master node
2542

2543
    """
2544
    cluster = self.cfg.GetClusterInfo()
2545
    if (is_master and
2546
        (cluster.IsFileStorageEnabled() or
2547
         cluster.IsSharedFileStorageEnabled())):
2548
      try:
2549
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2550
      except KeyError:
2551
        # This should never happen
2552
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2553
                      "Node did not return forbidden file storage paths")
2554
      else:
2555
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2556
                      "Found forbidden file storage paths: %s",
2557
                      utils.CommaJoin(fspaths))
2558
    else:
2559
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2560
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2561
                    "Node should not have returned forbidden file storage"
2562
                    " paths")
2563

    
2564
  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2565
                          verify_key, error_key):
2566
    """Verifies (file) storage paths.
2567

2568
    @type ninfo: L{objects.Node}
2569
    @param ninfo: the node to check
2570
    @param nresult: the remote results for the node
2571
    @type file_disk_template: string
2572
    @param file_disk_template: file-based disk template, whose directory
2573
        is supposed to be verified
2574
    @type verify_key: string
2575
    @param verify_key: key for the verification map of this file
2576
        verification step
2577
    @param error_key: error key to be added to the verification results
2578
        in case something goes wrong in this verification step
2579

2580
    """
2581
    assert (file_disk_template in
2582
            utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2583
    cluster = self.cfg.GetClusterInfo()
2584
    if cluster.IsDiskTemplateEnabled(file_disk_template):
2585
      self._ErrorIf(
2586
          verify_key in nresult,
2587
          error_key, ninfo.name,
2588
          "The configured %s storage path is unusable: %s" %
2589
          (file_disk_template, nresult.get(verify_key)))
2590

    
2591
  def _VerifyFileStoragePaths(self, ninfo, nresult):
2592
    """Verifies (file) storage paths.
2593

2594
    @see: C{_VerifyStoragePaths}
2595

2596
    """
2597
    self._VerifyStoragePaths(
2598
        ninfo, nresult, constants.DT_FILE,
2599
        constants.NV_FILE_STORAGE_PATH,
2600
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2601

    
2602
  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2603
    """Verifies (file) storage paths.
2604

2605
    @see: C{_VerifyStoragePaths}
2606

2607
    """
2608
    self._VerifyStoragePaths(
2609
        ninfo, nresult, constants.DT_SHARED_FILE,
2610
        constants.NV_SHARED_FILE_STORAGE_PATH,
2611
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2612

    
2613
  def _VerifyOob(self, ninfo, nresult):
2614
    """Verifies out of band functionality of a node.
2615

2616
    @type ninfo: L{objects.Node}
2617
    @param ninfo: the node to check
2618
    @param nresult: the remote results for the node
2619

2620
    """
2621
    # We just have to verify the paths on master and/or master candidates
2622
    # as the oob helper is invoked on the master
2623
    if ((ninfo.master_candidate or ninfo.master_capable) and
2624
        constants.NV_OOB_PATHS in nresult):
2625
      for path_result in nresult[constants.NV_OOB_PATHS]:
2626
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2627
                      ninfo.name, path_result)
2628

    
2629
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2630
    """Verifies and updates the node volume data.
2631

2632
    This function will update a L{NodeImage}'s internal structures
2633
    with data from the remote call.
2634

2635
    @type ninfo: L{objects.Node}
2636
    @param ninfo: the node to check
2637
    @param nresult: the remote results for the node
2638
    @param nimg: the node image object
2639
    @param vg_name: the configured VG name
2640

2641
    """
2642
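    # Be pessimistic and assume the LVM data is unusable until the reply
    # below has been parsed into a proper LV dictionary.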
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
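      # The node reports instance names; translate them into the UUIDs used
      # everywhere else in the configuration.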
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{NodeImage})
    @param node_image: Node image objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_dev_inst_only = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      dev_inst_only = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))

      node_disks_dev_inst_only[nuuid] = dev_inst_only

    assert len(node_disks) == len(node_disks_dev_inst_only)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
               node_disks.keys(), node_disks_dev_inst_only)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

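    # Illustrative shape of the result (identifiers made up):
    #   {"<inst-uuid>": {"<node-uuid>": [(True, <status-payload>), ...]},
    #    "<diskless-inst-uuid>": {}}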
    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

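    # Group the remaining (foreign, online) nodes by node group and wrap each
    # group's sorted name list in an endless cycle, so that callers can draw
    # one target per foreign group for every local node.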
    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

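    # Illustrative result (names made up): with online nodes "n1" and "n2"
    # in this group and a single other group containing "n3", this returns
    #   (["n1", "n2"], {"n1": ["n3"], "n2": ["n3"]})
    # i.e. every online node additionally checks one node per other group.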
    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

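    # Each NV_* entry requests one verification step from the node daemon;
    # the value carries the parameters for that step (None for checks that
    # take no arguments).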
    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
      if drbd_helper:
        node_verify_param[constants.NV_DRBDVERSION] = None
        node_verify_param[constants.NV_DRBDLIST] = None
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for inst_uuid in self.my_inst_info.values():
      for nic in inst_uuid.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

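    # Record, for every instance in this group, which node images hold it as
    # primary or secondary; nodes outside this group get placeholder images,
    # and nodes missing from the configuration are marked as ghosts.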
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least one node, we act as if it were set for all of them
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    node_group_uuids = dict(map(lambda n: (n.name, n.group),
                                self.cfg.GetAllNodesInfo().values()))
    groups_config = self.cfg.GetAllNodeGroupsInfoDict()

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams,
                                           node_group_uuids,
                                           groups_config)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams,
                                    node_group_uuids,
                                    groups_config)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams,
         node_group_uuids,
         groups_config))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.name not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
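        # Each payload entry is a (script, status, output) tuple; only the
        # output of failed scripts is echoed back to the user.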
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])