root / lib / cmdlib / cluster.py @ 4e6cfd11

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


117
class LUClusterDestroy(LogicalUnit):
118
  """Logical unit for destroying the cluster.
119

120
  """
121
  HPATH = "cluster-destroy"
122
  HTYPE = constants.HTYPE_CLUSTER
123

    
124
  def BuildHooksEnv(self):
125
    """Build hooks env.
126

127
    """
128
    return {
129
      "OP_TARGET": self.cfg.GetClusterName(),
130
      }
131

    
132
  def BuildHooksNodes(self):
133
    """Build hooks nodes.
134

135
    """
136
    return ([], [])
137

    
138
  def CheckPrereq(self):
139
    """Check prerequisites.
140

141
    This checks whether the cluster is empty.
142

143
    Any errors are signaled by raising errors.OpPrereqError.
144

145
    """
146
    master = self.cfg.GetMasterNode()
147

    
148
    nodelist = self.cfg.GetNodeList()
149
    if len(nodelist) != 1 or nodelist[0] != master:
150
      raise errors.OpPrereqError("There are still %d node(s) in"
151
                                 " this cluster." % (len(nodelist) - 1),
152
                                 errors.ECODE_INVAL)
153
    instancelist = self.cfg.GetInstanceList()
154
    if instancelist:
155
      raise errors.OpPrereqError("There are still %d instance(s) in"
156
                                 " this cluster." % len(instancelist),
157
                                 errors.ECODE_INVAL)
158

    
159
  def Exec(self, feedback_fn):
160
    """Destroys the cluster.
161

162
    """
163
    master_params = self.cfg.GetMasterNetworkParameters()
164

    
165
    # Run post hooks on master node before it's removed
166
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
167

    
168
    ems = self.cfg.GetUseExternalMipScript()
169
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
170
                                                     master_params, ems)
171
    result.Warn("Error disabling the master IP address", self.LogWarning)
172
    return master_params.uuid
173

    
174

    
175
class LUClusterPostInit(LogicalUnit):
176
  """Logical unit for running hooks after cluster initialization.
177

178
  """
179
  HPATH = "cluster-init"
180
  HTYPE = constants.HTYPE_CLUSTER
181

    
182
  def BuildHooksEnv(self):
183
    """Build hooks env.
184

185
    """
186
    return {
187
      "OP_TARGET": self.cfg.GetClusterName(),
188
      }
189

    
190
  def BuildHooksNodes(self):
191
    """Build hooks nodes.
192

193
    """
194
    return ([], [self.cfg.GetMasterNode()])
195

    
196
  def Exec(self, feedback_fn):
197
    """Nothing to do.
198

199
    """
200
    return True
201

    
202

    
203
class ClusterQuery(QueryBase):
204
  FIELDS = query.CLUSTER_FIELDS
205

    
206
  #: Do not sort (there is only one item)
207
  SORT_FIELD = None
208

    
209
  def ExpandNames(self, lu):
210
    lu.needed_locks = {}
211

    
212
    # The following variables interact with _QueryBase._GetNames
213
    self.wanted = locking.ALL_SET
214
    self.do_locking = self.use_locking
215

    
216
    if self.do_locking:
217
      raise errors.OpPrereqError("Can not use locking for cluster queries",
218
                                 errors.ECODE_INVAL)
219

    
220
  def DeclareLocks(self, lu, level):
221
    pass
222

    
223
  def _GetQueryData(self, lu):
224
    """Computes the list of nodes and their attributes.
225

226
    """
227
    # Locking is not used
228
    assert not (compat.any(lu.glm.is_owned(level)
229
                           for level in locking.LEVELS
230
                           if level != locking.LEVEL_CLUSTER) or
231
                self.do_locking or self.use_locking)
232

    
233
    if query.CQ_CONFIG in self.requested_data:
234
      cluster = lu.cfg.GetClusterInfo()
235
      nodes = lu.cfg.GetAllNodesInfo()
236
    else:
237
      cluster = NotImplemented
238
      nodes = NotImplemented
239

    
240
    if query.CQ_QUEUE_DRAINED in self.requested_data:
241
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
242
    else:
243
      drain_flag = NotImplemented
244

    
245
    if query.CQ_WATCHER_PAUSE in self.requested_data:
246
      master_node_uuid = lu.cfg.GetMasterNode()
247

    
248
      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
249
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
250
                   lu.cfg.GetMasterNodeName())
251

    
252
      watcher_pause = result.payload
253
    else:
254
      watcher_pause = NotImplemented
255

    
256
    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
257

    
258

    
259
class LUClusterQuery(NoHooksLU):
260
  """Query cluster configuration.
261

262
  """
263
  REQ_BGL = False
264

    
265
  def ExpandNames(self):
266
    self.needed_locks = {}
267

    
268
  def Exec(self, feedback_fn):
269
    """Return cluster config.
270

271
    """
272
    cluster = self.cfg.GetClusterInfo()
273
    os_hvp = {}
274

    
275
    # Filter just for enabled hypervisors
276
    for os_name, hv_dict in cluster.os_hvp.items():
277
      os_hvp[os_name] = {}
278
      for hv_name, hv_params in hv_dict.items():
279
        if hv_name in cluster.enabled_hypervisors:
280
          os_hvp[os_name][hv_name] = hv_params
281

    
282
    # Convert ip_family to ip_version
283
    primary_ip_version = constants.IP4_VERSION
284
    if cluster.primary_ip_family == netutils.IP6Address.family:
285
      primary_ip_version = constants.IP6_VERSION
286

    
287
    result = {
288
      "software_version": constants.RELEASE_VERSION,
289
      "protocol_version": constants.PROTOCOL_VERSION,
290
      "config_version": constants.CONFIG_VERSION,
291
      "os_api_version": max(constants.OS_API_VERSIONS),
292
      "export_version": constants.EXPORT_VERSION,
293
      "architecture": runtime.GetArchInfo(),
294
      "name": cluster.cluster_name,
295
      "master": self.cfg.GetMasterNodeName(),
296
      "default_hypervisor": cluster.primary_hypervisor,
297
      "enabled_hypervisors": cluster.enabled_hypervisors,
298
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
299
                        for hypervisor_name in cluster.enabled_hypervisors]),
300
      "os_hvp": os_hvp,
301
      "beparams": cluster.beparams,
302
      "osparams": cluster.osparams,
303
      "ipolicy": cluster.ipolicy,
304
      "nicparams": cluster.nicparams,
305
      "ndparams": cluster.ndparams,
306
      "diskparams": cluster.diskparams,
307
      "candidate_pool_size": cluster.candidate_pool_size,
308
      "master_netdev": cluster.master_netdev,
309
      "master_netmask": cluster.master_netmask,
310
      "use_external_mip_script": cluster.use_external_mip_script,
311
      "volume_group_name": cluster.volume_group_name,
312
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
313
      "file_storage_dir": cluster.file_storage_dir,
314
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
315
      "maintain_node_health": cluster.maintain_node_health,
316
      "ctime": cluster.ctime,
317
      "mtime": cluster.mtime,
318
      "uuid": cluster.uuid,
319
      "tags": list(cluster.GetTags()),
320
      "uid_pool": cluster.uid_pool,
321
      "default_iallocator": cluster.default_iallocator,
322
      "reserved_lvs": cluster.reserved_lvs,
323
      "primary_ip_version": primary_ip_version,
324
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
325
      "hidden_os": cluster.hidden_os,
326
      "blacklisted_os": cluster.blacklisted_os,
327
      "enabled_disk_templates": cluster.enabled_disk_templates,
328
      }
329

    
330
    return result
331

    
332

    
333
class LUClusterRedistConf(NoHooksLU):
334
  """Force the redistribution of cluster configuration.
335

336
  This is a very simple LU.
337

338
  """
339
  REQ_BGL = False
340

    
341
  def ExpandNames(self):
342
    self.needed_locks = {
343
      locking.LEVEL_NODE: locking.ALL_SET,
344
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
345
    }
346
    self.share_locks = ShareAll()
347

    
348
  def Exec(self, feedback_fn):
349
    """Redistribute the configuration.
350

351
    """
352
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
353
    RedistributeAncillaryFiles(self)
354

    
355

    
356
class LUClusterRename(LogicalUnit):
357
  """Rename the cluster.
358

359
  """
360
  HPATH = "cluster-rename"
361
  HTYPE = constants.HTYPE_CLUSTER
362

    
363
  def BuildHooksEnv(self):
364
    """Build hooks env.
365

366
    """
367
    return {
368
      "OP_TARGET": self.cfg.GetClusterName(),
369
      "NEW_NAME": self.op.name,
370
      }
371

    
372
  def BuildHooksNodes(self):
373
    """Build hooks nodes.
374

375
    """
376
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
377

    
378
  def CheckPrereq(self):
379
    """Verify that the passed name is a valid one.
380

381
    """
382
    hostname = netutils.GetHostname(name=self.op.name,
383
                                    family=self.cfg.GetPrimaryIPFamily())
384

    
385
    new_name = hostname.name
386
    self.ip = new_ip = hostname.ip
387
    old_name = self.cfg.GetClusterName()
388
    old_ip = self.cfg.GetMasterIP()
389
    if new_name == old_name and new_ip == old_ip:
390
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
391
                                 " cluster has changed",
392
                                 errors.ECODE_INVAL)
393
    if new_ip != old_ip:
394
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
395
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
396
                                   " reachable on the network" %
397
                                   new_ip, errors.ECODE_NOTUNIQUE)
398

    
399
    self.op.name = new_name
400

    
401
  def Exec(self, feedback_fn):
402
    """Rename the cluster.
403

404
    """
405
    clustername = self.op.name
406
    new_ip = self.ip
407

    
408
    # shutdown the master IP
409
    master_params = self.cfg.GetMasterNetworkParameters()
410
    ems = self.cfg.GetUseExternalMipScript()
411
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
412
                                                     master_params, ems)
413
    result.Raise("Could not disable the master role")
414

    
415
    try:
416
      cluster = self.cfg.GetClusterInfo()
417
      cluster.cluster_name = clustername
418
      cluster.master_ip = new_ip
419
      self.cfg.Update(cluster, feedback_fn)
420

    
421
      # update the known hosts file
422
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
423
      node_list = self.cfg.GetOnlineNodeList()
424
      try:
425
        node_list.remove(master_params.uuid)
426
      except ValueError:
427
        pass
428
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
429
    finally:
430
      master_params.ip = new_ip
431
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
432
                                                     master_params, ems)
433
      result.Warn("Could not re-enable the master role on the master,"
434
                  " please restart manually", self.LogWarning)
435

    
436
    return clustername
437

    
438

    
439
class LUClusterRepairDiskSizes(NoHooksLU):
440
  """Verifies the cluster disks sizes.
441

442
  """
443
  REQ_BGL = False
444

    
445
  def ExpandNames(self):
446
    if self.op.instances:
447
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
448
      # Not getting the node allocation lock as only a specific set of
449
      # instances (and their nodes) is going to be acquired
450
      self.needed_locks = {
451
        locking.LEVEL_NODE_RES: [],
452
        locking.LEVEL_INSTANCE: self.wanted_names,
453
        }
454
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
455
    else:
456
      self.wanted_names = None
457
      self.needed_locks = {
458
        locking.LEVEL_NODE_RES: locking.ALL_SET,
459
        locking.LEVEL_INSTANCE: locking.ALL_SET,
460

    
461
        # This opcode acquires the node locks for all instances
462
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
463
        }
464

    
465
    self.share_locks = {
466
      locking.LEVEL_NODE_RES: 1,
467
      locking.LEVEL_INSTANCE: 0,
468
      locking.LEVEL_NODE_ALLOC: 1,
469
      }
470

    
471
  def DeclareLocks(self, level):
472
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
473
      self._LockInstancesNodes(primary_only=True, level=level)
474

    
475
  def CheckPrereq(self):
476
    """Check prerequisites.
477

478
    This only checks the optional instance list against the existing names.
479

480
    """
481
    if self.wanted_names is None:
482
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
483

    
484
    self.wanted_instances = \
485
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
486

    
487
  def _EnsureChildSizes(self, disk):
488
    """Ensure children of the disk have the needed disk size.
489

490
    This is valid mainly for DRBD8 and fixes an issue where the
491
    children have smaller disk size.
492

493
    @param disk: an L{ganeti.objects.Disk} object
494

495
    """
496
    if disk.dev_type == constants.LD_DRBD8:
497
      assert disk.children, "Empty children for DRBD8?"
498
      fchild = disk.children[0]
499
      mismatch = fchild.size < disk.size
500
      if mismatch:
501
        self.LogInfo("Child disk has size %d, parent %d, fixing",
502
                     fchild.size, disk.size)
503
        fchild.size = disk.size
504

    
505
      # and we recurse on this child only, not on the metadev
506
      return self._EnsureChildSizes(fchild) or mismatch
507
    else:
508
      return False
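  # Illustrative sketch (hypothetical disk objects): for a DRBD8 disk whose
  # data child is smaller than its parent, _EnsureChildSizes grows the child
  # and reports that a change was made, e.g.:
  #   drbd_disk.size == 1024 and drbd_disk.children[0].size == 1000
  #   self._EnsureChildSizes(drbd_disk)  # returns True, child resized to 1024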
509

    
510
  def Exec(self, feedback_fn):
511
    """Verify the size of cluster disks.
512

513
    """
514
    # TODO: check child disks too
515
    # TODO: check differences in size between primary/secondary nodes
516
    per_node_disks = {}
517
    for instance in self.wanted_instances:
518
      pnode = instance.primary_node
519
      if pnode not in per_node_disks:
520
        per_node_disks[pnode] = []
521
      for idx, disk in enumerate(instance.disks):
522
        per_node_disks[pnode].append((instance, idx, disk))
523

    
524
    assert not (frozenset(per_node_disks.keys()) -
525
                self.owned_locks(locking.LEVEL_NODE_RES)), \
526
      "Not owning correct locks"
527
    assert not self.owned_locks(locking.LEVEL_NODE)
528

    
529
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
530
                                               per_node_disks.keys())
531

    
532
    changed = []
533
    for node_uuid, dskl in per_node_disks.items():
534
      newl = [v[2].Copy() for v in dskl]
535
      for dsk in newl:
536
        self.cfg.SetDiskID(dsk, node_uuid)
537
      node_name = self.cfg.GetNodeName(node_uuid)
538
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
539
      if result.fail_msg:
540
        self.LogWarning("Failure in blockdev_getdimensions call to node"
541
                        " %s, ignoring", node_name)
542
        continue
543
      if len(result.payload) != len(dskl):
544
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
545
                        " result.payload=%s", node_name, len(dskl),
546
                        result.payload)
547
        self.LogWarning("Invalid result from node %s, ignoring node results",
548
                        node_name)
549
        continue
550
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
551
        if dimensions is None:
552
          self.LogWarning("Disk %d of instance %s did not return size"
553
                          " information, ignoring", idx, instance.name)
554
          continue
555
        if not isinstance(dimensions, (tuple, list)):
556
          self.LogWarning("Disk %d of instance %s did not return valid"
557
                          " dimension information, ignoring", idx,
558
                          instance.name)
559
          continue
560
        (size, spindles) = dimensions
561
        if not isinstance(size, (int, long)):
562
          self.LogWarning("Disk %d of instance %s did not return valid"
563
                          " size information, ignoring", idx, instance.name)
564
          continue
565
        size = size >> 20
566
        if size != disk.size:
567
          self.LogInfo("Disk %d of instance %s has mismatched size,"
568
                       " correcting: recorded %d, actual %d", idx,
569
                       instance.name, disk.size, size)
570
          disk.size = size
571
          self.cfg.Update(instance, feedback_fn)
572
          changed.append((instance.name, idx, "size", size))
573
        if es_flags[node_uuid]:
574
          if spindles is None:
575
            self.LogWarning("Disk %d of instance %s did not return valid"
576
                            " spindles information, ignoring", idx,
577
                            instance.name)
578
          elif disk.spindles is None or disk.spindles != spindles:
579
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
580
                         " correcting: recorded %s, actual %s",
581
                         idx, instance.name, disk.spindles, spindles)
582
            disk.spindles = spindles
583
            self.cfg.Update(instance, feedback_fn)
584
            changed.append((instance.name, idx, "spindles", disk.spindles))
585
        if self._EnsureChildSizes(disk):
586
          self.cfg.Update(instance, feedback_fn)
587
          changed.append((instance.name, idx, "size", disk.size))
588
    return changed
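  # Shape of the return value (illustrative): a list of change tuples such as
  #   [("instance1.example.com", 0, "size", 2048)]
  # where the instance name and values are hypothetical.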
589

    
590

    
591
def _ValidateNetmask(cfg, netmask):
592
  """Checks if a netmask is valid.
593

594
  @type cfg: L{config.ConfigWriter}
595
  @param cfg: The cluster configuration
596
  @type netmask: int
597
  @param netmask: the netmask to be verified
598
  @raise errors.OpPrereqError: if the validation fails
599

600
  """
601
  ip_family = cfg.GetPrimaryIPFamily()
602
  try:
603
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
604
  except errors.ProgrammerError:
605
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
606
                               ip_family, errors.ECODE_INVAL)
607
  if not ipcls.ValidateNetmask(netmask):
608
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
609
                               (netmask), errors.ECODE_INVAL)
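# Usage sketch (illustrative, hypothetical values): on an IPv4 cluster the
# netmask is a CIDR prefix length, so a call such as
#   _ValidateNetmask(cfg, 24)   # cfg being the cluster's config.ConfigWriter
# passes silently, while an out-of-range value such as 64 raises
# errors.OpPrereqError with errors.ECODE_INVAL.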
610

    
611

    
612
def CheckFileBasedStoragePathVsEnabledDiskTemplates(
613
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
614
    file_disk_template):
615
  """Checks whether the given file-based storage directory is acceptable.
616

617
  Note: This function is public, because it is also used in bootstrap.py.
618

619
  @type logging_warn_fn: function
620
  @param logging_warn_fn: function which accepts a string and logs it
621
  @type file_storage_dir: string
622
  @param file_storage_dir: the directory to be used for file-based instances
623
  @type enabled_disk_templates: list of string
624
  @param enabled_disk_templates: the list of enabled disk templates
625
  @type file_disk_template: string
626
  @param file_disk_template: the file-based disk template for which the
627
      path should be checked
628

629
  """
630
  assert (file_disk_template in
631
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
632
  file_storage_enabled = file_disk_template in enabled_disk_templates
633
  if file_storage_dir is not None:
634
    if file_storage_dir == "":
635
      if file_storage_enabled:
636
        raise errors.OpPrereqError(
637
            "Unsetting the '%s' storage directory while having '%s' storage"
638
            " enabled is not permitted." %
639
            (file_disk_template, file_disk_template))
640
    else:
641
      if not file_storage_enabled:
642
        logging_warn_fn(
643
            "Specified a %s storage directory, although %s storage is not"
644
            " enabled." % (file_disk_template, file_disk_template))
645
  else:
646
    raise errors.ProgrammerError("Received %s storage dir with value"
647
                                 " 'None'." % file_disk_template)
648

    
649

    
650
def CheckFileStoragePathVsEnabledDiskTemplates(
651
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
652
  """Checks whether the given file storage directory is acceptable.
653

654
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
655

656
  """
657
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
658
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
659
      constants.DT_FILE)
660

    
661

    
662
def CheckSharedFileStoragePathVsEnabledDiskTemplates(
663
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
664
  """Checks whether the given shared file storage directory is acceptable.
665

666
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
667

668
  """
669
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
670
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
671
      constants.DT_SHARED_FILE)
672

    
673

    
674
class LUClusterSetParams(LogicalUnit):
675
  """Change the parameters of the cluster.
676

677
  """
678
  HPATH = "cluster-modify"
679
  HTYPE = constants.HTYPE_CLUSTER
680
  REQ_BGL = False
681

    
682
  def CheckArguments(self):
683
    """Check parameters
684

685
    """
686
    if self.op.uid_pool:
687
      uidpool.CheckUidPool(self.op.uid_pool)
688

    
689
    if self.op.add_uids:
690
      uidpool.CheckUidPool(self.op.add_uids)
691

    
692
    if self.op.remove_uids:
693
      uidpool.CheckUidPool(self.op.remove_uids)
694

    
695
    if self.op.master_netmask is not None:
696
      _ValidateNetmask(self.cfg, self.op.master_netmask)
697

    
698
    if self.op.diskparams:
699
      for dt_params in self.op.diskparams.values():
700
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
701
      try:
702
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
703
      except errors.OpPrereqError, err:
704
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
705
                                   errors.ECODE_INVAL)
706

    
707
  def ExpandNames(self):
708
    # FIXME: in the future maybe other cluster params won't require checking on
709
    # all nodes to be modified.
710
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
711
    # resource locks the right thing, shouldn't it be the BGL instead?
712
    self.needed_locks = {
713
      locking.LEVEL_NODE: locking.ALL_SET,
714
      locking.LEVEL_INSTANCE: locking.ALL_SET,
715
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
716
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
717
    }
718
    self.share_locks = ShareAll()
719

    
720
  def BuildHooksEnv(self):
721
    """Build hooks env.
722

723
    """
724
    return {
725
      "OP_TARGET": self.cfg.GetClusterName(),
726
      "NEW_VG_NAME": self.op.vg_name,
727
      }
728

    
729
  def BuildHooksNodes(self):
730
    """Build hooks nodes.
731

732
    """
733
    mn = self.cfg.GetMasterNode()
734
    return ([mn], [mn])
735

    
736
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
737
                   new_enabled_disk_templates):
738
    """Check the consistency of the vg name on all nodes and in case it gets
739
       unset whether there are instances still using it.
740

741
    """
742
    if self.op.vg_name is not None and not self.op.vg_name:
743
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
744
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
745
                                   " instances exist", errors.ECODE_INVAL)
746

    
747
    if (self.op.vg_name is not None and
748
        utils.IsLvmEnabled(enabled_disk_templates)) or \
749
           (self.cfg.GetVGName() is not None and
750
            utils.LvmGetsEnabled(enabled_disk_templates,
751
                                 new_enabled_disk_templates)):
752
      self._CheckVgNameOnNodes(node_uuids)
753

    
754
  def _CheckVgNameOnNodes(self, node_uuids):
755
    """Check the status of the volume group on each node.
756

757
    """
758
    vglist = self.rpc.call_vg_list(node_uuids)
759
    for node_uuid in node_uuids:
760
      msg = vglist[node_uuid].fail_msg
761
      if msg:
762
        # ignoring down node
763
        self.LogWarning("Error while gathering data on node %s"
764
                        " (ignoring node): %s",
765
                        self.cfg.GetNodeName(node_uuid), msg)
766
        continue
767
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
768
                                            self.op.vg_name,
769
                                            constants.MIN_VG_SIZE)
770
      if vgstatus:
771
        raise errors.OpPrereqError("Error on node '%s': %s" %
772
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
773
                                   errors.ECODE_ENVIRON)
774

    
775
  def _GetEnabledDiskTemplates(self, cluster):
776
    """Determines the enabled disk templates and the subset of disk templates
777
       that are newly enabled by this operation.
778

779
    """
780
    enabled_disk_templates = None
781
    new_enabled_disk_templates = []
782
    if self.op.enabled_disk_templates:
783
      enabled_disk_templates = self.op.enabled_disk_templates
784
      new_enabled_disk_templates = \
785
        list(set(enabled_disk_templates)
786
             - set(cluster.enabled_disk_templates))
787
    else:
788
      enabled_disk_templates = cluster.enabled_disk_templates
789
    return (enabled_disk_templates, new_enabled_disk_templates)
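  # Illustrative example: with cluster.enabled_disk_templates == ["plain"] and
  # self.op.enabled_disk_templates == ["plain", "drbd"], this returns
  # (["plain", "drbd"], ["drbd"]), i.e. "drbd" is the newly enabled template.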
790

    
791
  def CheckPrereq(self):
792
    """Check prerequisites.
793

794
    This checks whether the given params don't conflict and
795
    if the given volume group is valid.
796

797
    """
798
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
799
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
800
        raise errors.OpPrereqError("Cannot disable drbd helper while"
801
                                   " drbd-based instances exist",
802
                                   errors.ECODE_INVAL)
803

    
804
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
805
    self.cluster = cluster = self.cfg.GetClusterInfo()
806

    
807
    vm_capable_node_uuids = [node.uuid
808
                             for node in self.cfg.GetAllNodesInfo().values()
809
                             if node.uuid in node_uuids and node.vm_capable]
810

    
811
    (enabled_disk_templates, new_enabled_disk_templates) = \
812
      self._GetEnabledDiskTemplates(cluster)
813

    
814
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
815
                      new_enabled_disk_templates)
816

    
817
    if self.op.file_storage_dir is not None:
818
      CheckFileStoragePathVsEnabledDiskTemplates(
819
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
820

    
821
    if self.op.shared_file_storage_dir is not None:
822
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
823
          self.LogWarning, self.op.shared_file_storage_dir,
824
          enabled_disk_templates)
825

    
826
    if self.op.drbd_helper:
827
      # checks given drbd helper on all nodes
828
      helpers = self.rpc.call_drbd_helper(node_uuids)
829
      for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
830
        if ninfo.offline:
831
          self.LogInfo("Not checking drbd helper on offline node %s",
832
                       ninfo.name)
833
          continue
834
        msg = helpers[ninfo.uuid].fail_msg
835
        if msg:
836
          raise errors.OpPrereqError("Error checking drbd helper on node"
837
                                     " '%s': %s" % (ninfo.name, msg),
838
                                     errors.ECODE_ENVIRON)
839
        node_helper = helpers[ninfo.uuid].payload
840
        if node_helper != self.op.drbd_helper:
841
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
842
                                     (ninfo.name, node_helper),
843
                                     errors.ECODE_ENVIRON)
844

    
845
    # validate params changes
846
    if self.op.beparams:
847
      objects.UpgradeBeParams(self.op.beparams)
848
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
849
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
850

    
851
    if self.op.ndparams:
852
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
853
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
854

    
855
      # TODO: we need a more general way to handle resetting
856
      # cluster-level parameters to default values
857
      if self.new_ndparams["oob_program"] == "":
858
        self.new_ndparams["oob_program"] = \
859
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
860

    
861
    if self.op.hv_state:
862
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
863
                                           self.cluster.hv_state_static)
864
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
865
                               for hv, values in new_hv_state.items())
866

    
867
    if self.op.disk_state:
868
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
869
                                               self.cluster.disk_state_static)
870
      self.new_disk_state = \
871
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
872
                            for name, values in svalues.items()))
873
             for storage, svalues in new_disk_state.items())
874

    
875
    if self.op.ipolicy:
876
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
877
                                           group_policy=False)
878

    
879
      all_instances = self.cfg.GetAllInstancesInfo().values()
880
      violations = set()
881
      for group in self.cfg.GetAllNodeGroupsInfo().values():
882
        instances = frozenset([inst for inst in all_instances
883
                               if compat.any(nuuid in group.members
884
                                             for nuuid in inst.all_nodes)])
885
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
886
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
887
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
888
                                           self.cfg)
889
        if new:
890
          violations.update(new)
891

    
892
      if violations:
893
        self.LogWarning("After the ipolicy change the following instances"
894
                        " violate them: %s",
895
                        utils.CommaJoin(utils.NiceSort(violations)))
896

    
897
    if self.op.nicparams:
898
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
899
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
900
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
901
      nic_errors = []
902

    
903
      # check all instances for consistency
904
      for instance in self.cfg.GetAllInstancesInfo().values():
905
        for nic_idx, nic in enumerate(instance.nics):
906
          params_copy = copy.deepcopy(nic.nicparams)
907
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
908

    
909
          # check parameter syntax
910
          try:
911
            objects.NIC.CheckParameterSyntax(params_filled)
912
          except errors.ConfigurationError, err:
913
            nic_errors.append("Instance %s, nic/%d: %s" %
914
                              (instance.name, nic_idx, err))
915

    
916
          # if we're moving instances to routed, check that they have an ip
917
          target_mode = params_filled[constants.NIC_MODE]
918
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
919
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
920
                              " address" % (instance.name, nic_idx))
921
      if nic_errors:
922
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
923
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
924

    
925
    # hypervisor list/parameters
926
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
927
    if self.op.hvparams:
928
      for hv_name, hv_dict in self.op.hvparams.items():
929
        if hv_name not in self.new_hvparams:
930
          self.new_hvparams[hv_name] = hv_dict
931
        else:
932
          self.new_hvparams[hv_name].update(hv_dict)
933

    
934
    # disk template parameters
935
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
936
    if self.op.diskparams:
937
      for dt_name, dt_params in self.op.diskparams.items():
938
        if dt_name not in self.new_diskparams:
939
          self.new_diskparams[dt_name] = dt_params
940
        else:
941
          self.new_diskparams[dt_name].update(dt_params)
942

    
943
    # os hypervisor parameters
944
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
945
    if self.op.os_hvp:
946
      for os_name, hvs in self.op.os_hvp.items():
947
        if os_name not in self.new_os_hvp:
948
          self.new_os_hvp[os_name] = hvs
949
        else:
950
          for hv_name, hv_dict in hvs.items():
951
            if hv_dict is None:
952
              # Delete if it exists
953
              self.new_os_hvp[os_name].pop(hv_name, None)
954
            elif hv_name not in self.new_os_hvp[os_name]:
955
              self.new_os_hvp[os_name][hv_name] = hv_dict
956
            else:
957
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
958

    
959
    # os parameters
960
    self.new_osp = objects.FillDict(cluster.osparams, {})
961
    if self.op.osparams:
962
      for os_name, osp in self.op.osparams.items():
963
        if os_name not in self.new_osp:
964
          self.new_osp[os_name] = {}
965

    
966
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
967
                                                 use_none=True)
968

    
969
        if not self.new_osp[os_name]:
970
          # we removed all parameters
971
          del self.new_osp[os_name]
972
        else:
973
          # check the parameter validity (remote check)
974
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
975
                        os_name, self.new_osp[os_name])
976

    
977
    # changes to the hypervisor list
978
    if self.op.enabled_hypervisors is not None:
979
      self.hv_list = self.op.enabled_hypervisors
980
      for hv in self.hv_list:
981
        # if the hypervisor doesn't already exist in the cluster
982
        # hvparams, we initialize it to empty, and then (in both
983
        # cases) we make sure to fill the defaults, as we might not
984
        # have a complete defaults list if the hypervisor wasn't
985
        # enabled before
986
        if hv not in new_hvp:
987
          new_hvp[hv] = {}
988
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
989
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
990
    else:
991
      self.hv_list = cluster.enabled_hypervisors
992

    
993
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
994
      # either the enabled list has changed, or the parameters have, validate
995
      for hv_name, hv_params in self.new_hvparams.items():
996
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
997
            (self.op.enabled_hypervisors and
998
             hv_name in self.op.enabled_hypervisors)):
999
          # either this is a new hypervisor, or its parameters have changed
1000
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1001
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1002
          hv_class.CheckParameterSyntax(hv_params)
1003
          CheckHVParams(self, node_uuids, hv_name, hv_params)
1004

    
1005
    self._CheckDiskTemplateConsistency()
1006

    
1007
    if self.op.os_hvp:
1008
      # no need to check any newly-enabled hypervisors, since the
1009
      # defaults have already been checked in the above code-block
1010
      for os_name, os_hvp in self.new_os_hvp.items():
1011
        for hv_name, hv_params in os_hvp.items():
1012
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1013
          # we need to fill in the new os_hvp on top of the actual hv_p
1014
          cluster_defaults = self.new_hvparams.get(hv_name, {})
1015
          new_osp = objects.FillDict(cluster_defaults, hv_params)
1016
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1017
          hv_class.CheckParameterSyntax(new_osp)
1018
          CheckHVParams(self, node_uuids, hv_name, new_osp)
1019

    
1020
    if self.op.default_iallocator:
1021
      alloc_script = utils.FindFile(self.op.default_iallocator,
1022
                                    constants.IALLOCATOR_SEARCH_PATH,
1023
                                    os.path.isfile)
1024
      if alloc_script is None:
1025
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1026
                                   " specified" % self.op.default_iallocator,
1027
                                   errors.ECODE_INVAL)
1028

    
1029
  def _CheckDiskTemplateConsistency(self):
1030
    """Check whether the disk templates that are going to be disabled
1031
       are still in use by some instances.
1032

1033
    """
1034
    if self.op.enabled_disk_templates:
1035
      cluster = self.cfg.GetClusterInfo()
1036
      instances = self.cfg.GetAllInstancesInfo()
1037

    
1038
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1039
        - set(self.op.enabled_disk_templates)
1040
      for instance in instances.itervalues():
1041
        if instance.disk_template in disk_templates_to_remove:
1042
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
1043
                                     " because instance '%s' is using it." %
1044
                                     (instance.disk_template, instance.name))
1045

    
1046
  def _SetVgName(self, feedback_fn):
1047
    """Determines and sets the new volume group name.
1048

1049
    """
1050
    if self.op.vg_name is not None:
1051
      if self.op.vg_name and not \
1052
           utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
1053
        feedback_fn("Note that you specified a volume group, but did not"
1054
                    " enable any lvm disk template.")
1055
      new_volume = self.op.vg_name
1056
      if not new_volume:
1057
        if utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
1058
          raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
1059
                                     " disk templates are enabled.")
1060
        new_volume = None
1061
      if new_volume != self.cfg.GetVGName():
1062
        self.cfg.SetVGName(new_volume)
1063
      else:
1064
        feedback_fn("Cluster LVM configuration already in desired"
1065
                    " state, not changing")
1066
    else:
1067
      if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
1068
          not self.cfg.GetVGName():
1069
        raise errors.OpPrereqError("Please specify a volume group when"
1070
                                   " enabling lvm-based disk-templates.")
1071

    
1072
  def _SetFileStorageDir(self, feedback_fn):
1073
    """Set the file storage directory.
1074

1075
    """
1076
    if self.op.file_storage_dir is not None:
1077
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
1078
        feedback_fn("Global file storage dir already set to value '%s'"
1079
                    % self.cluster.file_storage_dir)
1080
      else:
1081
        self.cluster.file_storage_dir = self.op.file_storage_dir
1082

    
1083
  def Exec(self, feedback_fn):
1084
    """Change the parameters of the cluster.
1085

1086
    """
1087
    if self.op.enabled_disk_templates:
1088
      self.cluster.enabled_disk_templates = \
1089
        list(set(self.op.enabled_disk_templates))
1090

    
1091
    self._SetVgName(feedback_fn)
1092
    self._SetFileStorageDir(feedback_fn)
1093

    
1094
    if self.op.drbd_helper is not None:
1095
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1096
        feedback_fn("Note that you specified a drbd user helper, but did"
1097
                    " enabled the drbd disk template.")
1098
      new_helper = self.op.drbd_helper
1099
      if not new_helper:
1100
        new_helper = None
1101
      if new_helper != self.cfg.GetDRBDHelper():
1102
        self.cfg.SetDRBDHelper(new_helper)
1103
      else:
1104
        feedback_fn("Cluster DRBD helper already in desired state,"
1105
                    " not changing")
1106
    if self.op.hvparams:
1107
      self.cluster.hvparams = self.new_hvparams
1108
    if self.op.os_hvp:
1109
      self.cluster.os_hvp = self.new_os_hvp
1110
    if self.op.enabled_hypervisors is not None:
1111
      self.cluster.hvparams = self.new_hvparams
1112
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1113
    if self.op.beparams:
1114
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1115
    if self.op.nicparams:
1116
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1117
    if self.op.ipolicy:
1118
      self.cluster.ipolicy = self.new_ipolicy
1119
    if self.op.osparams:
1120
      self.cluster.osparams = self.new_osp
1121
    if self.op.ndparams:
1122
      self.cluster.ndparams = self.new_ndparams
1123
    if self.op.diskparams:
1124
      self.cluster.diskparams = self.new_diskparams
1125
    if self.op.hv_state:
1126
      self.cluster.hv_state_static = self.new_hv_state
1127
    if self.op.disk_state:
1128
      self.cluster.disk_state_static = self.new_disk_state
1129

    
1130
    if self.op.candidate_pool_size is not None:
1131
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1132
      # we need to update the pool size here, otherwise the save will fail
1133
      AdjustCandidatePool(self, [])
1134

    
1135
    if self.op.maintain_node_health is not None:
1136
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1137
        feedback_fn("Note: CONFD was disabled at build time, node health"
1138
                    " maintenance is not useful (still enabling it)")
1139
      self.cluster.maintain_node_health = self.op.maintain_node_health
1140

    
1141
    if self.op.modify_etc_hosts is not None:
1142
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1143

    
1144
    if self.op.prealloc_wipe_disks is not None:
1145
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1146

    
1147
    if self.op.add_uids is not None:
1148
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1149

    
1150
    if self.op.remove_uids is not None:
1151
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1152

    
1153
    if self.op.uid_pool is not None:
1154
      self.cluster.uid_pool = self.op.uid_pool
1155

    
1156
    if self.op.default_iallocator is not None:
1157
      self.cluster.default_iallocator = self.op.default_iallocator
1158

    
1159
    if self.op.reserved_lvs is not None:
1160
      self.cluster.reserved_lvs = self.op.reserved_lvs
1161

    
1162
    if self.op.use_external_mip_script is not None:
1163
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
1164

    
1165
    def helper_os(aname, mods, desc):
1166
      desc += " OS list"
1167
      lst = getattr(self.cluster, aname)
1168
      for key, val in mods:
1169
        if key == constants.DDM_ADD:
1170
          if val in lst:
1171
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1172
          else:
1173
            lst.append(val)
1174
        elif key == constants.DDM_REMOVE:
1175
          if val in lst:
1176
            lst.remove(val)
1177
          else:
1178
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1179
        else:
1180
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
1181

    
1182
    if self.op.hidden_os:
1183
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1184

    
1185
    if self.op.blacklisted_os:
1186
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1187

    
1188
    if self.op.master_netdev:
1189
      master_params = self.cfg.GetMasterNetworkParameters()
1190
      ems = self.cfg.GetUseExternalMipScript()
1191
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1192
                  self.cluster.master_netdev)
1193
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1194
                                                       master_params, ems)
1195
      if not self.op.force:
1196
        result.Raise("Could not disable the master ip")
1197
      else:
1198
        if result.fail_msg:
1199
          msg = ("Could not disable the master ip (continuing anyway): %s" %
1200
                 result.fail_msg)
1201
          feedback_fn(msg)
1202
      feedback_fn("Changing master_netdev from %s to %s" %
1203
                  (master_params.netdev, self.op.master_netdev))
1204
      self.cluster.master_netdev = self.op.master_netdev
1205

    
1206
    if self.op.master_netmask:
1207
      master_params = self.cfg.GetMasterNetworkParameters()
1208
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1209
      result = self.rpc.call_node_change_master_netmask(
1210
                 master_params.uuid, master_params.netmask,
1211
                 self.op.master_netmask, master_params.ip,
1212
                 master_params.netdev)
1213
      result.Warn("Could not change the master IP netmask", feedback_fn)
1214
      self.cluster.master_netmask = self.op.master_netmask
1215

    
1216
    self.cfg.Update(self.cluster, feedback_fn)
1217

    
1218
    if self.op.master_netdev:
1219
      master_params = self.cfg.GetMasterNetworkParameters()
1220
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
1221
                  self.op.master_netdev)
1222
      ems = self.cfg.GetUseExternalMipScript()
1223
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1224
                                                     master_params, ems)
1225
      result.Warn("Could not re-enable the master ip on the master,"
1226
                  " please restart manually", self.LogWarning)
1227

    
1228

    
1229
class LUClusterVerify(NoHooksLU):
1230
  """Submits all jobs necessary to verify the cluster.
1231

1232
  """
1233
  REQ_BGL = False
1234

    
1235
  def ExpandNames(self):
1236
    self.needed_locks = {}
1237

    
1238
  def Exec(self, feedback_fn):
1239
    jobs = []
1240

    
1241
    if self.op.group_name:
1242
      groups = [self.op.group_name]
1243
      depends_fn = lambda: None
1244
    else:
1245
      groups = self.cfg.GetNodeGroupList()
1246

    
1247
      # Verify global configuration
1248
      jobs.append([
1249
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1250
        ])
1251

    
1252
      # Always depend on global verification
1253
      depends_fn = lambda: [(-len(jobs), [])]
1254

    
1255
    jobs.extend(
1256
      [opcodes.OpClusterVerifyGroup(group_name=group,
1257
                                    ignore_errors=self.op.ignore_errors,
1258
                                    depends=depends_fn())]
1259
      for group in groups)
1260

    
1261
    # Fix up all parameters
1262
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1263
      op.debug_simulate_errors = self.op.debug_simulate_errors
1264
      op.verbose = self.op.verbose
1265
      op.error_codes = self.op.error_codes
1266
      try:
1267
        op.skip_checks = self.op.skip_checks
1268
      except AttributeError:
1269
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1270

    
1271
    return ResultWithJobs(jobs)
1272

    
1273

    
1274
class _VerifyErrors(object):
1275
  """Mix-in for cluster/group verify LUs.
1276

1277
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1278
  self.op and self._feedback_fn to be available.)
1279

1280
  """
1281

    
1282
  ETYPE_FIELD = "code"
1283
  ETYPE_ERROR = "ERROR"
1284
  ETYPE_WARNING = "WARNING"
1285

    
1286
  def _Error(self, ecode, item, msg, *args, **kwargs):
1287
    """Format an error message.
1288

1289
    Based on the opcode's error_codes parameter, either format a
1290
    parseable error code, or a simpler error string.
1291

1292
    This must be called only from Exec and functions called from Exec.
1293

1294
    """
1295
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1296
    itype, etxt, _ = ecode
1297
    # If the error code is in the list of ignored errors, demote the error to a
1298
    # warning
1299
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1300
      ltype = self.ETYPE_WARNING
1301
    # first complete the msg
1302
    if args:
1303
      msg = msg % args
1304
    # then format the whole message
1305
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1306
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1307
    else:
1308
      if item:
1309
        item = " " + item
1310
      else:
1311
        item = ""
1312
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1313
    # and finally report it via the feedback_fn
1314
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1315
    # do not mark the operation as failed for WARN cases only
1316
    if ltype == self.ETYPE_ERROR:
1317
      self.bad = True
1318

    
1319
  def _ErrorIf(self, cond, *args, **kwargs):
1320
    """Log an error message if the passed condition is True.
1321

1322
    """
1323
    if (bool(cond)
1324
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1325
      self._Error(*args, **kwargs)
1326

    
1327

    
1328
def _VerifyCertificate(filename):
1329
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1330

1331
  @type filename: string
1332
  @param filename: Path to PEM file
1333

1334
  """
1335
  try:
1336
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1337
                                           utils.ReadFile(filename))
1338
  except Exception, err: # pylint: disable=W0703
1339
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1340
            "Failed to load X509 certificate %s: %s" % (filename, err))
1341

    
1342
  (errcode, msg) = \
1343
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1344
                                constants.SSL_CERT_EXPIRATION_ERROR)
1345

    
1346
  if msg:
1347
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1348
  else:
1349
    fnamemsg = None
1350

    
1351
  if errcode is None:
1352
    return (None, fnamemsg)
1353
  elif errcode == utils.CERT_WARNING:
1354
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1355
  elif errcode == utils.CERT_ERROR:
1356
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1357

    
1358
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1359

    
1360

    
1361
def _GetAllHypervisorParameters(cluster, instances):
1362
  """Compute the set of all hypervisor parameters.
1363

1364
  @type cluster: L{objects.Cluster}
1365
  @param cluster: the cluster object
1366
  @type instances: list of L{objects.Instance}
1367
  @param instances: additional instances from which to obtain parameters
1368
  @rtype: list of (origin, hypervisor, parameters)
1369
  @return: a list with all parameters found, indicating the hypervisor they
1370
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1371

1372
  """
1373
  hvp_data = []
1374

    
1375
  for hv_name in cluster.enabled_hypervisors:
1376
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1377

    
1378
  for os_name, os_hvp in cluster.os_hvp.items():
1379
    for hv_name, hv_params in os_hvp.items():
1380
      if hv_params:
1381
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1382
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1383

    
1384
  # TODO: collapse identical parameter values in a single one
1385
  for instance in instances:
1386
    if instance.hvparams:
1387
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1388
                       cluster.FillHV(instance)))
1389

    
1390
  return hvp_data
1391

    
1392

    
1393
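# The resulting (origin, hypervisor, parameters) triples look like
# ("cluster", "kvm", {...}) or ("os debian-image", "xen-pvm", {...})
# (hypervisor and OS names here are purely illustrative); the class below
# passes the whole list to _VerifyHVP for a local syntax check.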
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of the cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.CONFD_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                    constants.CONFD_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(
           self.cfg.GetInstanceNames(
             dangling_instances.get(node.uuid, ["no instances"]))))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(
                           self.cfg.GetInstanceNames(no_node_instances)))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        instance = self.cfg.GetInstanceInfoByName(inst_name)
        if instance.disk_template in constants.DTS_INT_MIRROR:
          nodes.update(instance.secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_node_uuids = set(self.group_info.members)
    group_inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_node_uuids = \
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_inst_uuids = \
        group_inst_uuids.difference(
          [self.cfg.GetInstanceInfoByName(name).uuid
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])

    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_inst_uuids:
      raise errors.OpPrereqError(
        "Missing lock for instances: %s" %
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
        errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_uuids = group_node_uuids
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
                             for node_uuid in group_node_uuids)

    self.my_inst_uuids = group_inst_uuids
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
                             for inst_uuid in group_inst_uuids)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nuuid in inst.all_nodes:
          if self.all_node_info[nuuid].group != self.group_uuid:
            extra_lv_nodes.add(nuuid)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, ninfo.name,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s parameter verify failure (source %s): %s",
                      hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True

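  # The time check below tolerates a drift of up to
  # constants.NODE_MAX_CLOCK_SKEW on either side of the [start, end] window
  # of the verify RPC; anything beyond that is reported as CV_ENODETIME.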
  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
                    "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
                  "Node time diverges by at least %s from master node time",
                  ntime_diff)

  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                  "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupDRBDVersion(self, node_verify_infos):
    """Check cross-node DRBD version consistency.

    @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node_verify call.

    """
    node_versions = {}
    for node_uuid, ndata in node_verify_infos.items():
      nresult = ndata.payload
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
      node_versions[node_uuid] = version

    if len(set(node_versions.values())) > 1:
      for node_uuid, version in sorted(node_versions.items()):
        msg = "DRBD version mismatch: %s" % version
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
                    code=self.ETYPE_WARNING)

  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping from node names to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of check
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "did not return valid bridge information")
    if not test:
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user script presence and executability on the node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = constants.NV_NODELIST not in nresult
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
                  "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
                        "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
                        "tcp communication with node '%s': %s",
                        anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if ninfo.uuid == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)

  def _VerifyInstance(self, instance, node_image, diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the nodes are in the correct
    state.

    """
    pnode_uuid = instance.primary_node
    pnode_img = node_image[pnode_uuid]
    groupinfo = self.cfg.GetAllNodeGroupsInfo()

    node_vol_should = {}
    instance.MapLVsByNode(node_vol_should)

    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            self.group_info)
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)

    for node_uuid in node_vol_should:
      n_img = node_image[node_uuid]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node_uuid]:
        test = volume not in n_img.volumes
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
                      "volume %s missing on node %s", volume,
                      self.cfg.GetNodeName(node_uuid))

    if instance.admin_state == constants.ADMINST_UP:
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
                    "instance not running on its primary node %s",
                     self.cfg.GetNodeName(pnode_uuid))
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance is marked as running and lives on"
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      self._ErrorIf(instance.disks_active and
                    not success and not bad_snode,
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
                    "couldn't retrieve status for disk/%s on %s: %s",
                    idx, self.cfg.GetNodeName(nname), bdev_status)

      if instance.disks_active and success and \
         (bdev_status.is_degraded or
          bdev_status.ldisk_status != constants.LDS_OKAY):
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
        if bdev_status.is_degraded:
          msg += " is degraded"
        if bdev_status.ldisk_status != constants.LDS_OKAY:
          msg += "; state is '%s'" % \
                 constants.LDS_NAMES[bdev_status.ldisk_status]

        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)

    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
                  "instance %s, connection to primary node failed",
                  instance.name)

    self._ErrorIf(len(instance.secondary_nodes) > 1,
                  constants.CV_EINSTANCELAYOUT, instance.name,
                  "instance has multiple secondary nodes: %s",
                  utils.CommaJoin(instance.secondary_nodes),
                  code=self.ETYPE_WARNING)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
    if any(es_flags.values()):
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
        # Disk template not compatible with exclusive_storage: no instance
        # node should have the flag set
        es_nodes = [n
                    for (n, es) in es_flags.items()
                    if es]
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
                    "instance has template %s, which is not supported on nodes"
                    " that have exclusive storage set: %s",
                    instance.disk_template,
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
      for (idx, disk) in enumerate(instance.disks):
        self._ErrorIf(disk.spindles is None,
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
                      "number of spindles not configured for disk %s while"
                      " exclusive storage is enabled, try running"
                      " gnt-cluster repair-disk-sizes", idx)

    if instance.disk_template in constants.DTS_INT_MIRROR:
      instance_nodes = utils.NiceSort(instance.all_nodes)
      instance_groups = {}

      for node_uuid in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
                                   []).append(node_uuid)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
                           groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode_uuid in nodes,
                                   reverse=True)]

      self._ErrorIf(len(instance_groups) > 1,
                    constants.CV_EINSTANCESPLITGROUPS,
                    instance.name, "instance has primary and secondary nodes in"
                    " different groups: %s", utils.CommaJoin(pretty_list),
                    code=self.ETYPE_WARNING)

    inst_nodes_offline = []
    for snode in instance.secondary_nodes:
      s_img = node_image[snode]
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
                    self.cfg.GetNodeName(snode),
                    "instance %s, connection to secondary node failed",
                    instance.name)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
                  instance.name, "instance has offline secondary node(s) %s",
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
    # ... or ghost/non-vm_capable nodes
    for node_uuid in instance.all_nodes:
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance lives on ghost node %s",
                    self.cfg.GetNodeName(node_uuid))
      self._ErrorIf(not node_image[node_uuid].vm_capable,
                    constants.CV_EINSTANCEBADNODE, instance.name,
                    "instance lives on non-vm_capable node %s",
                    self.cfg.GetNodeName(node_uuid))

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node_uuid, n_img in node_image.items():
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
          self.all_node_info[node_uuid].group != self.group_uuid):
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node_uuid not in node_vol_should or
                volume not in node_vol_should[node_uuid]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
                      self.cfg.GetNodeName(node_uuid),
                      "volume %s is unknown", volume)

  def _VerifyNPlusOneMemory(self, node_image, all_insts):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node_uuid, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to, should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline or \
         self.all_node_info[node_uuid].group != self.group_uuid:
        # we're skipping nodes marked offline and nodes in other groups from
        # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      #TODO(dynmem): also consider ballooning out other instances
      for prinode, inst_uuids in n_img.sbp.items():
        needed_mem = 0
        for inst_uuid in inst_uuids:
          bep = cluster_info.FillBE(all_insts[inst_uuid])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MINMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1,
                      self.cfg.GetNodeName(node_uuid),
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)

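  # _VerifyFiles receives the (files_all, files_opt, files_mc, files_vm)
  # tuple computed by ComputeAncillaryFiles: files_mc entries are only
  # expected on master candidates (and the master), files_vm entries only on
  # vm_capable nodes, and files_opt entries must exist either everywhere or
  # nowhere.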
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param nodes: List of L{objects.Node} objects
    @param master_node_uuid: UUID of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.uuid == master_node_uuid)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodes
      else:
        filenodes = filter(fn, nodes)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodes:
      if node.offline:
        ignore_nodes.add(node.uuid)
        continue

      nresult = all_nvinfo[node.uuid]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
                    "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.uuid)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_uuid
                            for node_uuids in fileinfo[filename].values()
                            for node_uuid in node_uuids) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        self._ErrorIf(missing_file and missing_file != expected_nodes,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is optional, but it must exist on all or no"
                      " nodes (not found on %s)",
                      filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))
      else:
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is missing from node(s) %s", filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        self._ErrorIf(unexpected,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s should not exist on node(s) %s",
                      filename, utils.CommaJoin(
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1,
                     utils.CommaJoin(utils.NiceSort(
                       map(self.cfg.GetNodeName, node_uuids))))
                    for (idx, (checksum, node_uuids)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
                    "File %s found with %s different checksums (%s)",
                    filename, len(checksums), "; ".join(variants))

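  # The DRBD check below builds a {minor: (instance_uuid, should_be_active)}
  # map from the cluster-wide DRBD minor map and compares it against the
  # minors the node actually reports in use.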
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                    "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "wrong drbd usermode helper: %s", payload)

    # compute the DRBD minors
    node_drbd = {}
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
      test = inst_uuid not in instanceinfo
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
      if test:
        node_drbd[minor] = (inst_uuid, False)
      else:
        instance = instanceinfo[inst_uuid]
        node_drbd[minor] = (inst_uuid, instance.disks_active)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                  "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (inst_uuid, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "drbd minor %d of instance %s is not active", minor,
                    self.cfg.GetInstanceName(inst_uuid))
    for minor in used_minors:
      test = minor not in node_drbd
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                  "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
                    "Invalid OS %s (located at %s): %s",
                    os_name, f_path, f_diag)
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
                    "OS '%s' has multiple entries"
                    " (first one shadows the rest): %s",
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                    "Extra OS %s not present on reference node (%s)",
                    os_name, self.cfg.GetNodeName(base.uuid))
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
                      "OS %s for %s differs from reference node %s:"
                      " [%s] vs. [%s]", kind, os_name,
                      self.cfg.GetNodeName(base.uuid),
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
                  "OSes present on reference node %s"
                  " but missing on this node: %s",
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))

  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    cluster = self.cfg.GetClusterInfo()
    if (is_master and
        (cluster.IsFileStorageEnabled() or
         cluster.IsSharedFileStorageEnabled())):
      try:
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    cluster = self.cfg.GetClusterInfo()
    if cluster.IsFileStorageEnabled():
      self._ErrorIf(
          constants.NV_FILE_STORAGE_PATH in nresult,
          constants.CV_ENODEFILESTORAGEPATHUNUSABLE, ninfo.name,
          "The configured file storage path is unusable: %s" %
          nresult.get(constants.NV_FILE_STORAGE_PATH))

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_devonly = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      devonly = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        self.cfg.SetDiskID(anno_disk, nuuid)
        devonly.append(anno_disk)

      node_disks_devonly[nuuid] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
2643
  def _SshNodeSelector(group_uuid, all_nodes):
2644
    """Create endless iterators for all potential SSH check hosts.
2645

2646
    """
2647
    nodes = [node for node in all_nodes
2648
             if (node.group != group_uuid and
2649
                 not node.offline)]
2650
    keyfunc = operator.attrgetter("group")
2651

    
2652
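    # Group the remaining nodes by node group and return one cycling iterator
    # of sorted node names per foreign group; callers draw one name from each.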
    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

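    # Return the online node names plus, for each of them, one node drawn from
    # every other group; sharing the cycling iterators spreads the checks over
    # the foreign groups' nodes.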
    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; their failure is
    logged in the verify output and makes the verification fail.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

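    # node_verify_param lists the checks each node will run; storage-, DRBD-
    # and bridge-specific entries are only added below when the corresponding
    # feature is configured on the cluster.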
    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if drbd_helper:
      node_verify_param[constants.NV_DRBDVERSION] = None
      node_verify_param[constants.NV_DRBDLIST] = None
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
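    # Each NodeImage starts out with configuration data only (offline,
    # vm_capable); runtime information is merged in after the node_verify RPC.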
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least one node, we act as if it were set for all of them
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams)
    nvinfo_endtime = time.time()

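    # Nodes outside this group may still hold LVs of our instances; only their
    # LV list is queried.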
    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams)
    else:
      extra_lv_nvinfo = {}

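    # Per-node map of the DRBD minors recorded in the configuration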
    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.uuid not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
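    # Shared locks on all node groups: this LU only reads the group list; the
    # actual verification happens in the per-group jobs submitted from Exec.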
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])