lib/cmdlib/cluster.py @ 5b6f9e35


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with the cluster."""
23

    
24
import OpenSSL
25

    
26
import copy
27
import itertools
28
import logging
29
import operator
30
import os
31
import re
32
import time
33

    
34
from ganeti import compat
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import hypervisor
38
from ganeti import locking
39
from ganeti import masterd
40
from ganeti import netutils
41
from ganeti import objects
42
from ganeti import opcodes
43
from ganeti import pathutils
44
from ganeti import query
45
import ganeti.rpc.node as rpc
46
from ganeti import runtime
47
from ganeti import ssh
48
from ganeti import uidpool
49
from ganeti import utils
50
from ganeti import vcluster
51

    
52
from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53
  ResultWithJobs
54
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
60
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
61
  CheckDiskAccessModeConsistency, AddNodeCertToCandidateCerts
62

    
63
import ganeti.masterd.instance
64

    
65

    
66
class LUClusterActivateMasterIp(NoHooksLU):
67
  """Activate the master IP on the master node.
68

69
  """
70
  def Exec(self, feedback_fn):
71
    """Activate the master IP.
72

73
    """
74
    master_params = self.cfg.GetMasterNetworkParameters()
75
    ems = self.cfg.GetUseExternalMipScript()
76
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
77
                                                   master_params, ems)
78
    result.Raise("Could not activate the master IP")
79

    
80

    
81
class LUClusterDeactivateMasterIp(NoHooksLU):
82
  """Deactivate the master IP on the master node.
83

84
  """
85
  def Exec(self, feedback_fn):
86
    """Deactivate the master IP.
87

88
    """
89
    master_params = self.cfg.GetMasterNetworkParameters()
90
    ems = self.cfg.GetUseExternalMipScript()
91
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
92
                                                     master_params, ems)
93
    result.Raise("Could not deactivate the master IP")
94

    
95

    
96
class LUClusterConfigQuery(NoHooksLU):
97
  """Return configuration values.
98

99
  """
100
  REQ_BGL = False
101

    
102
  def CheckArguments(self):
103
    self.cq = ClusterQuery(None, self.op.output_fields, False)
104

    
105
  def ExpandNames(self):
106
    self.cq.ExpandNames(self)
107

    
108
  def DeclareLocks(self, level):
109
    self.cq.DeclareLocks(self, level)
110

    
111
  def Exec(self, feedback_fn):
112
    result = self.cq.OldStyleQuery(self)
113

    
114
    assert len(result) == 1
115

    
116
    return result[0]
117

    
118

    
119
class LUClusterDestroy(LogicalUnit):
120
  """Logical unit for destroying the cluster.
121

122
  """
123
  HPATH = "cluster-destroy"
124
  HTYPE = constants.HTYPE_CLUSTER
125

    
126
  def BuildHooksEnv(self):
127
    """Build hooks env.
128

129
    """
130
    return {
131
      "OP_TARGET": self.cfg.GetClusterName(),
132
      }
133

    
134
  def BuildHooksNodes(self):
135
    """Build hooks nodes.
136

137
    """
138
    return ([], [])
139

    
140
  def CheckPrereq(self):
141
    """Check prerequisites.
142

143
    This checks whether the cluster is empty.
144

145
    Any errors are signaled by raising errors.OpPrereqError.
146

147
    """
148
    master = self.cfg.GetMasterNode()
149

    
150
    nodelist = self.cfg.GetNodeList()
151
    if len(nodelist) != 1 or nodelist[0] != master:
152
      raise errors.OpPrereqError("There are still %d node(s) in"
153
                                 " this cluster." % (len(nodelist) - 1),
154
                                 errors.ECODE_INVAL)
155
    instancelist = self.cfg.GetInstanceList()
156
    if instancelist:
157
      raise errors.OpPrereqError("There are still %d instance(s) in"
158
                                 " this cluster." % len(instancelist),
159
                                 errors.ECODE_INVAL)
160

    
161
  def Exec(self, feedback_fn):
162
    """Destroys the cluster.
163

164
    """
165
    master_params = self.cfg.GetMasterNetworkParameters()
166

    
167
    # Run post hooks on master node before it's removed
168
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
169

    
170
    ems = self.cfg.GetUseExternalMipScript()
171
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
172
                                                     master_params, ems)
173
    result.Warn("Error disabling the master IP address", self.LogWarning)
174
    return master_params.uuid
175

    
176

    
177
class LUClusterPostInit(LogicalUnit):
178
  """Logical unit for running hooks after cluster initialization.
179

180
  """
181
  HPATH = "cluster-init"
182
  HTYPE = constants.HTYPE_CLUSTER
183

    
184
  def CheckArguments(self):
185
    self.master_uuid = self.cfg.GetMasterNode()
186
    self.master_ndparams = self.cfg.GetNdParams(self.cfg.GetMasterNodeInfo())
187

    
188
    # TODO: When Issue 584 is solved, and None is properly parsed when used
189
    # as a default value, ndparams.get(.., None) can be changed to
190
    # ndparams[..] to access the values directly
191

    
192
    # OpenvSwitch: Warn user if link is missing
193
    if (self.master_ndparams[constants.ND_OVS] and not
194
        self.master_ndparams.get(constants.ND_OVS_LINK, None)):
195
      self.LogInfo("No physical interface for OpenvSwitch was given."
196
                   " OpenvSwitch will not have an outside connection. This"
197
                   " might not be what you want.")
198

    
199
  def BuildHooksEnv(self):
200
    """Build hooks env.
201

202
    """
203
    return {
204
      "OP_TARGET": self.cfg.GetClusterName(),
205
      }
206

    
207
  def BuildHooksNodes(self):
208
    """Build hooks nodes.
209

210
    """
211
    return ([], [self.cfg.GetMasterNode()])
212

    
213
  def Exec(self, feedback_fn):
214
    """Create and configure Open vSwitch
215

216
    """
217
    if self.master_ndparams[constants.ND_OVS]:
218
      result = self.rpc.call_node_configure_ovs(
219
                 self.master_uuid,
220
                 self.master_ndparams[constants.ND_OVS_NAME],
221
                 self.master_ndparams.get(constants.ND_OVS_LINK, None))
222
      result.Raise("Could not successully configure Open vSwitch")
223

    
224
    AddNodeCertToCandidateCerts(self, self.master_uuid,
225
                                self.cfg.GetClusterInfo())
226

    
227
    return True
228

    
229

    
230
class ClusterQuery(QueryBase):
231
  FIELDS = query.CLUSTER_FIELDS
232

    
233
  #: Do not sort (there is only one item)
234
  SORT_FIELD = None
235

    
236
  def ExpandNames(self, lu):
237
    lu.needed_locks = {}
238

    
239
    # The following variables interact with _QueryBase._GetNames
240
    self.wanted = locking.ALL_SET
241
    self.do_locking = self.use_locking
242

    
243
    if self.do_locking:
244
      raise errors.OpPrereqError("Can not use locking for cluster queries",
245
                                 errors.ECODE_INVAL)
246

    
247
  def DeclareLocks(self, lu, level):
248
    pass
249

    
250
  def _GetQueryData(self, lu):
251
    """Computes the list of nodes and their attributes.
252

253
    """
254
    # Locking is not used
255
    assert not (compat.any(lu.glm.is_owned(level)
256
                           for level in locking.LEVELS
257
                           if level != locking.LEVEL_CLUSTER) or
258
                self.do_locking or self.use_locking)
259

    
260
    if query.CQ_CONFIG in self.requested_data:
261
      cluster = lu.cfg.GetClusterInfo()
262
      nodes = lu.cfg.GetAllNodesInfo()
263
    else:
264
      cluster = NotImplemented
265
      nodes = NotImplemented
266

    
267
    if query.CQ_QUEUE_DRAINED in self.requested_data:
268
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
269
    else:
270
      drain_flag = NotImplemented
271

    
272
    if query.CQ_WATCHER_PAUSE in self.requested_data:
273
      master_node_uuid = lu.cfg.GetMasterNode()
274

    
275
      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
276
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
277
                   lu.cfg.GetMasterNodeName())
278

    
279
      watcher_pause = result.payload
280
    else:
281
      watcher_pause = NotImplemented
282

    
283
    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
284

    
285

    
286
class LUClusterQuery(NoHooksLU):
287
  """Query cluster configuration.
288

289
  """
290
  REQ_BGL = False
291

    
292
  def ExpandNames(self):
293
    self.needed_locks = {}
294

    
295
  def Exec(self, feedback_fn):
296
    """Return cluster config.
297

298
    """
299
    cluster = self.cfg.GetClusterInfo()
300
    os_hvp = {}
301

    
302
    # Filter just for enabled hypervisors
303
    for os_name, hv_dict in cluster.os_hvp.items():
304
      os_hvp[os_name] = {}
305
      for hv_name, hv_params in hv_dict.items():
306
        if hv_name in cluster.enabled_hypervisors:
307
          os_hvp[os_name][hv_name] = hv_params
308

    
309
    # Convert ip_family to ip_version
310
    primary_ip_version = constants.IP4_VERSION
311
    if cluster.primary_ip_family == netutils.IP6Address.family:
312
      primary_ip_version = constants.IP6_VERSION
313

    
314
    result = {
315
      "software_version": constants.RELEASE_VERSION,
316
      "protocol_version": constants.PROTOCOL_VERSION,
317
      "config_version": constants.CONFIG_VERSION,
318
      "os_api_version": max(constants.OS_API_VERSIONS),
319
      "export_version": constants.EXPORT_VERSION,
320
      "vcs_version": constants.VCS_VERSION,
321
      "architecture": runtime.GetArchInfo(),
322
      "name": cluster.cluster_name,
323
      "master": self.cfg.GetMasterNodeName(),
324
      "default_hypervisor": cluster.primary_hypervisor,
325
      "enabled_hypervisors": cluster.enabled_hypervisors,
326
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
327
                        for hypervisor_name in cluster.enabled_hypervisors]),
328
      "os_hvp": os_hvp,
329
      "beparams": cluster.beparams,
330
      "osparams": cluster.osparams,
331
      "ipolicy": cluster.ipolicy,
332
      "nicparams": cluster.nicparams,
333
      "ndparams": cluster.ndparams,
334
      "diskparams": cluster.diskparams,
335
      "candidate_pool_size": cluster.candidate_pool_size,
336
      "master_netdev": cluster.master_netdev,
337
      "master_netmask": cluster.master_netmask,
338
      "use_external_mip_script": cluster.use_external_mip_script,
339
      "volume_group_name": cluster.volume_group_name,
340
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
341
      "file_storage_dir": cluster.file_storage_dir,
342
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
343
      "maintain_node_health": cluster.maintain_node_health,
344
      "ctime": cluster.ctime,
345
      "mtime": cluster.mtime,
346
      "uuid": cluster.uuid,
347
      "tags": list(cluster.GetTags()),
348
      "uid_pool": cluster.uid_pool,
349
      "default_iallocator": cluster.default_iallocator,
350
      "default_iallocator_params": cluster.default_iallocator_params,
351
      "reserved_lvs": cluster.reserved_lvs,
352
      "primary_ip_version": primary_ip_version,
353
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
354
      "hidden_os": cluster.hidden_os,
355
      "blacklisted_os": cluster.blacklisted_os,
356
      "enabled_disk_templates": cluster.enabled_disk_templates,
357
      }
358

    
359
    return result
360

    
361

    
362
class LUClusterRedistConf(NoHooksLU):
363
  """Force the redistribution of cluster configuration.
364

365
  This is a very simple LU.
366

367
  """
368
  REQ_BGL = False
369

    
370
  def ExpandNames(self):
371
    self.needed_locks = {
372
      locking.LEVEL_NODE: locking.ALL_SET,
373
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
374
    }
375
    self.share_locks = ShareAll()
376

    
377
  def Exec(self, feedback_fn):
378
    """Redistribute the configuration.
379

380
    """
381
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
382
    RedistributeAncillaryFiles(self)
383

    
384

    
385
class LUClusterRename(LogicalUnit):
386
  """Rename the cluster.
387

388
  """
389
  HPATH = "cluster-rename"
390
  HTYPE = constants.HTYPE_CLUSTER
391

    
392
  def BuildHooksEnv(self):
393
    """Build hooks env.
394

395
    """
396
    return {
397
      "OP_TARGET": self.cfg.GetClusterName(),
398
      "NEW_NAME": self.op.name,
399
      }
400

    
401
  def BuildHooksNodes(self):
402
    """Build hooks nodes.
403

404
    """
405
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
406

    
407
  def CheckPrereq(self):
408
    """Verify that the passed name is a valid one.
409

410
    """
411
    hostname = netutils.GetHostname(name=self.op.name,
412
                                    family=self.cfg.GetPrimaryIPFamily())
413

    
414
    new_name = hostname.name
415
    self.ip = new_ip = hostname.ip
416
    old_name = self.cfg.GetClusterName()
417
    old_ip = self.cfg.GetMasterIP()
418
    if new_name == old_name and new_ip == old_ip:
419
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
420
                                 " cluster has changed",
421
                                 errors.ECODE_INVAL)
422
    if new_ip != old_ip:
423
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
424
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
425
                                   " reachable on the network" %
426
                                   new_ip, errors.ECODE_NOTUNIQUE)
427

    
428
    self.op.name = new_name
429

    
430
  def Exec(self, feedback_fn):
431
    """Rename the cluster.
432

433
    """
434
    clustername = self.op.name
435
    new_ip = self.ip
436

    
437
    # shutdown the master IP
438
    master_params = self.cfg.GetMasterNetworkParameters()
439
    ems = self.cfg.GetUseExternalMipScript()
440
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
441
                                                     master_params, ems)
442
    result.Raise("Could not disable the master role")
443

    
444
    try:
445
      cluster = self.cfg.GetClusterInfo()
446
      cluster.cluster_name = clustername
447
      cluster.master_ip = new_ip
448
      self.cfg.Update(cluster, feedback_fn)
449

    
450
      # update the known hosts file
451
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
452
      node_list = self.cfg.GetOnlineNodeList()
453
      try:
454
        node_list.remove(master_params.uuid)
455
      except ValueError:
456
        pass
457
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
458
    finally:
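      # always try to re-activate the master IP, even if the rename failed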
459
      master_params.ip = new_ip
460
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
461
                                                     master_params, ems)
462
      result.Warn("Could not re-enable the master role on the master,"
463
                  " please restart manually", self.LogWarning)
464

    
465
    return clustername
466

    
467

    
468
class LUClusterRepairDiskSizes(NoHooksLU):
469
  """Verifies the cluster disks sizes.
470

471
  """
472
  REQ_BGL = False
473

    
474
  def ExpandNames(self):
475
    if self.op.instances:
476
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
477
      # Not getting the node allocation lock as only a specific set of
478
      # instances (and their nodes) is going to be acquired
479
      self.needed_locks = {
480
        locking.LEVEL_NODE_RES: [],
481
        locking.LEVEL_INSTANCE: self.wanted_names,
482
        }
483
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
484
    else:
485
      self.wanted_names = None
486
      self.needed_locks = {
487
        locking.LEVEL_NODE_RES: locking.ALL_SET,
488
        locking.LEVEL_INSTANCE: locking.ALL_SET,
489

    
490
        # This opcode acquires the node locks for all instances
491
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
492
        }
493

    
494
    self.share_locks = {
495
      locking.LEVEL_NODE_RES: 1,
496
      locking.LEVEL_INSTANCE: 0,
497
      locking.LEVEL_NODE_ALLOC: 1,
498
      }
499

    
500
  def DeclareLocks(self, level):
501
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
502
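      # Exec only queries each instance's primary node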
      self._LockInstancesNodes(primary_only=True, level=level)
503

    
504
  def CheckPrereq(self):
505
    """Check prerequisites.
506

507
    This only checks the optional instance list against the existing names.
508

509
    """
510
    if self.wanted_names is None:
511
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
512

    
513
    self.wanted_instances = \
514
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
515

    
516
  def _EnsureChildSizes(self, disk):
517
    """Ensure children of the disk have the needed disk size.
518

519
    This is valid mainly for DRBD8 and fixes an issue where the
520
    children have smaller disk size.
521

522
    @param disk: an L{ganeti.objects.Disk} object
523

524
    """
525
    if disk.dev_type == constants.DT_DRBD8:
526
      assert disk.children, "Empty children for DRBD8?"
527
      fchild = disk.children[0]
528
      mismatch = fchild.size < disk.size
529
      if mismatch:
530
        self.LogInfo("Child disk has size %d, parent %d, fixing",
531
                     fchild.size, disk.size)
532
        fchild.size = disk.size
533

    
534
      # and we recurse on this child only, not on the metadev
535
      return self._EnsureChildSizes(fchild) or mismatch
536
    else:
537
      return False
538

    
539
  def Exec(self, feedback_fn):
540
    """Verify the size of cluster disks.
541

542
    """
543
    # TODO: check child disks too
544
    # TODO: check differences in size between primary/secondary nodes
545
    per_node_disks = {}
546
    for instance in self.wanted_instances:
547
      pnode = instance.primary_node
548
      if pnode not in per_node_disks:
549
        per_node_disks[pnode] = []
550
      for idx, disk in enumerate(instance.disks):
551
        per_node_disks[pnode].append((instance, idx, disk))
552

    
553
    assert not (frozenset(per_node_disks.keys()) -
554
                self.owned_locks(locking.LEVEL_NODE_RES)), \
555
      "Not owning correct locks"
556
    assert not self.owned_locks(locking.LEVEL_NODE)
557

    
558
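    # spindles are only reported and checked for nodes with exclusive storage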
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
559
                                               per_node_disks.keys())
560

    
561
    changed = []
562
    for node_uuid, dskl in per_node_disks.items():
563
      if not dskl:
564
        # no disks on the node
565
        continue
566

    
567
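      # dskl entries are (instance, idx, disk); build ([disk copy], instance)
      # pairs for the blockdev_getdimensions call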
      newl = [([v[2].Copy()], v[0]) for v in dskl]
568
      node_name = self.cfg.GetNodeName(node_uuid)
569
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
570
      if result.fail_msg:
571
        self.LogWarning("Failure in blockdev_getdimensions call to node"
572
                        " %s, ignoring", node_name)
573
        continue
574
      if len(result.payload) != len(dskl):
575
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
576
                        " result.payload=%s", node_name, len(dskl),
577
                        result.payload)
578
        self.LogWarning("Invalid result from node %s, ignoring node results",
579
                        node_name)
580
        continue
581
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
582
        if dimensions is None:
583
          self.LogWarning("Disk %d of instance %s did not return size"
584
                          " information, ignoring", idx, instance.name)
585
          continue
586
        if not isinstance(dimensions, (tuple, list)):
587
          self.LogWarning("Disk %d of instance %s did not return valid"
588
                          " dimension information, ignoring", idx,
589
                          instance.name)
590
          continue
591
        (size, spindles) = dimensions
592
        if not isinstance(size, (int, long)):
593
          self.LogWarning("Disk %d of instance %s did not return valid"
594
                          " size information, ignoring", idx, instance.name)
595
          continue
596
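        # reported size is in bytes; convert to MiB to compare with disk.size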
        size = size >> 20
597
        if size != disk.size:
598
          self.LogInfo("Disk %d of instance %s has mismatched size,"
599
                       " correcting: recorded %d, actual %d", idx,
600
                       instance.name, disk.size, size)
601
          disk.size = size
602
          self.cfg.Update(instance, feedback_fn)
603
          changed.append((instance.name, idx, "size", size))
604
        if es_flags[node_uuid]:
605
          if spindles is None:
606
            self.LogWarning("Disk %d of instance %s did not return valid"
607
                            " spindles information, ignoring", idx,
608
                            instance.name)
609
          elif disk.spindles is None or disk.spindles != spindles:
610
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
611
                         " correcting: recorded %s, actual %s",
612
                         idx, instance.name, disk.spindles, spindles)
613
            disk.spindles = spindles
614
            self.cfg.Update(instance, feedback_fn)
615
            changed.append((instance.name, idx, "spindles", disk.spindles))
616
        if self._EnsureChildSizes(disk):
617
          self.cfg.Update(instance, feedback_fn)
618
          changed.append((instance.name, idx, "size", disk.size))
619
    return changed
620

    
621

    
622
def _ValidateNetmask(cfg, netmask):
623
  """Checks if a netmask is valid.
624

625
  @type cfg: L{config.ConfigWriter}
626
  @param cfg: The cluster configuration
627
  @type netmask: int
628
  @param netmask: the netmask to be verified
629
  @raise errors.OpPrereqError: if the validation fails
630

631
  """
632
  ip_family = cfg.GetPrimaryIPFamily()
633
  try:
634
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
635
  except errors.ProgrammerError:
636
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
637
                               ip_family, errors.ECODE_INVAL)
638
  if not ipcls.ValidateNetmask(netmask):
639
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
640
                               (netmask), errors.ECODE_INVAL)
641

    
642

    
643
def CheckFileBasedStoragePathVsEnabledDiskTemplates(
644
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
645
    file_disk_template):
646
  """Checks whether the given file-based storage directory is acceptable.
647

648
  Note: This function is public, because it is also used in bootstrap.py.
649

650
  @type logging_warn_fn: function
651
  @param logging_warn_fn: function which accepts a string and logs it
652
  @type file_storage_dir: string
653
  @param file_storage_dir: the directory to be used for file-based instances
654
  @type enabled_disk_templates: list of string
655
  @param enabled_disk_templates: the list of enabled disk templates
656
  @type file_disk_template: string
657
  @param file_disk_template: the file-based disk template for which the
658
      path should be checked
659

660
  """
661
  assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
662
            constants.ST_FILE, constants.ST_SHARED_FILE
663
         ))
664
  file_storage_enabled = file_disk_template in enabled_disk_templates
665
  if file_storage_dir is not None:
666
    if file_storage_dir == "":
667
      if file_storage_enabled:
668
        raise errors.OpPrereqError(
669
            "Unsetting the '%s' storage directory while having '%s' storage"
670
            " enabled is not permitted." %
671
            (file_disk_template, file_disk_template))
672
    else:
673
      if not file_storage_enabled:
674
        logging_warn_fn(
675
            "Specified a %s storage directory, although %s storage is not"
676
            " enabled." % (file_disk_template, file_disk_template))
677
  else:
678
    raise errors.ProgrammerError("Received %s storage dir with value"
679
                                 " 'None'." % file_disk_template)
680

    
681

    
682
def CheckFileStoragePathVsEnabledDiskTemplates(
683
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
684
  """Checks whether the given file storage directory is acceptable.
685

686
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
687

688
  """
689
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
690
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
691
      constants.DT_FILE)
692

    
693

    
694
def CheckSharedFileStoragePathVsEnabledDiskTemplates(
695
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
696
  """Checks whether the given shared file storage directory is acceptable.
697

698
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
699

700
  """
701
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
702
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
703
      constants.DT_SHARED_FILE)
704

    
705

    
706
class LUClusterSetParams(LogicalUnit):
707
  """Change the parameters of the cluster.
708

709
  """
710
  HPATH = "cluster-modify"
711
  HTYPE = constants.HTYPE_CLUSTER
712
  REQ_BGL = False
713

    
714
  def CheckArguments(self):
715
    """Check parameters
716

717
    """
718
    if self.op.uid_pool:
719
      uidpool.CheckUidPool(self.op.uid_pool)
720

    
721
    if self.op.add_uids:
722
      uidpool.CheckUidPool(self.op.add_uids)
723

    
724
    if self.op.remove_uids:
725
      uidpool.CheckUidPool(self.op.remove_uids)
726

    
727
    if self.op.master_netmask is not None:
728
      _ValidateNetmask(self.cfg, self.op.master_netmask)
729

    
730
    if self.op.diskparams:
731
      for dt_params in self.op.diskparams.values():
732
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
733
      try:
734
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
735
        CheckDiskAccessModeValidity(self.op.diskparams)
736
      except errors.OpPrereqError, err:
737
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
738
                                   errors.ECODE_INVAL)
739

    
740
  def ExpandNames(self):
741
    # FIXME: in the future maybe other cluster params won't require checking on
742
    # all nodes to be modified.
743
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
744
    # resource locks the right thing, shouldn't it be the BGL instead?
745
    self.needed_locks = {
746
      locking.LEVEL_NODE: locking.ALL_SET,
747
      locking.LEVEL_INSTANCE: locking.ALL_SET,
748
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
749
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
750
    }
751
    self.share_locks = ShareAll()
752

    
753
  def BuildHooksEnv(self):
754
    """Build hooks env.
755

756
    """
757
    return {
758
      "OP_TARGET": self.cfg.GetClusterName(),
759
      "NEW_VG_NAME": self.op.vg_name,
760
      }
761

    
762
  def BuildHooksNodes(self):
763
    """Build hooks nodes.
764

765
    """
766
    mn = self.cfg.GetMasterNode()
767
    return ([mn], [mn])
768

    
769
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
770
                   new_enabled_disk_templates):
771
    """Check the consistency of the vg name on all nodes and in case it gets
772
       unset, whether there are instances still using it.
773

774
    """
775
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
776
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
777
                                            new_enabled_disk_templates)
778
    current_vg_name = self.cfg.GetVGName()
779

    
780
    if self.op.vg_name == '':
781
      if lvm_is_enabled:
782
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
783
                                   " disk templates are or get enabled.")
784

    
785
    if self.op.vg_name is None:
786
      if current_vg_name is None and lvm_is_enabled:
787
        raise errors.OpPrereqError("Please specify a volume group when"
788
                                   " enabling lvm-based disk-templates.")
789

    
790
    if self.op.vg_name is not None and not self.op.vg_name:
791
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
792
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
793
                                   " instances exist", errors.ECODE_INVAL)
794

    
795
    if (self.op.vg_name is not None and lvm_is_enabled) or \
796
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
797
      self._CheckVgNameOnNodes(node_uuids)
798

    
799
  def _CheckVgNameOnNodes(self, node_uuids):
800
    """Check the status of the volume group on each node.
801

802
    """
803
    vglist = self.rpc.call_vg_list(node_uuids)
804
    for node_uuid in node_uuids:
805
      msg = vglist[node_uuid].fail_msg
806
      if msg:
807
        # ignoring down node
808
        self.LogWarning("Error while gathering data on node %s"
809
                        " (ignoring node): %s",
810
                        self.cfg.GetNodeName(node_uuid), msg)
811
        continue
812
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
813
                                            self.op.vg_name,
814
                                            constants.MIN_VG_SIZE)
815
      if vgstatus:
816
        raise errors.OpPrereqError("Error on node '%s': %s" %
817
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
818
                                   errors.ECODE_ENVIRON)
819

    
820
  @staticmethod
821
  def _GetDiskTemplateSetsInner(op_enabled_disk_templates,
822
                                old_enabled_disk_templates):
823
    """Computes three sets of disk templates.
824

825
    @see: C{_GetDiskTemplateSets} for more details.
826

827
    """
828
    enabled_disk_templates = None
829
    new_enabled_disk_templates = []
830
    disabled_disk_templates = []
831
    if op_enabled_disk_templates:
832
      enabled_disk_templates = op_enabled_disk_templates
833
      new_enabled_disk_templates = \
834
        list(set(enabled_disk_templates)
835
             - set(old_enabled_disk_templates))
836
      disabled_disk_templates = \
837
        list(set(old_enabled_disk_templates)
838
             - set(enabled_disk_templates))
839
    else:
840
      enabled_disk_templates = old_enabled_disk_templates
841
    return (enabled_disk_templates, new_enabled_disk_templates,
842
            disabled_disk_templates)
843

    
844
  def _GetDiskTemplateSets(self, cluster):
845
    """Computes three sets of disk templates.
846

847
    The three sets are:
848
      - disk templates that will be enabled after this operation (no matter if
849
        they were enabled before or not)
850
      - disk templates that get enabled by this operation (thus haven't been
851
        enabled before).
852
      - disk templates that get disabled by this operation
853

854
    """
855
    return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates,
856
                                          cluster.enabled_disk_templates)
857

    
858
  def _CheckIpolicy(self, cluster, enabled_disk_templates):
859
    """Checks the ipolicy.
860

861
    @type cluster: C{objects.Cluster}
862
    @param cluster: the cluster's configuration
863
    @type enabled_disk_templates: list of string
864
    @param enabled_disk_templates: list of (possibly newly) enabled disk
865
      templates
866

867
    """
868
    # FIXME: write unit tests for this
869
    if self.op.ipolicy:
870
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
871
                                           group_policy=False)
872

    
873
      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
874
                                  enabled_disk_templates)
875

    
876
      all_instances = self.cfg.GetAllInstancesInfo().values()
877
      violations = set()
878
      for group in self.cfg.GetAllNodeGroupsInfo().values():
879
        instances = frozenset([inst for inst in all_instances
880
                               if compat.any(nuuid in group.members
881
                                             for nuuid in inst.all_nodes)])
882
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
883
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
884
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
885
                                           self.cfg)
886
        if new:
887
          violations.update(new)
888

    
889
      if violations:
890
        self.LogWarning("After the ipolicy change the following instances"
891
                        " violate them: %s",
892
                        utils.CommaJoin(utils.NiceSort(violations)))
893
    else:
894
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
895
                                  enabled_disk_templates)
896

    
897
  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
898
    """Checks whether the set DRBD helper actually exists on the nodes.
899

900
    @type drbd_helper: string
901
    @param drbd_helper: path of the drbd usermode helper binary
902
    @type node_uuids: list of strings
903
    @param node_uuids: list of node UUIDs to check for the helper
904

905
    """
906
    # checks given drbd helper on all nodes
907
    helpers = self.rpc.call_drbd_helper(node_uuids)
908
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
909
      if ninfo.offline:
910
        self.LogInfo("Not checking drbd helper on offline node %s",
911
                     ninfo.name)
912
        continue
913
      msg = helpers[ninfo.uuid].fail_msg
914
      if msg:
915
        raise errors.OpPrereqError("Error checking drbd helper on node"
916
                                   " '%s': %s" % (ninfo.name, msg),
917
                                   errors.ECODE_ENVIRON)
918
      node_helper = helpers[ninfo.uuid].payload
919
      if node_helper != drbd_helper:
920
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
921
                                   (ninfo.name, node_helper),
922
                                   errors.ECODE_ENVIRON)
923

    
924
  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
925
    """Check the DRBD usermode helper.
926

927
    @type node_uuids: list of strings
928
    @param node_uuids: a list of nodes' UUIDs
929
    @type drbd_enabled: boolean
930
    @param drbd_enabled: whether DRBD will be enabled after this operation
931
      (no matter if it was disabled before or not)
932
    @type drbd_gets_enabled: boolean
933
    @param drbd_gets_enabled: true if DRBD was disabled before this
934
      operation, but will be enabled afterwards
935

936
    """
937
    if self.op.drbd_helper == '':
938
      if drbd_enabled:
939
        raise errors.OpPrereqError("Cannot disable drbd helper while"
940
                                   " DRBD is enabled.")
941
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
942
        raise errors.OpPrereqError("Cannot disable drbd helper while"
943
                                   " drbd-based instances exist",
944
                                   errors.ECODE_INVAL)
945

    
946
    else:
947
      if self.op.drbd_helper is not None and drbd_enabled:
948
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
949
      else:
950
        if drbd_gets_enabled:
951
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
952
          if current_drbd_helper is not None:
953
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
954
          else:
955
            raise errors.OpPrereqError("Cannot enable DRBD without a"
956
                                       " DRBD usermode helper set.")
957

    
958
  def _CheckInstancesOfDisabledDiskTemplates(
959
      self, disabled_disk_templates):
960
    """Check whether we try to disable a disk template that is in use.
961

962
    @type disabled_disk_templates: list of string
963
    @param disabled_disk_templates: list of disk templates that are going to
964
      be disabled by this operation
965

966
    """
967
    for disk_template in disabled_disk_templates:
968
      if self.cfg.HasAnyDiskOfType(disk_template):
969
        raise errors.OpPrereqError(
970
            "Cannot disable disk template '%s', because there is at least one"
971
            " instance using it." % disk_template)
972

    
973
  def CheckPrereq(self):
974
    """Check prerequisites.
975

976
    This checks whether the given params don't conflict and
977
    if the given volume group is valid.
978

979
    """
980
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
981
    self.cluster = cluster = self.cfg.GetClusterInfo()
982

    
983
    vm_capable_node_uuids = [node.uuid
984
                             for node in self.cfg.GetAllNodesInfo().values()
985
                             if node.uuid in node_uuids and node.vm_capable]
986

    
987
    (enabled_disk_templates, new_enabled_disk_templates,
988
      disabled_disk_templates) = self._GetDiskTemplateSets(cluster)
989
    self._CheckInstancesOfDisabledDiskTemplates(disabled_disk_templates)
990

    
991
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
992
                      new_enabled_disk_templates)
993

    
994
    if self.op.file_storage_dir is not None:
995
      CheckFileStoragePathVsEnabledDiskTemplates(
996
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
997

    
998
    if self.op.shared_file_storage_dir is not None:
999
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
1000
          self.LogWarning, self.op.shared_file_storage_dir,
1001
          enabled_disk_templates)
1002

    
1003
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
1004
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
1005
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)
1006

    
1007
    # validate params changes
1008
    if self.op.beparams:
1009
      objects.UpgradeBeParams(self.op.beparams)
1010
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1011
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
1012

    
1013
    if self.op.ndparams:
1014
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
1015
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
1016

    
1017
      # TODO: we need a more general way to handle resetting
1018
      # cluster-level parameters to default values
1019
      if self.new_ndparams["oob_program"] == "":
1020
        self.new_ndparams["oob_program"] = \
1021
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
1022

    
1023
    if self.op.hv_state:
1024
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
1025
                                           self.cluster.hv_state_static)
1026
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
1027
                               for hv, values in new_hv_state.items())
1028

    
1029
    if self.op.disk_state:
1030
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
1031
                                               self.cluster.disk_state_static)
1032
      self.new_disk_state = \
1033
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
1034
                            for name, values in svalues.items()))
1035
             for storage, svalues in new_disk_state.items())
1036

    
1037
    self._CheckIpolicy(cluster, enabled_disk_templates)
1038

    
1039
    if self.op.nicparams:
1040
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1041
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
1042
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
1043
      nic_errors = []
1044

    
1045
      # check all instances for consistency
1046
      for instance in self.cfg.GetAllInstancesInfo().values():
1047
        for nic_idx, nic in enumerate(instance.nics):
1048
          params_copy = copy.deepcopy(nic.nicparams)
1049
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
1050

    
1051
          # check parameter syntax
1052
          try:
1053
            objects.NIC.CheckParameterSyntax(params_filled)
1054
          except errors.ConfigurationError, err:
1055
            nic_errors.append("Instance %s, nic/%d: %s" %
1056
                              (instance.name, nic_idx, err))
1057

    
1058
          # if we're moving instances to routed, check that they have an ip
1059
          target_mode = params_filled[constants.NIC_MODE]
1060
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
1061
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
1062
                              " address" % (instance.name, nic_idx))
1063
      if nic_errors:
1064
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
1065
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
1066

    
1067
    # hypervisor list/parameters
1068
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
1069
    if self.op.hvparams:
1070
      for hv_name, hv_dict in self.op.hvparams.items():
1071
        if hv_name not in self.new_hvparams:
1072
          self.new_hvparams[hv_name] = hv_dict
1073
        else:
1074
          self.new_hvparams[hv_name].update(hv_dict)
1075

    
1076
    # disk template parameters
1077
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
1078
    if self.op.diskparams:
1079
      for dt_name, dt_params in self.op.diskparams.items():
1080
        if dt_name not in self.new_diskparams:
1081
          self.new_diskparams[dt_name] = dt_params
1082
        else:
1083
          self.new_diskparams[dt_name].update(dt_params)
1084
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)
1085

    
1086
    # os hypervisor parameters
1087
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
1088
    if self.op.os_hvp:
1089
      for os_name, hvs in self.op.os_hvp.items():
1090
        if os_name not in self.new_os_hvp:
1091
          self.new_os_hvp[os_name] = hvs
1092
        else:
1093
          for hv_name, hv_dict in hvs.items():
1094
            if hv_dict is None:
1095
              # Delete if it exists
1096
              self.new_os_hvp[os_name].pop(hv_name, None)
1097
            elif hv_name not in self.new_os_hvp[os_name]:
1098
              self.new_os_hvp[os_name][hv_name] = hv_dict
1099
            else:
1100
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
1101

    
1102
    # os parameters
1103
    self.new_osp = objects.FillDict(cluster.osparams, {})
1104
    if self.op.osparams:
1105
      for os_name, osp in self.op.osparams.items():
1106
        if os_name not in self.new_osp:
1107
          self.new_osp[os_name] = {}
1108

    
1109
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
1110
                                                 use_none=True)
1111

    
1112
        if not self.new_osp[os_name]:
1113
          # we removed all parameters
1114
          del self.new_osp[os_name]
1115
        else:
1116
          # check the parameter validity (remote check)
1117
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
1118
                        os_name, self.new_osp[os_name])
1119

    
1120
    # changes to the hypervisor list
1121
    if self.op.enabled_hypervisors is not None:
1122
      self.hv_list = self.op.enabled_hypervisors
1123
      for hv in self.hv_list:
1124
        # if the hypervisor doesn't already exist in the cluster
1125
        # hvparams, we initialize it to empty, and then (in both
1126
        # cases) we make sure to fill the defaults, as we might not
1127
        # have a complete defaults list if the hypervisor wasn't
1128
        # enabled before
1129
        if hv not in new_hvp:
1130
          new_hvp[hv] = {}
1131
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1132
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1133
    else:
1134
      self.hv_list = cluster.enabled_hypervisors
1135

    
1136
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1137
      # either the enabled list has changed, or the parameters have, validate
1138
      for hv_name, hv_params in self.new_hvparams.items():
1139
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1140
            (self.op.enabled_hypervisors and
1141
             hv_name in self.op.enabled_hypervisors)):
1142
          # either this is a new hypervisor, or its parameters have changed
1143
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1144
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1145
          hv_class.CheckParameterSyntax(hv_params)
1146
          CheckHVParams(self, node_uuids, hv_name, hv_params)
1147

    
1148
    self._CheckDiskTemplateConsistency()
1149

    
1150
    if self.op.os_hvp:
1151
      # no need to check any newly-enabled hypervisors, since the
1152
      # defaults have already been checked in the above code-block
1153
      for os_name, os_hvp in self.new_os_hvp.items():
1154
        for hv_name, hv_params in os_hvp.items():
1155
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1156
          # we need to fill in the new os_hvp on top of the actual hv_p
1157
          cluster_defaults = self.new_hvparams.get(hv_name, {})
1158
          new_osp = objects.FillDict(cluster_defaults, hv_params)
1159
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1160
          hv_class.CheckParameterSyntax(new_osp)
1161
          CheckHVParams(self, node_uuids, hv_name, new_osp)
1162

    
1163
    if self.op.default_iallocator:
1164
      alloc_script = utils.FindFile(self.op.default_iallocator,
1165
                                    constants.IALLOCATOR_SEARCH_PATH,
1166
                                    os.path.isfile)
1167
      if alloc_script is None:
1168
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1169
                                   " specified" % self.op.default_iallocator,
1170
                                   errors.ECODE_INVAL)
1171

    
1172
  def _CheckDiskTemplateConsistency(self):
1173
    """Check whether the disk templates that are going to be disabled
1174
       are still in use by some instances.
1175

1176
    """
1177
    if self.op.enabled_disk_templates:
1178
      cluster = self.cfg.GetClusterInfo()
1179
      instances = self.cfg.GetAllInstancesInfo()
1180

    
1181
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1182
        - set(self.op.enabled_disk_templates)
1183
      for instance in instances.itervalues():
1184
        if instance.disk_template in disk_templates_to_remove:
1185
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
1186
                                     " because instance '%s' is using it." %
1187
                                     (instance.disk_template, instance.name))
1188

    
1189
  def _SetVgName(self, feedback_fn):
1190
    """Determines and sets the new volume group name.
1191

1192
    """
1193
    if self.op.vg_name is not None:
1194
      new_volume = self.op.vg_name
1195
      if not new_volume:
1196
        new_volume = None
1197
      if new_volume != self.cfg.GetVGName():
1198
        self.cfg.SetVGName(new_volume)
1199
      else:
1200
        feedback_fn("Cluster LVM configuration already in desired"
1201
                    " state, not changing")
1202

    
1203
  def _SetFileStorageDir(self, feedback_fn):
1204
    """Set the file storage directory.
1205

1206
    """
1207
    if self.op.file_storage_dir is not None:
1208
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
1209
        feedback_fn("Global file storage dir already set to value '%s'"
1210
                    % self.cluster.file_storage_dir)
1211
      else:
1212
        self.cluster.file_storage_dir = self.op.file_storage_dir
1213

    
1214
  def _SetDrbdHelper(self, feedback_fn):
1215
    """Set the DRBD usermode helper.
1216

1217
    """
1218
    if self.op.drbd_helper is not None:
1219
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1220
        feedback_fn("Note that you specified a drbd user helper, but did not"
1221
                    " enable the drbd disk template.")
1222
      new_helper = self.op.drbd_helper
1223
      if not new_helper:
1224
        new_helper = None
1225
      if new_helper != self.cfg.GetDRBDHelper():
1226
        self.cfg.SetDRBDHelper(new_helper)
1227
      else:
1228
        feedback_fn("Cluster DRBD helper already in desired state,"
1229
                    " not changing")
1230

    
1231
  def Exec(self, feedback_fn):
1232
    """Change the parameters of the cluster.
1233

1234
    """
1235
    if self.op.enabled_disk_templates:
1236
      self.cluster.enabled_disk_templates = \
1237
        list(self.op.enabled_disk_templates)
1238

    
1239
    self._SetVgName(feedback_fn)
1240
    self._SetFileStorageDir(feedback_fn)
1241
    self._SetDrbdHelper(feedback_fn)
1242

    
1243
    if self.op.hvparams:
1244
      self.cluster.hvparams = self.new_hvparams
1245
    if self.op.os_hvp:
1246
      self.cluster.os_hvp = self.new_os_hvp
1247
    if self.op.enabled_hypervisors is not None:
1248
      self.cluster.hvparams = self.new_hvparams
1249
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1250
    if self.op.beparams:
1251
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1252
    if self.op.nicparams:
1253
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1254
    if self.op.ipolicy:
1255
      self.cluster.ipolicy = self.new_ipolicy
1256
    if self.op.osparams:
1257
      self.cluster.osparams = self.new_osp
1258
    if self.op.ndparams:
1259
      self.cluster.ndparams = self.new_ndparams
1260
    if self.op.diskparams:
1261
      self.cluster.diskparams = self.new_diskparams
1262
    if self.op.hv_state:
1263
      self.cluster.hv_state_static = self.new_hv_state
1264
    if self.op.disk_state:
1265
      self.cluster.disk_state_static = self.new_disk_state
1266

    
1267
    if self.op.candidate_pool_size is not None:
1268
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1269
      # we need to update the pool size here, otherwise the save will fail
1270
      AdjustCandidatePool(self, [])
1271

    
1272
    if self.op.maintain_node_health is not None:
1273
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1274
        feedback_fn("Note: CONFD was disabled at build time, node health"
1275
                    " maintenance is not useful (still enabling it)")
1276
      self.cluster.maintain_node_health = self.op.maintain_node_health
1277

    
1278
    if self.op.modify_etc_hosts is not None:
1279
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1280

    
1281
    if self.op.prealloc_wipe_disks is not None:
1282
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1283

    
1284
    if self.op.add_uids is not None:
1285
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1286

    
1287
    if self.op.remove_uids is not None:
1288
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1289

    
1290
    if self.op.uid_pool is not None:
1291
      self.cluster.uid_pool = self.op.uid_pool
1292

    
1293
    if self.op.default_iallocator is not None:
1294
      self.cluster.default_iallocator = self.op.default_iallocator
1295

    
1296
    if self.op.default_iallocator_params is not None:
1297
      self.cluster.default_iallocator_params = self.op.default_iallocator_params
1298

    
1299
    if self.op.reserved_lvs is not None:
1300
      self.cluster.reserved_lvs = self.op.reserved_lvs
1301

    
1302
    if self.op.use_external_mip_script is not None:
1303
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
1304

    
1305
    def helper_os(aname, mods, desc):
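      # mods is a list of (DDM_ADD/DDM_REMOVE, OS name) pairs to apply to the
      # cluster attribute named by aname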
1306
      desc += " OS list"
1307
      lst = getattr(self.cluster, aname)
1308
      for key, val in mods:
1309
        if key == constants.DDM_ADD:
1310
          if val in lst:
1311
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1312
          else:
1313
            lst.append(val)
1314
        elif key == constants.DDM_REMOVE:
1315
          if val in lst:
1316
            lst.remove(val)
1317
          else:
1318
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1319
        else:
1320
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
1321

    
1322
    if self.op.hidden_os:
1323
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1324

    
1325
    if self.op.blacklisted_os:
1326
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1327

    
1328
    if self.op.master_netdev:
1329
      master_params = self.cfg.GetMasterNetworkParameters()
1330
      ems = self.cfg.GetUseExternalMipScript()
1331
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1332
                  self.cluster.master_netdev)
1333
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1334
                                                       master_params, ems)
1335
      if not self.op.force:
1336
        result.Raise("Could not disable the master ip")
1337
      else:
1338
        if result.fail_msg:
1339
          msg = ("Could not disable the master ip (continuing anyway): %s" %
1340
                 result.fail_msg)
1341
          feedback_fn(msg)
1342
      feedback_fn("Changing master_netdev from %s to %s" %
1343
                  (master_params.netdev, self.op.master_netdev))
1344
      self.cluster.master_netdev = self.op.master_netdev
1345

    
1346
    if self.op.master_netmask:
1347
      master_params = self.cfg.GetMasterNetworkParameters()
1348
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1349
      result = self.rpc.call_node_change_master_netmask(
1350
                 master_params.uuid, master_params.netmask,
1351
                 self.op.master_netmask, master_params.ip,
1352
                 master_params.netdev)
1353
      result.Warn("Could not change the master IP netmask", feedback_fn)
1354
      self.cluster.master_netmask = self.op.master_netmask
1355

    
1356
    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
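      # depends_fn is re-evaluated for every group job appended below; since
      # negative job IDs are relative to this submission, -len(jobs) always
      # points back at the config-verify job added above.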
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
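      # Machine-readable form; illustrative example:
      #   "ERROR:ENODESSH:node:node1.example.com:ssh communication failed"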
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1447
    else:
1448
      if item:
1449
        item = " " + item
1450
      else:
1451
        item = ""
1452
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1453
    # and finally report it via the feedback_fn
1454
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1455
    # do not mark the operation as failed for WARN cases only
1456
    if ltype == self.ETYPE_ERROR:
1457
      self.bad = True
1458

    
1459
  def _ErrorIf(self, cond, *args, **kwargs):
1460
    """Log an error message if the passed condition is True.
1461

1462
    """
1463
    if (bool(cond)
1464
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1465
      self._Error(*args, **kwargs)
1466

    
1467

    
1468
def _VerifyCertificate(filename):
1469
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1470

1471
  @type filename: string
1472
  @param filename: Path to PEM file
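  @rtype: tuple
  @return: a pair (error type, message); the error type is C{None} if the
      certificate is fine, one of the C{ETYPE_*} constants of
      L{LUClusterVerifyConfig} otherwise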

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                    constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(inst.name for
                         inst in dangling_instances.get(node.uuid, [])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(inst.name for
                                         inst in no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        instance = self.cfg.GetInstanceInfoByName(inst_name)
        if instance.disk_template in constants.DTS_INT_MIRROR:
          nodes.update(instance.secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_node_uuids = set(self.group_info.members)
    group_inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_node_uuids = \
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_inst_uuids = \
        group_inst_uuids.difference(
          [self.cfg.GetInstanceInfoByName(name).uuid
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])

    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_inst_uuids:
      raise errors.OpPrereqError(
        "Missing lock for instances: %s" %
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
        errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_uuids = group_node_uuids
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
                             for node_uuid in group_node_uuids)

    self.my_inst_uuids = group_inst_uuids
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
                             for inst_uuid in group_inst_uuids)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nuuid in inst.all_nodes:
          if self.all_node_info[nuuid].group != self.group_uuid:
            extra_lv_nodes.add(nuuid)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, ninfo.name,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s parameter verify failure (source %s): %s",
                      hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
                    "Node returned invalid time")
      return

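    # The merged node time must fall within the RPC call window extended by
    # the allowed clock skew on both sides; anything outside that range is
    # reported as drift relative to the master.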
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
                  "Node time diverges by at least %s from master node time",
                  ntime_diff)

  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                  "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupDRBDVersion(self, node_verify_infos):
    """Check cross-node DRBD version consistency.

    @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node_verify call.

    """
    node_versions = {}
    for node_uuid, ndata in node_verify_infos.items():
      nresult = ndata.payload
      if nresult:
        version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
        node_versions[node_uuid] = version

    if len(set(node_versions.values())) > 1:
      for node_uuid, version in sorted(node_versions.items()):
        msg = "DRBD version mismatch: %s" % version
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
                    code=self.ETYPE_WARNING)

  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping from node UUIDs to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of checks
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "did not return valid bridge information")
    if not test:
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = constants.NV_NODELIST not in nresult
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
                  "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
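        # A non-empty NV_NODELIST payload maps each unreachable peer node to
        # the ssh error encountered while contacting it.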
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
                        "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
                        "tcp communication with node '%s': %s",
                        anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if ninfo.uuid == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)

  def _VerifyInstance(self, instance, node_image, diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the nodes are in the correct
    state.

    """
    pnode_uuid = instance.primary_node
    pnode_img = node_image[pnode_uuid]
    groupinfo = self.cfg.GetAllNodeGroupsInfo()

    node_vol_should = {}
    instance.MapLVsByNode(node_vol_should)

    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            self.group_info)
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)

    for node_uuid in node_vol_should:
      n_img = node_image[node_uuid]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node_uuid]:
        test = volume not in n_img.volumes
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
                      "volume %s missing on node %s", volume,
                      self.cfg.GetNodeName(node_uuid))

    if instance.admin_state == constants.ADMINST_UP:
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
                    "instance not running on its primary node %s",
                     self.cfg.GetNodeName(pnode_uuid))
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance is marked as running and lives on"
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      self._ErrorIf(instance.disks_active and
                    not success and not bad_snode,
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
                    "couldn't retrieve status for disk/%s on %s: %s",
                    idx, self.cfg.GetNodeName(nname), bdev_status)

      if instance.disks_active and success and \
         (bdev_status.is_degraded or
          bdev_status.ldisk_status != constants.LDS_OKAY):
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
        if bdev_status.is_degraded:
          msg += " is degraded"
        if bdev_status.ldisk_status != constants.LDS_OKAY:
          msg += "; state is '%s'" % \
                 constants.LDS_NAMES[bdev_status.ldisk_status]

        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)

    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
                  "instance %s, connection to primary node failed",
                  instance.name)

    self._ErrorIf(len(instance.secondary_nodes) > 1,
                  constants.CV_EINSTANCELAYOUT, instance.name,
                  "instance has multiple secondary nodes: %s",
                  utils.CommaJoin(instance.secondary_nodes),
                  code=self.ETYPE_WARNING)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
    if any(es_flags.values()):
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
        # Disk template not compatible with exclusive_storage: no instance
        # node should have the flag set
        es_nodes = [n
                    for (n, es) in es_flags.items()
                    if es]
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
                    "instance has template %s, which is not supported on nodes"
                    " that have exclusive storage set: %s",
                    instance.disk_template,
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
      for (idx, disk) in enumerate(instance.disks):
        self._ErrorIf(disk.spindles is None,
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
                      "number of spindles not configured for disk %s while"
                      " exclusive storage is enabled, try running"
                      " gnt-cluster repair-disk-sizes", idx)

    if instance.disk_template in constants.DTS_INT_MIRROR:
      instance_nodes = utils.NiceSort(instance.all_nodes)
      instance_groups = {}

      for node_uuid in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
                                   []).append(node_uuid)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
                           groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode_uuid in nodes,
                                   reverse=True)]

      self._ErrorIf(len(instance_groups) > 1,
                    constants.CV_EINSTANCESPLITGROUPS,
                    instance.name, "instance has primary and secondary nodes in"
                    " different groups: %s", utils.CommaJoin(pretty_list),
                    code=self.ETYPE_WARNING)

    inst_nodes_offline = []
    for snode in instance.secondary_nodes:
      s_img = node_image[snode]
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
                    self.cfg.GetNodeName(snode),
                    "instance %s, connection to secondary node failed",
                    instance.name)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
                  instance.name, "instance has offline secondary node(s) %s",
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
    # ... or ghost/non-vm_capable nodes
    for node_uuid in instance.all_nodes:
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance lives on ghost node %s",
                    self.cfg.GetNodeName(node_uuid))
      self._ErrorIf(not node_image[node_uuid].vm_capable,
                    constants.CV_EINSTANCEBADNODE, instance.name,
                    "instance lives on non-vm_capable node %s",
                    self.cfg.GetNodeName(node_uuid))

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node_uuid, n_img in node_image.items():
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
          self.all_node_info[node_uuid].group != self.group_uuid):
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node_uuid not in node_vol_should or
                volume not in node_vol_should[node_uuid]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
                      self.cfg.GetNodeName(node_uuid),
                      "volume %s is unknown", volume,
                      code=_VerifyErrors.ETYPE_WARNING)

  def _VerifyNPlusOneMemory(self, node_image, all_insts):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node_uuid, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline or \
         self.all_node_info[node_uuid].group != self.group_uuid:
        # we're skipping nodes marked offline and nodes in other groups from
        # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      #TODO(dynmem): also consider ballooning out other instances
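      # n_img.sbp maps each primary node to the instances that would fail
      # over to this node; for each such set, sum the minimum memory of the
      # auto-balanced instances and compare it with this node's free memory.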
      for prinode, inst_uuids in n_img.sbp.items():
        needed_mem = 0
        for inst_uuid in inst_uuids:
          bep = cluster_info.FillBE(all_insts[inst_uuid])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MINMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1,
                      self.cfg.GetNodeName(node_uuid),
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)

  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param nodes: List of L{objects.Node} objects
    @param master_node_uuid: UUID of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.uuid == master_node_uuid)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodes
      else:
        filenodes = filter(fn, nodes)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

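    # fileinfo maps each filename to a dict of checksum -> set of node UUIDs
    # reporting that checksum; more than one key means diverging copies.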
    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodes:
      if node.offline:
        ignore_nodes.add(node.uuid)
        continue

      nresult = all_nvinfo[node.uuid]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
                    "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.uuid)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_uuid
                            for node_uuids in fileinfo[filename].values()
                            for node_uuid in node_uuids) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        self._ErrorIf(missing_file and missing_file != expected_nodes,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is optional, but it must exist on all or no"
                      " nodes (not found on %s)",
                      filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))
      else:
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is missing from node(s) %s", filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        self._ErrorIf(unexpected,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s should not exist on node(s) %s",
                      filename, utils.CommaJoin(
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1,
                     utils.CommaJoin(utils.NiceSort(
                       map(self.cfg.GetNodeName, node_uuids))))
                    for (idx, (checksum, node_uuids)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
                    "File %s found with %s different checksums (%s)",
                    filename, len(checksums), "; ".join(variants))

  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
    """Verify the drbd helper.

    """
    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                    "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "wrong drbd usermode helper: %s", payload)

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)

    # compute the DRBD minors
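    # node_drbd maps each minor known to the configuration for this node to a
    # pair (instance UUID, whether the instance's disks should be active);
    # this is then compared with the minors actually in use on the node.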
    node_drbd = {}
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
      test = inst_uuid not in instanceinfo
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
      if test:
        node_drbd[minor] = (inst_uuid, False)
      else:
        instance = instanceinfo[inst_uuid]
        node_drbd[minor] = (inst_uuid, instance.disks_active)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                  "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (inst_uuid, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "drbd minor %d of instance %s is not active", minor,
                    self.cfg.GetInstanceName(inst_uuid))
    for minor in used_minors:
      test = minor not in node_drbd
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                  "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

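    # os_dict maps each OS name to a list of tuples
    # (path, status, diagnose, set(variants), set(parameters), set(api_ver)),
    # one entry per directory in which the OS was found on the node.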
    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
                    "Invalid OS %s (located at %s): %s",
                    os_name, f_path, f_diag)
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
                    "OS '%s' has multiple entries"
                    " (first one shadows the rest): %s",
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                    "Extra OS %s not present on reference node (%s)",
                    os_name, self.cfg.GetNodeName(base.uuid))
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
                      "OS %s for %s differs from reference node %s:"
                      " [%s] vs. [%s]", kind, os_name,
                      self.cfg.GetNodeName(base.uuid),
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
                  "OSes present on reference node %s"
                  " but missing on this node: %s",
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))

  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    cluster = self.cfg.GetClusterInfo()
    if (is_master and
        (cluster.IsFileStorageEnabled() or
         cluster.IsSharedFileStorageEnabled())):
      try:
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
                          verify_key, error_key):
    """Verifies (file) storage paths.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type file_disk_template: string
    @param file_disk_template: file-based disk template, whose directory
        is supposed to be verified
    @type verify_key: string
    @param verify_key: key for the verification map of this file
        verification step
    @param error_key: error key to be added to the verification results
        in case something goes wrong in this verification step

    """
    assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
              constants.ST_FILE, constants.ST_SHARED_FILE
           ))

    cluster = self.cfg.GetClusterInfo()
    if cluster.IsDiskTemplateEnabled(file_disk_template):
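      # The check below treats the mere presence of verify_key in the node's
      # result as a failed path check (the error message is stored under it).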
      self._ErrorIf(
2591
          verify_key in nresult,
2592
          error_key, ninfo.name,
2593
          "The configured %s storage path is unusable: %s" %
2594
          (file_disk_template, nresult.get(verify_key)))
2595

    
2596
  def _VerifyFileStoragePaths(self, ninfo, nresult):
2597
    """Verifies (file) storage paths.
2598

2599
    @see: C{_VerifyStoragePaths}
2600

2601
    """
2602
    self._VerifyStoragePaths(
2603
        ninfo, nresult, constants.DT_FILE,
2604
        constants.NV_FILE_STORAGE_PATH,
2605
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2606

    
2607
  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2608
    """Verifies (file) storage paths.
2609

2610
    @see: C{_VerifyStoragePaths}
2611

2612
    """
2613
    self._VerifyStoragePaths(
2614
        ninfo, nresult, constants.DT_SHARED_FILE,
2615
        constants.NV_SHARED_FILE_STORAGE_PATH,
2616
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2617

    
2618
  def _VerifyOob(self, ninfo, nresult):
2619
    """Verifies out of band functionality of a node.
2620

2621
    @type ninfo: L{objects.Node}
2622
    @param ninfo: the node to check
2623
    @param nresult: the remote results for the node
2624

2625
    """
2626
    # We just have to verify the paths on master and/or master candidates
2627
    # as the oob helper is invoked on the master
2628
    if ((ninfo.master_candidate or ninfo.master_capable) and
2629
        constants.NV_OOB_PATHS in nresult):
2630
      for path_result in nresult[constants.NV_OOB_PATHS]:
2631
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2632
                      ninfo.name, path_result)
2633

    
2634
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2635
    """Verifies and updates the node volume data.
2636

2637
    This function will update a L{NodeImage}'s internal structures
2638
    with data from the remote call.
2639

2640
    @type ninfo: L{objects.Node}
2641
    @param ninfo: the node to check
2642
    @param nresult: the remote results for the node
2643
    @param nimg: the node image object
2644
    @param vg_name: the configured VG name
2645

2646
    """
2647
    nimg.lvm_fail = True
2648
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
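    # lvdata is either a dict of logical volumes (success) or an error string,
    # including the "Missing LV data" default used when the key is absent.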
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{NodeImage})
    @param node_image: Node image objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_dev_inst_only = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      dev_inst_only = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))

      node_disks_dev_inst_only[nuuid] = dev_inst_only

    assert len(node_disks) == len(node_disks_dev_inst_only)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
               node_disks.keys(), node_disks_dev_inst_only)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

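      # The node returns one status per requested disk, in request order, so
      # zip() re-associates each status with its (instance, disk) pair.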
      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

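    # The result is the list of online nodes in this group plus, for each of
    # them, one peer name drawn round-robin from every other group; e.g.
    # (["nodeA1", "nodeA2"], {"nodeA1": ["nodeB1", "nodeC1"], ...}) for
    # hypothetical node names.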
    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.
2859

2860
    Cluster-Verify hooks just ran in the post phase and their failure makes
2861
    the output be logged in the verify output and the verification to fail.
2862

2863
    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various test on nodes.
2881

2882
    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

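    # The NV_* keys below select the checks the node will run for this single
    # node-verify RPC; values are check-specific parameters (e.g. the enabled
    # hypervisors for NV_HYPERVISOR, or None where no parameter is needed).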
    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
      if drbd_helper:
        node_verify_param[constants.NV_DRBDVERSION] = None
        node_verify_param[constants.NV_DRBDLIST] = None
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

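    # Register each instance in the node images of its primary and secondary
    # nodes; nodes outside this group get placeholder images, flagged as
    # ghosts when they are not part of the cluster configuration at all.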
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least one node, we act as if it were set for all the
    # nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

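    # Node-name-to-group mapping and the full node group configuration are
    # passed along to the node verify RPCs below.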
    node_group_uuids = dict(map(lambda n: (n.name, n.group),
                                self.cfg.GetAllNodesInfo().values()))
    groups_config = self.cfg.GetAllNodeGroupsInfoDict()

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams,
                                           node_group_uuids,
                                           groups_config)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams,
                                    node_group_uuids,
                                    groups_config)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams,
         node_group_uuids,
         groups_config))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

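      # Checks that apply to every node that answered the verify RPC,
      # regardless of whether it is vm_capable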
      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.uuid not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])