lib/cmdlib/cluster.py @ 5cbf7832


#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
95
  """Return configuration values.
96

97
  """
98
  REQ_BGL = False
99

    
100
  def CheckArguments(self):
101
    self.cq = ClusterQuery(None, self.op.output_fields, False)
102

    
103
  def ExpandNames(self):
104
    self.cq.ExpandNames(self)
105

    
106
  def DeclareLocks(self, level):
107
    self.cq.DeclareLocks(self, level)
108

    
109
  def Exec(self, feedback_fn):
110
    result = self.cq.OldStyleQuery(self)
111

    
112
    assert len(result) == 1
113

    
114
    return result[0]
115

    
116

    
117
class LUClusterDestroy(LogicalUnit):
118
  """Logical unit for destroying the cluster.
119

120
  """
121
  HPATH = "cluster-destroy"
122
  HTYPE = constants.HTYPE_CLUSTER
123

    
124
  def BuildHooksEnv(self):
125
    """Build hooks env.
126

127
    """
128
    return {
129
      "OP_TARGET": self.cfg.GetClusterName(),
130
      }
131

    
132
  def BuildHooksNodes(self):
133
    """Build hooks nodes.
134

135
    """
136
    return ([], [])
137

    
138
  def CheckPrereq(self):
139
    """Check prerequisites.
140

141
    This checks whether the cluster is empty.
142

143
    Any errors are signaled by raising errors.OpPrereqError.
144

145
    """
146
    master = self.cfg.GetMasterNode()
147

    
148
    nodelist = self.cfg.GetNodeList()
149
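    # Only the master node itself may remain at this point; the message below
    # reports len(nodelist) - 1, i.e. the nodes other than the master.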
    if len(nodelist) != 1 or nodelist[0] != master:
150
      raise errors.OpPrereqError("There are still %d node(s) in"
151
                                 " this cluster." % (len(nodelist) - 1),
152
                                 errors.ECODE_INVAL)
153
    instancelist = self.cfg.GetInstanceList()
154
    if instancelist:
155
      raise errors.OpPrereqError("There are still %d instance(s) in"
156
                                 " this cluster." % len(instancelist),
157
                                 errors.ECODE_INVAL)
158

    
159
  def Exec(self, feedback_fn):
160
    """Destroys the cluster.
161

162
    """
163
    master_params = self.cfg.GetMasterNetworkParameters()
164

    
165
    # Run post hooks on master node before it's removed
166
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
167

    
168
    ems = self.cfg.GetUseExternalMipScript()
169
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
170
                                                     master_params, ems)
171
    result.Warn("Error disabling the master IP address", self.LogWarning)
172
    return master_params.uuid
173

    
174

    
175
class LUClusterPostInit(LogicalUnit):
176
  """Logical unit for running hooks after cluster initialization.
177

178
  """
179
  HPATH = "cluster-init"
180
  HTYPE = constants.HTYPE_CLUSTER
181

    
182
  def BuildHooksEnv(self):
183
    """Build hooks env.
184

185
    """
186
    return {
187
      "OP_TARGET": self.cfg.GetClusterName(),
188
      }
189

    
190
  def BuildHooksNodes(self):
191
    """Build hooks nodes.
192

193
    """
194
    return ([], [self.cfg.GetMasterNode()])
195

    
196
  def Exec(self, feedback_fn):
197
    """Nothing to do.
198

199
    """
200
    return True
201

    
202

    
203
class ClusterQuery(QueryBase):
204
  FIELDS = query.CLUSTER_FIELDS
205

    
206
  #: Do not sort (there is only one item)
207
  SORT_FIELD = None
208

    
209
  def ExpandNames(self, lu):
210
    lu.needed_locks = {}
211

    
212
    # The following variables interact with _QueryBase._GetNames
213
    self.wanted = locking.ALL_SET
214
    self.do_locking = self.use_locking
215

    
216
    if self.do_locking:
217
      raise errors.OpPrereqError("Can not use locking for cluster queries",
218
                                 errors.ECODE_INVAL)
219

    
220
  def DeclareLocks(self, lu, level):
221
    pass
222

    
223
  def _GetQueryData(self, lu):
224
    """Computes the list of nodes and their attributes.
225

226
    """
227
    # Locking is not used
228
    assert not (compat.any(lu.glm.is_owned(level)
229
                           for level in locking.LEVELS
230
                           if level != locking.LEVEL_CLUSTER) or
231
                self.do_locking or self.use_locking)
232

    
233
    if query.CQ_CONFIG in self.requested_data:
234
      cluster = lu.cfg.GetClusterInfo()
235
      nodes = lu.cfg.GetAllNodesInfo()
236
    else:
237
      cluster = NotImplemented
238
      nodes = NotImplemented
239

    
240
    if query.CQ_QUEUE_DRAINED in self.requested_data:
241
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
242
    else:
243
      drain_flag = NotImplemented
244

    
245
    if query.CQ_WATCHER_PAUSE in self.requested_data:
246
      master_node_uuid = lu.cfg.GetMasterNode()
247

    
248
      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
249
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
250
                   lu.cfg.GetMasterNodeName())
251

    
252
      watcher_pause = result.payload
253
    else:
254
      watcher_pause = NotImplemented
255

    
256
    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
257

    
258

    
259
class LUClusterQuery(NoHooksLU):
260
  """Query cluster configuration.
261

262
  """
263
  REQ_BGL = False
264

    
265
  def ExpandNames(self):
266
    self.needed_locks = {}
267

    
268
  def Exec(self, feedback_fn):
269
    """Return cluster config.
270

271
    """
272
    cluster = self.cfg.GetClusterInfo()
273
    os_hvp = {}
274

    
275
    # Filter just for enabled hypervisors
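    # (the resulting os_hvp maps OS name -> hypervisor name -> parameter dict)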
276
    for os_name, hv_dict in cluster.os_hvp.items():
277
      os_hvp[os_name] = {}
278
      for hv_name, hv_params in hv_dict.items():
279
        if hv_name in cluster.enabled_hypervisors:
280
          os_hvp[os_name][hv_name] = hv_params
281

    
282
    # Convert ip_family to ip_version
283
    primary_ip_version = constants.IP4_VERSION
284
    if cluster.primary_ip_family == netutils.IP6Address.family:
285
      primary_ip_version = constants.IP6_VERSION
286

    
287
    result = {
288
      "software_version": constants.RELEASE_VERSION,
289
      "protocol_version": constants.PROTOCOL_VERSION,
290
      "config_version": constants.CONFIG_VERSION,
291
      "os_api_version": max(constants.OS_API_VERSIONS),
292
      "export_version": constants.EXPORT_VERSION,
293
      "vcs_version": constants.VCS_VERSION,
294
      "architecture": runtime.GetArchInfo(),
295
      "name": cluster.cluster_name,
296
      "master": self.cfg.GetMasterNodeName(),
297
      "default_hypervisor": cluster.primary_hypervisor,
298
      "enabled_hypervisors": cluster.enabled_hypervisors,
299
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
300
                        for hypervisor_name in cluster.enabled_hypervisors]),
301
      "os_hvp": os_hvp,
302
      "beparams": cluster.beparams,
303
      "osparams": cluster.osparams,
304
      "ipolicy": cluster.ipolicy,
305
      "nicparams": cluster.nicparams,
306
      "ndparams": cluster.ndparams,
307
      "diskparams": cluster.diskparams,
308
      "candidate_pool_size": cluster.candidate_pool_size,
309
      "master_netdev": cluster.master_netdev,
310
      "master_netmask": cluster.master_netmask,
311
      "use_external_mip_script": cluster.use_external_mip_script,
312
      "volume_group_name": cluster.volume_group_name,
313
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
314
      "file_storage_dir": cluster.file_storage_dir,
315
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
316
      "maintain_node_health": cluster.maintain_node_health,
317
      "ctime": cluster.ctime,
318
      "mtime": cluster.mtime,
319
      "uuid": cluster.uuid,
320
      "tags": list(cluster.GetTags()),
321
      "uid_pool": cluster.uid_pool,
322
      "default_iallocator": cluster.default_iallocator,
323
      "reserved_lvs": cluster.reserved_lvs,
324
      "primary_ip_version": primary_ip_version,
325
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
326
      "hidden_os": cluster.hidden_os,
327
      "blacklisted_os": cluster.blacklisted_os,
328
      "enabled_disk_templates": cluster.enabled_disk_templates,
329
      }
330

    
331
    return result
332

    
333

    
334
class LUClusterRedistConf(NoHooksLU):
335
  """Force the redistribution of cluster configuration.
336

337
  This is a very simple LU.
338

339
  """
340
  REQ_BGL = False
341

    
342
  def ExpandNames(self):
343
    self.needed_locks = {
344
      locking.LEVEL_NODE: locking.ALL_SET,
345
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
346
    }
347
    self.share_locks = ShareAll()
348

    
349
  def Exec(self, feedback_fn):
350
    """Redistribute the configuration.
351

352
    """
353
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
354
    RedistributeAncillaryFiles(self)
355

    
356

    
357
class LUClusterRename(LogicalUnit):
358
  """Rename the cluster.
359

360
  """
361
  HPATH = "cluster-rename"
362
  HTYPE = constants.HTYPE_CLUSTER
363

    
364
  def BuildHooksEnv(self):
365
    """Build hooks env.
366

367
    """
368
    return {
369
      "OP_TARGET": self.cfg.GetClusterName(),
370
      "NEW_NAME": self.op.name,
371
      }
372

    
373
  def BuildHooksNodes(self):
374
    """Build hooks nodes.
375

376
    """
377
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
378

    
379
  def CheckPrereq(self):
380
    """Verify that the passed name is a valid one.
381

382
    """
383
    hostname = netutils.GetHostname(name=self.op.name,
384
                                    family=self.cfg.GetPrimaryIPFamily())
385

    
386
    new_name = hostname.name
387
    self.ip = new_ip = hostname.ip
388
    old_name = self.cfg.GetClusterName()
389
    old_ip = self.cfg.GetMasterIP()
390
    if new_name == old_name and new_ip == old_ip:
391
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
392
                                 " cluster has changed",
393
                                 errors.ECODE_INVAL)
394
    if new_ip != old_ip:
395
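      # The prospective master IP must be free: anything answering on the
      # noded port means the address is already in use on the network.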
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
396
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
397
                                   " reachable on the network" %
398
                                   new_ip, errors.ECODE_NOTUNIQUE)
399

    
400
    self.op.name = new_name
401

    
402
  def Exec(self, feedback_fn):
403
    """Rename the cluster.
404

405
    """
406
    clustername = self.op.name
407
    new_ip = self.ip
408

    
409
    # shutdown the master IP
410
    master_params = self.cfg.GetMasterNetworkParameters()
411
    ems = self.cfg.GetUseExternalMipScript()
412
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
413
                                                     master_params, ems)
414
    result.Raise("Could not disable the master role")
415

    
416
    try:
417
      cluster = self.cfg.GetClusterInfo()
418
      cluster.cluster_name = clustername
419
      cluster.master_ip = new_ip
420
      self.cfg.Update(cluster, feedback_fn)
421

    
422
      # update the known hosts file
423
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
424
      node_list = self.cfg.GetOnlineNodeList()
425
      try:
426
        node_list.remove(master_params.uuid)
427
      except ValueError:
428
        pass
429
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
430
    finally:
431
      master_params.ip = new_ip
432
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
433
                                                     master_params, ems)
434
      result.Warn("Could not re-enable the master role on the master,"
435
                  " please restart manually", self.LogWarning)
436

    
437
    return clustername
438

    
439

    
440
class LUClusterRepairDiskSizes(NoHooksLU):
441
  """Verifies the cluster disks sizes.
442

443
  """
444
  REQ_BGL = False
445

    
446
  def ExpandNames(self):
447
    if self.op.instances:
448
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
449
      # Not getting the node allocation lock as only a specific set of
450
      # instances (and their nodes) is going to be acquired
451
      self.needed_locks = {
452
        locking.LEVEL_NODE_RES: [],
453
        locking.LEVEL_INSTANCE: self.wanted_names,
454
        }
455
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
456
    else:
457
      self.wanted_names = None
458
      self.needed_locks = {
459
        locking.LEVEL_NODE_RES: locking.ALL_SET,
460
        locking.LEVEL_INSTANCE: locking.ALL_SET,
461

    
462
        # This opcode acquires the node locks for all instances
463
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
464
        }
465

    
466
    self.share_locks = {
467
      locking.LEVEL_NODE_RES: 1,
468
      locking.LEVEL_INSTANCE: 0,
469
      locking.LEVEL_NODE_ALLOC: 1,
470
      }
471

    
472
  def DeclareLocks(self, level):
473
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
474
      self._LockInstancesNodes(primary_only=True, level=level)
475

    
476
  def CheckPrereq(self):
477
    """Check prerequisites.
478

479
    This only checks the optional instance list against the existing names.
480

481
    """
482
    if self.wanted_names is None:
483
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
484

    
485
    self.wanted_instances = \
486
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
487

    
488
  def _EnsureChildSizes(self, disk):
489
    """Ensure children of the disk have the needed disk size.
490

491
    This is valid mainly for DRBD8 and fixes an issue where the
492
    children have smaller disk size.
493

494
    @param disk: an L{ganeti.objects.Disk} object
495

496
    """
497
    if disk.dev_type == constants.LD_DRBD8:
498
      assert disk.children, "Empty children for DRBD8?"
499
      fchild = disk.children[0]
500
      mismatch = fchild.size < disk.size
501
      if mismatch:
502
        self.LogInfo("Child disk has size %d, parent %d, fixing",
503
                     fchild.size, disk.size)
504
        fchild.size = disk.size
505

    
506
      # and we recurse on this child only, not on the metadev
507
      return self._EnsureChildSizes(fchild) or mismatch
508
    else:
509
      return False
510

    
511
  def Exec(self, feedback_fn):
512
    """Verify the size of cluster disks.
513

514
    """
515
    # TODO: check child disks too
516
    # TODO: check differences in size between primary/secondary nodes
517
    per_node_disks = {}
518
    for instance in self.wanted_instances:
519
      pnode = instance.primary_node
520
      if pnode not in per_node_disks:
521
        per_node_disks[pnode] = []
522
      for idx, disk in enumerate(instance.disks):
523
        per_node_disks[pnode].append((instance, idx, disk))
524

    
525
    assert not (frozenset(per_node_disks.keys()) -
526
                self.owned_locks(locking.LEVEL_NODE_RES)), \
527
      "Not owning correct locks"
528
    assert not self.owned_locks(locking.LEVEL_NODE)
529

    
530
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
531
                                               per_node_disks.keys())
532

    
533
    changed = []
534
    for node_uuid, dskl in per_node_disks.items():
535
      newl = [v[2].Copy() for v in dskl]
536
      for dsk in newl:
537
        self.cfg.SetDiskID(dsk, node_uuid)
538
      node_name = self.cfg.GetNodeName(node_uuid)
539
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
540
      if result.fail_msg:
541
        self.LogWarning("Failure in blockdev_getdimensions call to node"
542
                        " %s, ignoring", node_name)
543
        continue
544
      if len(result.payload) != len(dskl):
545
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
546
                        " result.payload=%s", node_name, len(dskl),
547
                        result.payload)
548
        self.LogWarning("Invalid result from node %s, ignoring node results",
549
                        node_name)
550
        continue
551
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
552
        if dimensions is None:
553
          self.LogWarning("Disk %d of instance %s did not return size"
554
                          " information, ignoring", idx, instance.name)
555
          continue
556
        if not isinstance(dimensions, (tuple, list)):
557
          self.LogWarning("Disk %d of instance %s did not return valid"
558
                          " dimension information, ignoring", idx,
559
                          instance.name)
560
          continue
561
        (size, spindles) = dimensions
562
        if not isinstance(size, (int, long)):
563
          self.LogWarning("Disk %d of instance %s did not return valid"
564
                          " size information, ignoring", idx, instance.name)
565
          continue
566
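        # blockdev_getdimensions reports sizes in bytes; shift by 20 to
        # compare in MiB, the unit recorded in disk.size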
        size = size >> 20
567
        if size != disk.size:
568
          self.LogInfo("Disk %d of instance %s has mismatched size,"
569
                       " correcting: recorded %d, actual %d", idx,
570
                       instance.name, disk.size, size)
571
          disk.size = size
572
          self.cfg.Update(instance, feedback_fn)
573
          changed.append((instance.name, idx, "size", size))
574
        if es_flags[node_uuid]:
575
          if spindles is None:
576
            self.LogWarning("Disk %d of instance %s did not return valid"
577
                            " spindles information, ignoring", idx,
578
                            instance.name)
579
          elif disk.spindles is None or disk.spindles != spindles:
580
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
581
                         " correcting: recorded %s, actual %s",
582
                         idx, instance.name, disk.spindles, spindles)
583
            disk.spindles = spindles
584
            self.cfg.Update(instance, feedback_fn)
585
            changed.append((instance.name, idx, "spindles", disk.spindles))
586
        if self._EnsureChildSizes(disk):
587
          self.cfg.Update(instance, feedback_fn)
588
          changed.append((instance.name, idx, "size", disk.size))
589
    return changed
590

    
591

    
592
def _ValidateNetmask(cfg, netmask):
593
  """Checks if a netmask is valid.
594

595
  @type cfg: L{config.ConfigWriter}
596
  @param cfg: The cluster configuration
597
  @type netmask: int
598
  @param netmask: the netmask to be verified
599
  @raise errors.OpPrereqError: if the validation fails
600

601
  """
602
  ip_family = cfg.GetPrimaryIPFamily()
603
  try:
604
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
605
  except errors.ProgrammerError:
606
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
607
                               ip_family, errors.ECODE_INVAL)
608
  if not ipcls.ValidateNetmask(netmask):
609
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
610
                               (netmask), errors.ECODE_INVAL)
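# Illustrative use, assuming an IPv4 primary family: _ValidateNetmask(cfg, 24)
# passes, while an out-of-range value such as 33 raises errors.OpPrereqError.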
611

    
612

    
613
def CheckFileBasedStoragePathVsEnabledDiskTemplates(
614
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
615
    file_disk_template):
616
  """Checks whether the given file-based storage directory is acceptable.
617

618
  Note: This function is public, because it is also used in bootstrap.py.
619

620
  @type logging_warn_fn: function
621
  @param logging_warn_fn: function which accepts a string and logs it
622
  @type file_storage_dir: string
623
  @param file_storage_dir: the directory to be used for file-based instances
624
  @type enabled_disk_templates: list of string
625
  @param enabled_disk_templates: the list of enabled disk templates
626
  @type file_disk_template: string
627
  @param file_disk_template: the file-based disk template for which the
628
      path should be checked
629

630
  """
631
  assert (file_disk_template in
632
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
633
  file_storage_enabled = file_disk_template in enabled_disk_templates
634
  if file_storage_dir is not None:
635
    if file_storage_dir == "":
636
      if file_storage_enabled:
637
        raise errors.OpPrereqError(
638
            "Unsetting the '%s' storage directory while having '%s' storage"
639
            " enabled is not permitted." %
640
            (file_disk_template, file_disk_template))
641
    else:
642
      if not file_storage_enabled:
643
        logging_warn_fn(
644
            "Specified a %s storage directory, although %s storage is not"
645
            " enabled." % (file_disk_template, file_disk_template))
646
  else:
647
    raise errors.ProgrammerError("Received %s storage dir with value"
648
                                 " 'None'." % file_disk_template)
649

    
650

    
651
def CheckFileStoragePathVsEnabledDiskTemplates(
652
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
653
  """Checks whether the given file storage directory is acceptable.
654

655
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
656

657
  """
658
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
659
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
660
      constants.DT_FILE)
661

    
662

    
663
def CheckSharedFileStoragePathVsEnabledDiskTemplates(
664
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
665
  """Checks whether the given shared file storage directory is acceptable.
666

667
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
668

669
  """
670
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
671
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
672
      constants.DT_SHARED_FILE)
673

    
674

    
675
class LUClusterSetParams(LogicalUnit):
676
  """Change the parameters of the cluster.
677

678
  """
679
  HPATH = "cluster-modify"
680
  HTYPE = constants.HTYPE_CLUSTER
681
  REQ_BGL = False
682

    
683
  def CheckArguments(self):
684
    """Check parameters
685

686
    """
687
    if self.op.uid_pool:
688
      uidpool.CheckUidPool(self.op.uid_pool)
689

    
690
    if self.op.add_uids:
691
      uidpool.CheckUidPool(self.op.add_uids)
692

    
693
    if self.op.remove_uids:
694
      uidpool.CheckUidPool(self.op.remove_uids)
695

    
696
    if self.op.master_netmask is not None:
697
      _ValidateNetmask(self.cfg, self.op.master_netmask)
698

    
699
    if self.op.diskparams:
700
      for dt_params in self.op.diskparams.values():
701
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
702
      try:
703
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
704
      except errors.OpPrereqError, err:
705
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
706
                                   errors.ECODE_INVAL)
707

    
708
  def ExpandNames(self):
709
    # FIXME: in the future maybe other cluster params won't require checking on
710
    # all nodes to be modified.
711
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
712
    # resource locks the right thing, shouldn't it be the BGL instead?
713
    self.needed_locks = {
714
      locking.LEVEL_NODE: locking.ALL_SET,
715
      locking.LEVEL_INSTANCE: locking.ALL_SET,
716
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
717
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
718
    }
719
    self.share_locks = ShareAll()
720

    
721
  def BuildHooksEnv(self):
722
    """Build hooks env.
723

724
    """
725
    return {
726
      "OP_TARGET": self.cfg.GetClusterName(),
727
      "NEW_VG_NAME": self.op.vg_name,
728
      }
729

    
730
  def BuildHooksNodes(self):
731
    """Build hooks nodes.
732

733
    """
734
    mn = self.cfg.GetMasterNode()
735
    return ([mn], [mn])
736

    
737
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
738
                   new_enabled_disk_templates):
739
    """Check the consistency of the vg name on all nodes and in case it gets
740
       unset whether there are instances still using it.
741

742
    """
743
    if self.op.vg_name is not None and not self.op.vg_name:
744
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
745
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
746
                                   " instances exist", errors.ECODE_INVAL)
747

    
748
    if (self.op.vg_name is not None and
749
        utils.IsLvmEnabled(enabled_disk_templates)) or \
750
           (self.cfg.GetVGName() is not None and
751
            utils.LvmGetsEnabled(enabled_disk_templates,
752
                                 new_enabled_disk_templates)):
753
      self._CheckVgNameOnNodes(node_uuids)
754

    
755
  def _CheckVgNameOnNodes(self, node_uuids):
756
    """Check the status of the volume group on each node.
757

758
    """
759
    vglist = self.rpc.call_vg_list(node_uuids)
760
    for node_uuid in node_uuids:
761
      msg = vglist[node_uuid].fail_msg
762
      if msg:
763
        # ignoring down node
764
        self.LogWarning("Error while gathering data on node %s"
765
                        " (ignoring node): %s",
766
                        self.cfg.GetNodeName(node_uuid), msg)
767
        continue
768
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
769
                                            self.op.vg_name,
770
                                            constants.MIN_VG_SIZE)
771
      if vgstatus:
772
        raise errors.OpPrereqError("Error on node '%s': %s" %
773
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
774
                                   errors.ECODE_ENVIRON)
775

    
776
  def _GetEnabledDiskTemplates(self, cluster):
777
    """Determines the enabled disk templates and the subset of disk templates
778
       that are newly enabled by this operation.
779

780
    """
781
    enabled_disk_templates = None
782
    new_enabled_disk_templates = []
783
    if self.op.enabled_disk_templates:
784
      enabled_disk_templates = self.op.enabled_disk_templates
785
      new_enabled_disk_templates = \
786
        list(set(enabled_disk_templates)
787
             - set(cluster.enabled_disk_templates))
788
    else:
789
      enabled_disk_templates = cluster.enabled_disk_templates
790
    return (enabled_disk_templates, new_enabled_disk_templates)
791

    
792
  def CheckPrereq(self):
793
    """Check prerequisites.
794

795
    This checks whether the given params don't conflict and
796
    if the given volume group is valid.
797

798
    """
799
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
800
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
801
        raise errors.OpPrereqError("Cannot disable drbd helper while"
802
                                   " drbd-based instances exist",
803
                                   errors.ECODE_INVAL)
804

    
805
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
806
    self.cluster = cluster = self.cfg.GetClusterInfo()
807

    
808
    vm_capable_node_uuids = [node.uuid
809
                             for node in self.cfg.GetAllNodesInfo().values()
810
                             if node.uuid in node_uuids and node.vm_capable]
811

    
812
    (enabled_disk_templates, new_enabled_disk_templates) = \
813
      self._GetEnabledDiskTemplates(cluster)
814

    
815
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
816
                      new_enabled_disk_templates)
817

    
818
    if self.op.file_storage_dir is not None:
819
      CheckFileStoragePathVsEnabledDiskTemplates(
820
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
821

    
822
    if self.op.shared_file_storage_dir is not None:
823
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
824
          self.LogWarning, self.op.shared_file_storage_dir,
825
          enabled_disk_templates)
826

    
827
    if self.op.drbd_helper:
828
      # checks given drbd helper on all nodes
829
      helpers = self.rpc.call_drbd_helper(node_uuids)
830
      for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
831
        if ninfo.offline:
832
          self.LogInfo("Not checking drbd helper on offline node %s",
833
                       ninfo.name)
834
          continue
835
        msg = helpers[ninfo.uuid].fail_msg
836
        if msg:
837
          raise errors.OpPrereqError("Error checking drbd helper on node"
838
                                     " '%s': %s" % (ninfo.name, msg),
839
                                     errors.ECODE_ENVIRON)
840
        node_helper = helpers[ninfo.uuid].payload
841
        if node_helper != self.op.drbd_helper:
842
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
843
                                     (ninfo.name, node_helper),
844
                                     errors.ECODE_ENVIRON)
845

    
846
    # validate params changes
847
    if self.op.beparams:
848
      objects.UpgradeBeParams(self.op.beparams)
849
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
850
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
851

    
852
    if self.op.ndparams:
853
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
854
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
855

    
856
      # TODO: we need a more general way to handle resetting
857
      # cluster-level parameters to default values
858
      if self.new_ndparams["oob_program"] == "":
859
        self.new_ndparams["oob_program"] = \
860
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
861

    
862
    if self.op.hv_state:
863
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
864
                                           self.cluster.hv_state_static)
865
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
866
                               for hv, values in new_hv_state.items())
867

    
868
    if self.op.disk_state:
869
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
870
                                               self.cluster.disk_state_static)
871
      self.new_disk_state = \
872
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
873
                            for name, values in svalues.items()))
874
             for storage, svalues in new_disk_state.items())
875

    
876
    if self.op.ipolicy:
877
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
878
                                           group_policy=False)
879

    
880
      all_instances = self.cfg.GetAllInstancesInfo().values()
881
      violations = set()
882
      for group in self.cfg.GetAllNodeGroupsInfo().values():
883
        instances = frozenset([inst for inst in all_instances
884
                               if compat.any(nuuid in group.members
885
                                             for nuuid in inst.all_nodes)])
886
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
887
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
888
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
889
                                           self.cfg)
890
        if new:
891
          violations.update(new)
892

    
893
      if violations:
894
        self.LogWarning("After the ipolicy change the following instances"
895
                        " violate them: %s",
896
                        utils.CommaJoin(utils.NiceSort(violations)))
897

    
898
    if self.op.nicparams:
899
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
900
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
901
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
902
      nic_errors = []
903

    
904
      # check all instances for consistency
905
      for instance in self.cfg.GetAllInstancesInfo().values():
906
        for nic_idx, nic in enumerate(instance.nics):
907
          params_copy = copy.deepcopy(nic.nicparams)
908
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
909

    
910
          # check parameter syntax
911
          try:
912
            objects.NIC.CheckParameterSyntax(params_filled)
913
          except errors.ConfigurationError, err:
914
            nic_errors.append("Instance %s, nic/%d: %s" %
915
                              (instance.name, nic_idx, err))
916

    
917
          # if we're moving instances to routed, check that they have an ip
918
          target_mode = params_filled[constants.NIC_MODE]
919
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
920
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
921
                              " address" % (instance.name, nic_idx))
922
      if nic_errors:
923
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
924
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
925

    
926
    # hypervisor list/parameters
927
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
928
    if self.op.hvparams:
929
      for hv_name, hv_dict in self.op.hvparams.items():
930
        if hv_name not in self.new_hvparams:
931
          self.new_hvparams[hv_name] = hv_dict
932
        else:
933
          self.new_hvparams[hv_name].update(hv_dict)
934

    
935
    # disk template parameters
936
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
937
    if self.op.diskparams:
938
      for dt_name, dt_params in self.op.diskparams.items():
939
        if dt_name not in self.new_diskparams:
940
          self.new_diskparams[dt_name] = dt_params
941
        else:
942
          self.new_diskparams[dt_name].update(dt_params)
943

    
944
    # os hypervisor parameters
945
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
946
    if self.op.os_hvp:
947
      for os_name, hvs in self.op.os_hvp.items():
948
        if os_name not in self.new_os_hvp:
949
          self.new_os_hvp[os_name] = hvs
950
        else:
951
          for hv_name, hv_dict in hvs.items():
952
            if hv_dict is None:
953
              # Delete if it exists
954
              self.new_os_hvp[os_name].pop(hv_name, None)
955
            elif hv_name not in self.new_os_hvp[os_name]:
956
              self.new_os_hvp[os_name][hv_name] = hv_dict
957
            else:
958
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
959

    
960
    # os parameters
961
    self.new_osp = objects.FillDict(cluster.osparams, {})
962
    if self.op.osparams:
963
      for os_name, osp in self.op.osparams.items():
964
        if os_name not in self.new_osp:
965
          self.new_osp[os_name] = {}
966

    
967
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
968
                                                 use_none=True)
969

    
970
        if not self.new_osp[os_name]:
971
          # we removed all parameters
972
          del self.new_osp[os_name]
973
        else:
974
          # check the parameter validity (remote check)
975
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
976
                        os_name, self.new_osp[os_name])
977

    
978
    # changes to the hypervisor list
979
    if self.op.enabled_hypervisors is not None:
980
      self.hv_list = self.op.enabled_hypervisors
981
      for hv in self.hv_list:
982
        # if the hypervisor doesn't already exist in the cluster
983
        # hvparams, we initialize it to empty, and then (in both
984
        # cases) we make sure to fill the defaults, as we might not
985
        # have a complete defaults list if the hypervisor wasn't
986
        # enabled before
987
        if hv not in new_hvp:
988
          new_hvp[hv] = {}
989
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
990
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
991
    else:
992
      self.hv_list = cluster.enabled_hypervisors
993

    
994
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
995
      # either the enabled list has changed, or the parameters have, validate
996
      for hv_name, hv_params in self.new_hvparams.items():
997
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
998
            (self.op.enabled_hypervisors and
999
             hv_name in self.op.enabled_hypervisors)):
1000
          # either this is a new hypervisor, or its parameters have changed
1001
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1002
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1003
          hv_class.CheckParameterSyntax(hv_params)
1004
          CheckHVParams(self, node_uuids, hv_name, hv_params)
1005

    
1006
    self._CheckDiskTemplateConsistency()
1007

    
1008
    if self.op.os_hvp:
1009
      # no need to check any newly-enabled hypervisors, since the
1010
      # defaults have already been checked in the above code-block
1011
      for os_name, os_hvp in self.new_os_hvp.items():
1012
        for hv_name, hv_params in os_hvp.items():
1013
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1014
          # we need to fill in the new os_hvp on top of the actual hv_p
1015
          cluster_defaults = self.new_hvparams.get(hv_name, {})
1016
          new_osp = objects.FillDict(cluster_defaults, hv_params)
1017
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1018
          hv_class.CheckParameterSyntax(new_osp)
1019
          CheckHVParams(self, node_uuids, hv_name, new_osp)
1020

    
1021
    if self.op.default_iallocator:
1022
      alloc_script = utils.FindFile(self.op.default_iallocator,
1023
                                    constants.IALLOCATOR_SEARCH_PATH,
1024
                                    os.path.isfile)
1025
      if alloc_script is None:
1026
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1027
                                   " specified" % self.op.default_iallocator,
1028
                                   errors.ECODE_INVAL)
1029

    
1030
  def _CheckDiskTemplateConsistency(self):
1031
    """Check whether the disk templates that are going to be disabled
1032
       are still in use by some instances.
1033

1034
    """
1035
    if self.op.enabled_disk_templates:
1036
      cluster = self.cfg.GetClusterInfo()
1037
      instances = self.cfg.GetAllInstancesInfo()
1038

    
1039
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1040
        - set(self.op.enabled_disk_templates)
1041
      for instance in instances.itervalues():
1042
        if instance.disk_template in disk_templates_to_remove:
1043
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
1044
                                     " because instance '%s' is using it." %
1045
                                     (instance.disk_template, instance.name))
1046

    
1047
  def _SetVgName(self, feedback_fn):
1048
    """Determines and sets the new volume group name.
1049

1050
    """
1051
    if self.op.vg_name is not None:
1052
      if self.op.vg_name and not \
1053
           utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
1054
        feedback_fn("Note that you specified a volume group, but did not"
1055
                    " enable any lvm disk template.")
1056
      new_volume = self.op.vg_name
1057
      if not new_volume:
1058
        if utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
1059
          raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
1060
                                     " disk templates are enabled.")
1061
        new_volume = None
1062
      if new_volume != self.cfg.GetVGName():
1063
        self.cfg.SetVGName(new_volume)
1064
      else:
1065
        feedback_fn("Cluster LVM configuration already in desired"
1066
                    " state, not changing")
1067
    else:
1068
      if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
1069
          not self.cfg.GetVGName():
1070
        raise errors.OpPrereqError("Please specify a volume group when"
1071
                                   " enabling lvm-based disk-templates.")
1072

    
1073
  def _SetFileStorageDir(self, feedback_fn):
1074
    """Set the file storage directory.
1075

1076
    """
1077
    if self.op.file_storage_dir is not None:
1078
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
1079
        feedback_fn("Global file storage dir already set to value '%s'"
1080
                    % self.cluster.file_storage_dir)
1081
      else:
1082
        self.cluster.file_storage_dir = self.op.file_storage_dir
1083

    
1084
  def Exec(self, feedback_fn):
1085
    """Change the parameters of the cluster.
1086

1087
    """
1088
    if self.op.enabled_disk_templates:
1089
      self.cluster.enabled_disk_templates = \
1090
        list(set(self.op.enabled_disk_templates))
1091

    
1092
    self._SetVgName(feedback_fn)
1093
    self._SetFileStorageDir(feedback_fn)
1094

    
1095
    if self.op.drbd_helper is not None:
1096
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1097
        feedback_fn("Note that you specified a drbd user helper, but did not"
1098
                    " enable the drbd disk template.")
1099
      new_helper = self.op.drbd_helper
1100
      if not new_helper:
1101
        new_helper = None
1102
      if new_helper != self.cfg.GetDRBDHelper():
1103
        self.cfg.SetDRBDHelper(new_helper)
1104
      else:
1105
        feedback_fn("Cluster DRBD helper already in desired state,"
1106
                    " not changing")
1107
    if self.op.hvparams:
1108
      self.cluster.hvparams = self.new_hvparams
1109
    if self.op.os_hvp:
1110
      self.cluster.os_hvp = self.new_os_hvp
1111
    if self.op.enabled_hypervisors is not None:
1112
      self.cluster.hvparams = self.new_hvparams
1113
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1114
    if self.op.beparams:
1115
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1116
    if self.op.nicparams:
1117
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1118
    if self.op.ipolicy:
1119
      self.cluster.ipolicy = self.new_ipolicy
1120
    if self.op.osparams:
1121
      self.cluster.osparams = self.new_osp
1122
    if self.op.ndparams:
1123
      self.cluster.ndparams = self.new_ndparams
1124
    if self.op.diskparams:
1125
      self.cluster.diskparams = self.new_diskparams
1126
    if self.op.hv_state:
1127
      self.cluster.hv_state_static = self.new_hv_state
1128
    if self.op.disk_state:
1129
      self.cluster.disk_state_static = self.new_disk_state
1130

    
1131
    if self.op.candidate_pool_size is not None:
1132
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1133
      # we need to update the pool size here, otherwise the save will fail
1134
      AdjustCandidatePool(self, [])
1135

    
1136
    if self.op.maintain_node_health is not None:
1137
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1138
        feedback_fn("Note: CONFD was disabled at build time, node health"
1139
                    " maintenance is not useful (still enabling it)")
1140
      self.cluster.maintain_node_health = self.op.maintain_node_health
1141

    
1142
    if self.op.modify_etc_hosts is not None:
1143
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1144

    
1145
    if self.op.prealloc_wipe_disks is not None:
1146
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1147

    
1148
    if self.op.add_uids is not None:
1149
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1150

    
1151
    if self.op.remove_uids is not None:
1152
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1153

    
1154
    if self.op.uid_pool is not None:
1155
      self.cluster.uid_pool = self.op.uid_pool
1156

    
1157
    if self.op.default_iallocator is not None:
1158
      self.cluster.default_iallocator = self.op.default_iallocator
1159

    
1160
    if self.op.reserved_lvs is not None:
1161
      self.cluster.reserved_lvs = self.op.reserved_lvs
1162

    
1163
    if self.op.use_external_mip_script is not None:
1164
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
1165

    
1166
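    # helper_os applies a list of (DDM_ADD|DDM_REMOVE, os_name) modifications
    # to the named cluster-level OS list ("hidden_os" or "blacklisted_os")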
    def helper_os(aname, mods, desc):
1167
      desc += " OS list"
1168
      lst = getattr(self.cluster, aname)
1169
      for key, val in mods:
1170
        if key == constants.DDM_ADD:
1171
          if val in lst:
1172
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1173
          else:
1174
            lst.append(val)
1175
        elif key == constants.DDM_REMOVE:
1176
          if val in lst:
1177
            lst.remove(val)
1178
          else:
1179
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1180
        else:
1181
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
1182

    
1183
    if self.op.hidden_os:
1184
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1185

    
1186
    if self.op.blacklisted_os:
1187
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1188

    
1189
    if self.op.master_netdev:
1190
      master_params = self.cfg.GetMasterNetworkParameters()
1191
      ems = self.cfg.GetUseExternalMipScript()
1192
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1193
                  self.cluster.master_netdev)
1194
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1195
                                                       master_params, ems)
1196
      if not self.op.force:
1197
        result.Raise("Could not disable the master ip")
1198
      else:
1199
        if result.fail_msg:
1200
          msg = ("Could not disable the master ip (continuing anyway): %s" %
1201
                 result.fail_msg)
1202
          feedback_fn(msg)
1203
      feedback_fn("Changing master_netdev from %s to %s" %
1204
                  (master_params.netdev, self.op.master_netdev))
1205
      self.cluster.master_netdev = self.op.master_netdev
1206

    
1207
    if self.op.master_netmask:
1208
      master_params = self.cfg.GetMasterNetworkParameters()
1209
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1210
      result = self.rpc.call_node_change_master_netmask(
1211
                 master_params.uuid, master_params.netmask,
1212
                 self.op.master_netmask, master_params.ip,
1213
                 master_params.netdev)
1214
      result.Warn("Could not change the master IP netmask", feedback_fn)
1215
      self.cluster.master_netmask = self.op.master_netmask
1216

    
1217
    self.cfg.Update(self.cluster, feedback_fn)
1218

    
1219
    if self.op.master_netdev:
1220
      master_params = self.cfg.GetMasterNetworkParameters()
1221
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
1222
                  self.op.master_netdev)
1223
      ems = self.cfg.GetUseExternalMipScript()
1224
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1225
                                                     master_params, ems)
1226
      result.Warn("Could not re-enable the master ip on the master,"
1227
                  " please restart manually", self.LogWarning)
1228

    
1229

    
1230
class LUClusterVerify(NoHooksLU):
1231
  """Submits all jobs necessary to verify the cluster.
1232

1233
  """
1234
  REQ_BGL = False
1235

    
1236
  def ExpandNames(self):
1237
    self.needed_locks = {}
1238

    
1239
  def Exec(self, feedback_fn):
1240
    jobs = []
1241

    
1242
    if self.op.group_name:
1243
      groups = [self.op.group_name]
1244
      depends_fn = lambda: None
1245
    else:
1246
      groups = self.cfg.GetNodeGroupList()
1247

    
1248
      # Verify global configuration
1249
      jobs.append([
1250
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1251
        ])
1252

    
1253
      # Always depend on global verification
1254
      depends_fn = lambda: [(-len(jobs), [])]
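      # (negative dependency IDs are relative to jobs submitted in the same
      #  batch, so -len(jobs) points at the config verification job above)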
1255

    
1256
    jobs.extend(
1257
      [opcodes.OpClusterVerifyGroup(group_name=group,
1258
                                    ignore_errors=self.op.ignore_errors,
1259
                                    depends=depends_fn())]
1260
      for group in groups)
1261

    
1262
    # Fix up all parameters
1263
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1264
      op.debug_simulate_errors = self.op.debug_simulate_errors
1265
      op.verbose = self.op.verbose
1266
      op.error_codes = self.op.error_codes
1267
      try:
1268
        op.skip_checks = self.op.skip_checks
1269
      except AttributeError:
1270
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1271

    
1272
    return ResultWithJobs(jobs)
1273

    
1274

    
1275
class _VerifyErrors(object):
1276
  """Mix-in for cluster/group verify LUs.
1277

1278
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1279
  self.op and self._feedback_fn to be available.)
1280

1281
  """
1282

    
1283
  ETYPE_FIELD = "code"
1284
  ETYPE_ERROR = "ERROR"
1285
  ETYPE_WARNING = "WARNING"
1286

    
1287
  def _Error(self, ecode, item, msg, *args, **kwargs):
1288
    """Format an error message.
1289

1290
    Based on the opcode's error_codes parameter, either format a
1291
    parseable error code, or a simpler error string.
1292

1293
    This must be called only from Exec and functions called from Exec.
1294

1295
    """
1296
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1297
    itype, etxt, _ = ecode
1298
    # If the error code is in the list of ignored errors, demote the error to a
1299
    # warning
1300
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1301
      ltype = self.ETYPE_WARNING
1302
    # first complete the msg
1303
    if args:
1304
      msg = msg % args
1305
    # then format the whole message
1306
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1307
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1308
    else:
1309
      if item:
1310
        item = " " + item
1311
      else:
1312
        item = ""
1313
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1314
    # and finally report it via the feedback_fn
1315
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1316
    # do not mark the operation as failed for WARN cases only
1317
    if ltype == self.ETYPE_ERROR:
1318
      self.bad = True
1319

    
1320
  def _ErrorIf(self, cond, *args, **kwargs):
1321
    """Log an error message if the passed condition is True.
1322

1323
    """
1324
    if (bool(cond)
1325
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1326
      self._Error(*args, **kwargs)
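# Typical use from a verify LU (illustrative call, not part of this module):
#   self._ErrorIf(test_failed, constants.CV_ENODEHV, node_name,
#                 "hypervisor verify failure: %s", err_msg)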
1327

    
1328

    
1329
def _VerifyCertificate(filename):
1330
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1331

1332
  @type filename: string
1333
  @param filename: Path to PEM file
1334

1335
  """
1336
  try:
1337
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1338
                                           utils.ReadFile(filename))
1339
  except Exception, err: # pylint: disable=W0703
1340
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1341
            "Failed to load X509 certificate %s: %s" % (filename, err))
1342

    
1343
  (errcode, msg) = \
1344
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1345
                                constants.SSL_CERT_EXPIRATION_ERROR)
1346

    
1347
  if msg:
1348
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1349
  else:
1350
    fnamemsg = None
1351

    
1352
  if errcode is None:
1353
    return (None, fnamemsg)
1354
  elif errcode == utils.CERT_WARNING:
1355
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1356
  elif errcode == utils.CERT_ERROR:
1357
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1358

    
1359
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1360

    
1361

    
1362
def _GetAllHypervisorParameters(cluster, instances):
1363
  """Compute the set of all hypervisor parameters.
1364

1365
  @type cluster: L{objects.Cluster}
1366
  @param cluster: the cluster object
1367
  @type instances: list of L{objects.Instance}
1368
  @param instances: additional instances from which to obtain parameters
1369
  @rtype: list of (origin, hypervisor, parameters)
1370
  @return: a list with all parameters found, indicating the hypervisor they
1371
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1372

1373
  """
1374
  hvp_data = []
1375

    
1376
  for hv_name in cluster.enabled_hypervisors:
1377
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1378

    
1379
  for os_name, os_hvp in cluster.os_hvp.items():
1380
    for hv_name, hv_params in os_hvp.items():
1381
      if hv_params:
1382
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1383
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1384

    
1385
  # TODO: collapse identical parameter values in a single one
1386
  for instance in instances:
1387
    if instance.hvparams:
1388
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1389
                       cluster.FillHV(instance)))
1390

    
1391
  return hvp_data
1392

    
1393

    
1394
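# Illustrative sketch of the structure returned above (values are made up):
#   [("cluster", "kvm", {...cluster-level defaults...}),
#    ("os debian-image", "kvm", {...defaults overridden per OS...}),
#    ("instance web1.example.com", "kvm", {...fully filled parameters...})]
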
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                    constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(inst.name for
                         inst in dangling_instances.get(node.uuid, [])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(inst.name for
                                         inst in no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

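  # Illustrative note (assumption, not upstream code): the verification code
  # builds one NodeImage per node, e.g.
  #   NodeImage(offline=node.offline, uuid=node.uuid,
  #             vm_capable=node.vm_capable)
  # and the per-node checks below then fill in the runtime fields
  # (volumes, instances, mfree, dfree, ...) from the node_verify RPC results.
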
  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        instance = self.cfg.GetInstanceInfoByName(inst_name)
        if instance.disk_template in constants.DTS_INT_MIRROR:
          nodes.update(instance.secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_node_uuids = set(self.group_info.members)
    group_inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_node_uuids = \
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_inst_uuids = \
        group_inst_uuids.difference(
          [self.cfg.GetInstanceInfoByName(name).uuid
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])

    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_inst_uuids:
      raise errors.OpPrereqError(
        "Missing lock for instances: %s" %
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
        errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_uuids = group_node_uuids
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
                             for node_uuid in group_node_uuids)

    self.my_inst_uuids = group_inst_uuids
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
                             for inst_uuid in group_inst_uuids)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nuuid in inst.all_nodes:
          if self.all_node_info[nuuid].group != self.group_uuid:
            extra_lv_nodes.add(nuuid)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, ninfo.name,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s parameter verify failure (source %s): %s",
                      hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
                    "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
                  "Node time diverges by at least %s from master node time",
                  ntime_diff)

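  # Worked example (illustrative, assuming a NODE_MAX_CLOCK_SKEW of 150
  # seconds): if the verify RPC ran between 11:59:50 and 12:00:00, any node
  # clock within [11:57:20, 12:02:30] is accepted; a node reporting 12:03:00
  # is flagged as diverging by at least 180.0s from the master time.
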
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                  "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupDRBDVersion(self, node_verify_infos):
    """Check cross-node DRBD version consistency.

    @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node_verify call.

    """
    node_versions = {}
    for node_uuid, ndata in node_verify_infos.items():
      nresult = ndata.payload
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
      node_versions[node_uuid] = version

    if len(set(node_versions.values())) > 1:
      for node_uuid, version in sorted(node_versions.items()):
        msg = "DRBD version mismatch: %s" % version
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
                    code=self.ETYPE_WARNING)

  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping node names to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of check
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "did not return valid bridge information")
    if not test:
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the presence and executability of user scripts on the node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = constants.NV_USERSCRIPTS not in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = constants.NV_NODELIST not in nresult
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
                  "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
                        "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
                        "tcp communication with node '%s': %s",
                        anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if ninfo.uuid == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)

  def _VerifyInstance(self, instance, node_image, diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the nodes are in the correct
    state.

    """
    pnode_uuid = instance.primary_node
    pnode_img = node_image[pnode_uuid]
    groupinfo = self.cfg.GetAllNodeGroupsInfo()

    node_vol_should = {}
    instance.MapLVsByNode(node_vol_should)

    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            self.group_info)
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)

    for node_uuid in node_vol_should:
      n_img = node_image[node_uuid]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node_uuid]:
        test = volume not in n_img.volumes
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
                      "volume %s missing on node %s", volume,
                      self.cfg.GetNodeName(node_uuid))

    if instance.admin_state == constants.ADMINST_UP:
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
                    "instance not running on its primary node %s",
                    self.cfg.GetNodeName(pnode_uuid))
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance is marked as running and lives on"
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      self._ErrorIf(instance.disks_active and
                    not success and not bad_snode,
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
                    "couldn't retrieve status for disk/%s on %s: %s",
                    idx, self.cfg.GetNodeName(nname), bdev_status)

      if instance.disks_active and success and \
         (bdev_status.is_degraded or
          bdev_status.ldisk_status != constants.LDS_OKAY):
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
        if bdev_status.is_degraded:
          msg += " is degraded"
        if bdev_status.ldisk_status != constants.LDS_OKAY:
          msg += "; state is '%s'" % \
                 constants.LDS_NAMES[bdev_status.ldisk_status]

        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)

    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
                  "instance %s, connection to primary node failed",
                  instance.name)

    self._ErrorIf(len(instance.secondary_nodes) > 1,
                  constants.CV_EINSTANCELAYOUT, instance.name,
                  "instance has multiple secondary nodes: %s",
                  utils.CommaJoin(instance.secondary_nodes),
                  code=self.ETYPE_WARNING)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
    if any(es_flags.values()):
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
        # Disk template not compatible with exclusive_storage: no instance
        # node should have the flag set
        es_nodes = [n
                    for (n, es) in es_flags.items()
                    if es]
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
                    "instance has template %s, which is not supported on nodes"
                    " that have exclusive storage set: %s",
                    instance.disk_template,
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
      for (idx, disk) in enumerate(instance.disks):
        self._ErrorIf(disk.spindles is None,
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
                      "number of spindles not configured for disk %s while"
                      " exclusive storage is enabled, try running"
                      " gnt-cluster repair-disk-sizes", idx)

    if instance.disk_template in constants.DTS_INT_MIRROR:
      instance_nodes = utils.NiceSort(instance.all_nodes)
      instance_groups = {}

      for node_uuid in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
                                   []).append(node_uuid)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
                           groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode_uuid in nodes,
                                   reverse=True)]

      self._ErrorIf(len(instance_groups) > 1,
                    constants.CV_EINSTANCESPLITGROUPS,
                    instance.name, "instance has primary and secondary nodes in"
                    " different groups: %s", utils.CommaJoin(pretty_list),
                    code=self.ETYPE_WARNING)

    inst_nodes_offline = []
    for snode in instance.secondary_nodes:
      s_img = node_image[snode]
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
                    self.cfg.GetNodeName(snode),
                    "instance %s, connection to secondary node failed",
                    instance.name)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
                  instance.name, "instance has offline secondary node(s) %s",
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
    # ... or ghost/non-vm_capable nodes
    for node_uuid in instance.all_nodes:
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance lives on ghost node %s",
                    self.cfg.GetNodeName(node_uuid))
      self._ErrorIf(not node_image[node_uuid].vm_capable,
                    constants.CV_EINSTANCEBADNODE, instance.name,
                    "instance lives on non-vm_capable node %s",
                    self.cfg.GetNodeName(node_uuid))

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node_uuid, n_img in node_image.items():
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
          self.all_node_info[node_uuid].group != self.group_uuid):
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node_uuid not in node_vol_should or
                volume not in node_vol_should[node_uuid]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
                      self.cfg.GetNodeName(node_uuid),
                      "volume %s is unknown", volume)

  def _VerifyNPlusOneMemory(self, node_image, all_insts):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node_uuid, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline or \
         self.all_node_info[node_uuid].group != self.group_uuid:
        # we're skipping nodes marked offline and nodes in other groups from
        # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      #TODO(dynmem): also consider ballooning out other instances
      for prinode, inst_uuids in n_img.sbp.items():
        needed_mem = 0
        for inst_uuid in inst_uuids:
          bep = cluster_info.FillBE(all_insts[inst_uuid])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MINMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1,
                      self.cfg.GetNodeName(node_uuid),
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)

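  # Worked example (illustrative figures): if node A is secondary for two
  # auto-balanced instances whose primary is node B, with BE_MINMEM values of
  # 1024 and 2048 MiB, then needed_mem for the (A, B) pair is 3072 MiB and A
  # is flagged whenever its reported mfree drops below that.
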
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param nodes: List of L{objects.Node} objects
    @param master_node_uuid: UUID of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.uuid == master_node_uuid)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodes
      else:
        filenodes = filter(fn, nodes)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodes:
      if node.offline:
        ignore_nodes.add(node.uuid)
        continue

      nresult = all_nvinfo[node.uuid]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
                    "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.uuid)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_uuid
                            for node_uuids in fileinfo[filename].values()
                            for node_uuid in node_uuids) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        self._ErrorIf(missing_file and missing_file != expected_nodes,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is optional, but it must exist on all or no"
                      " nodes (not found on %s)",
                      filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))
      else:
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is missing from node(s) %s", filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        self._ErrorIf(unexpected,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s should not exist on node(s) %s",
                      filename, utils.CommaJoin(
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1,
                     utils.CommaJoin(utils.NiceSort(
                       map(self.cfg.GetNodeName, node_uuids))))
                    for (idx, (checksum, node_uuids)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
                    "File %s found with %s different checksums (%s)",
                    filename, len(checksums), "; ".join(variants))

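  # Illustrative shape of the data handled above (paths and checksums are
  # made up and shortened):
  #   fileinfo = {"/var/lib/ganeti/config.data":
  #                 {"0123456789abcdef": set(["node-uuid-1", "node-uuid-2"])}}
  # a second checksum key under the same filename is what triggers the
  # "different checksums" error at the end of the loop.
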
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                    "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "wrong drbd usermode helper: %s", payload)

    # compute the DRBD minors
    node_drbd = {}
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
      test = inst_uuid not in instanceinfo
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
      if test:
        node_drbd[minor] = (inst_uuid, False)
      else:
        instance = instanceinfo[inst_uuid]
        node_drbd[minor] = (inst_uuid, instance.disks_active)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                  "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (inst_uuid, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "drbd minor %d of instance %s is not active", minor,
                    self.cfg.GetInstanceName(inst_uuid))
    for minor in used_minors:
      test = minor not in node_drbd
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "unallocated drbd minor %d is in use", minor)

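  # Illustrative example (made-up values): with drbd_map[node] equal to
  # {0: "uuid-a", 1: "uuid-b"} and the node reporting used_minors == [0],
  # minor 1 is reported as "not active" if instance uuid-b has its disks
  # active, while a reported minor that is absent from the map is flagged as
  # "unallocated ... in use".
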
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                  "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

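  # Illustrative entry of the structure built above (values are made up):
  #   nimg.oslist["debian-image"] = [
  #     ("/srv/ganeti/os/debian-image", True, "",
  #      set(["default"]), set(), set([20]))]
  # i.e. one (path, status, diagnose, variants, parameters, api_versions)
  # tuple per location where the OS definition was found.
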
  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
                    "Invalid OS %s (located at %s): %s",
                    os_name, f_path, f_diag)
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
                    "OS '%s' has multiple entries"
                    " (first one shadows the rest): %s",
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                    "Extra OS %s not present on reference node (%s)",
                    os_name, self.cfg.GetNodeName(base.uuid))
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
                      "OS %s for %s differs from reference node %s:"
                      " [%s] vs. [%s]", kind, os_name,
                      self.cfg.GetNodeName(base.uuid),
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
                  "OSes present on reference node %s"
                  " but missing on this node: %s",
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))

  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    cluster = self.cfg.GetClusterInfo()
    if (is_master and
        (cluster.IsFileStorageEnabled() or
         cluster.IsSharedFileStorageEnabled())):
      try:
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
                          verify_key, error_key):
    """Verifies (file) storage paths.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type file_disk_template: string
    @param file_disk_template: file-based disk template, whose directory
        is supposed to be verified
    @type verify_key: string
    @param verify_key: key for the verification map of this file
        verification step
    @param error_key: error key to be added to the verification results
        in case something goes wrong in this verification step

    """
    assert (file_disk_template in
            utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
    cluster = self.cfg.GetClusterInfo()
    if cluster.IsDiskTemplateEnabled(file_disk_template):
      self._ErrorIf(
          verify_key in nresult,
          error_key, ninfo.name,
          "The configured %s storage path is unusable: %s" %
          (file_disk_template, nresult.get(verify_key)))

  def _VerifyFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_FILE,
        constants.NV_FILE_STORAGE_PATH,
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)

  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_SHARED_FILE,
        constants.NV_SHARED_FILE_STORAGE_PATH,
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{NodeImage})
    @param node_image: Node objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_devonly = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      devonly = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        self.cfg.SetDiskID(anno_disk, nuuid)
        devonly.append(anno_disk)

      node_disks_devonly[nuuid] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

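    # Descriptive note (added for clarity): every NV_* key placed into
    # node_verify_param requests one check from the node, and the verify RPC
    # replies with a payload keyed by the same constants -- see e.g. the
    # nresult[constants.NV_VGLIST] lookup earlier in this class.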
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if drbd_helper:
      node_verify_param[constants.NV_DRBDVERSION] = None
      node_verify_param[constants.NV_DRBDLIST] = None
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for inst_uuid in self.my_inst_info.values():
      for nic in inst_uuid.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

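    # Descriptive note (added for clarity): the loop below fills in the
    # expected per-node state: nimg.pinst/nimg.sinst collect the UUIDs of
    # instances having the node as primary/secondary, nimg.sbp groups
    # secondary instances by their primary node, and "ghost" marks nodes
    # referenced by an instance but not present in the cluster's node list.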
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so
    # if it's True for at least one node, we act as if it were set for all
    # the nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")
2944

    
2945
    refos_img = None
2946

    
2947
    for node_i in node_data_list:
2948
      nimg = node_image[node_i.uuid]
2949

    
2950
      if node_i.offline:
2951
        if verbose:
2952
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
2953
        n_offline += 1
2954
        continue
2955

    
2956
      if node_i.uuid == master_node_uuid:
2957
        ntype = "master"
2958
      elif node_i.master_candidate:
2959
        ntype = "master candidate"
2960
      elif node_i.drained:
2961
        ntype = "drained"
2962
        n_drained += 1
2963
      else:
2964
        ntype = "regular"
2965
      if verbose:
2966
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))
2967

    
2968
      msg = all_nvinfo[node_i.uuid].fail_msg
2969
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
2970
                    "while contacting node: %s", msg)
2971
      if msg:
2972
        nimg.rpc_fail = True
2973
        continue
2974

    
2975
      nresult = all_nvinfo[node_i.uuid].payload
2976

    
2977
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2978
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2979
      self._VerifyNodeNetwork(node_i, nresult)
2980
      self._VerifyNodeUserScripts(node_i, nresult)
2981
      self._VerifyOob(node_i, nresult)
2982
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
2983
                                           node_i.uuid == master_node_uuid)
2984
      self._VerifyFileStoragePaths(node_i, nresult)
2985
      self._VerifySharedFileStoragePaths(node_i, nresult)
2986

    
2987
      if nimg.vm_capable:
2988
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
2989
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2990
                             all_drbd_map)
2991

    
2992
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2993
        self._UpdateNodeInstances(node_i, nresult, nimg)
2994
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2995
        self._UpdateNodeOS(node_i, nresult, nimg)
2996

    
2997
        if not nimg.os_fail:
2998
          if refos_img is None:
2999
            refos_img = nimg
3000
          self._VerifyNodeOS(node_i, nimg, refos_img)
3001
        self._VerifyNodeBridges(node_i, nresult, bridges)
3002

    
3003
        # Check whether all running instances are primary for the node. (This
3004
        # can no longer be done from _VerifyInstance below, since some of the
3005
        # wrong instances could be from other node groups.)
3006
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)
3007

    
3008
        for inst_uuid in non_primary_inst_uuids:
3009
          test = inst_uuid in self.all_inst_info
3010
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
3011
                        self.cfg.GetInstanceName(inst_uuid),
3012
                        "instance should not run on node %s", node_i.name)
3013
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
3014
                        "node is running unknown instance %s", inst_uuid)
3015

    
3016
    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.name not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])