lib/cmdlib/cluster.py @ 24e96ef6

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with the cluster."""
23

    
24
import OpenSSL
25

    
26
import copy
27
import itertools
28
import logging
29
import operator
30
import os
31
import re
32
import time
33

    
34
from ganeti import compat
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import hypervisor
38
from ganeti import locking
39
from ganeti import masterd
40
from ganeti import netutils
41
from ganeti import objects
42
from ganeti import opcodes
43
from ganeti import pathutils
44
from ganeti import query
45
from ganeti import rpc
46
from ganeti import runtime
47
from ganeti import ssh
48
from ganeti import uidpool
49
from ganeti import utils
50
from ganeti import vcluster
51

    
52
from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53
  ResultWithJobs
54
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob
60

    
61
import ganeti.masterd.instance
62

    
63

    
64
class LUClusterActivateMasterIp(NoHooksLU):
65
  """Activate the master IP on the master node.
66

67
  """
68
  def Exec(self, feedback_fn):
69
    """Activate the master IP.
70

71
    """
72
    master_params = self.cfg.GetMasterNetworkParameters()
73
    ems = self.cfg.GetUseExternalMipScript()
74
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
75
                                                   master_params, ems)
76
    result.Raise("Could not activate the master IP")
77

    
78

    
79
class LUClusterDeactivateMasterIp(NoHooksLU):
80
  """Deactivate the master IP on the master node.
81

82
  """
83
  def Exec(self, feedback_fn):
84
    """Deactivate the master IP.
85

86
    """
87
    master_params = self.cfg.GetMasterNetworkParameters()
88
    ems = self.cfg.GetUseExternalMipScript()
89
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
90
                                                     master_params, ems)
91
    result.Raise("Could not deactivate the master IP")
92

    
93

    
94
class LUClusterConfigQuery(NoHooksLU):
95
  """Return configuration values.
96

97
  """
98
  REQ_BGL = False
99

    
100
  def CheckArguments(self):
101
    self.cq = ClusterQuery(None, self.op.output_fields, False)
102

    
103
  def ExpandNames(self):
104
    self.cq.ExpandNames(self)
105

    
106
  def DeclareLocks(self, level):
107
    self.cq.DeclareLocks(self, level)
108

    
109
  def Exec(self, feedback_fn):
110
    result = self.cq.OldStyleQuery(self)
111

    
112
    assert len(result) == 1
113

    
114
    return result[0]
115

    
116

    
117
class LUClusterDestroy(LogicalUnit):
118
  """Logical unit for destroying the cluster.
119

120
  """
121
  HPATH = "cluster-destroy"
122
  HTYPE = constants.HTYPE_CLUSTER
123

    
124
  def BuildHooksEnv(self):
125
    """Build hooks env.
126

127
    """
128
    return {
129
      "OP_TARGET": self.cfg.GetClusterName(),
130
      }
131

    
132
  def BuildHooksNodes(self):
133
    """Build hooks nodes.
134

135
    """
136
    return ([], [])
137

    
138
  def CheckPrereq(self):
139
    """Check prerequisites.
140

141
    This checks whether the cluster is empty.
142

143
    Any errors are signaled by raising errors.OpPrereqError.
144

145
    """
146
    master = self.cfg.GetMasterNode()
147

    
148
    nodelist = self.cfg.GetNodeList()
149
    if len(nodelist) != 1 or nodelist[0] != master:
150
      raise errors.OpPrereqError("There are still %d node(s) in"
151
                                 " this cluster." % (len(nodelist) - 1),
152
                                 errors.ECODE_INVAL)
153
    instancelist = self.cfg.GetInstanceList()
154
    if instancelist:
155
      raise errors.OpPrereqError("There are still %d instance(s) in"
156
                                 " this cluster." % len(instancelist),
157
                                 errors.ECODE_INVAL)
158

    
159
  def Exec(self, feedback_fn):
160
    """Destroys the cluster.
161

162
    """
163
    master_params = self.cfg.GetMasterNetworkParameters()
164

    
165
    # Run post hooks on master node before it's removed
166
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
167

    
168
    ems = self.cfg.GetUseExternalMipScript()
169
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
170
                                                     master_params, ems)
171
    result.Warn("Error disabling the master IP address", self.LogWarning)
172
    return master_params.uuid
173

    
174

    
175
class LUClusterPostInit(LogicalUnit):
176
  """Logical unit for running hooks after cluster initialization.
177

178
  """
179
  HPATH = "cluster-init"
180
  HTYPE = constants.HTYPE_CLUSTER
181

    
182
  def BuildHooksEnv(self):
183
    """Build hooks env.
184

185
    """
186
    return {
187
      "OP_TARGET": self.cfg.GetClusterName(),
188
      }
189

    
190
  def BuildHooksNodes(self):
191
    """Build hooks nodes.
192

193
    """
194
    return ([], [self.cfg.GetMasterNode()])
195

    
196
  def Exec(self, feedback_fn):
197
    """Nothing to do.
198

199
    """
200
    return True
201

    
202

    
203
class ClusterQuery(QueryBase):
204
  FIELDS = query.CLUSTER_FIELDS
205

    
206
  #: Do not sort (there is only one item)
207
  SORT_FIELD = None
208

    
209
  def ExpandNames(self, lu):
210
    lu.needed_locks = {}
211

    
212
    # The following variables interact with _QueryBase._GetNames
213
    self.wanted = locking.ALL_SET
214
    self.do_locking = self.use_locking
215

    
216
    if self.do_locking:
217
      raise errors.OpPrereqError("Can not use locking for cluster queries",
218
                                 errors.ECODE_INVAL)
219

    
220
  def DeclareLocks(self, lu, level):
221
    pass
222

    
223
  def _GetQueryData(self, lu):
224
    """Computes the list of nodes and their attributes.
225

226
    """
227
    # Locking is not used
228
    assert not (compat.any(lu.glm.is_owned(level)
229
                           for level in locking.LEVELS
230
                           if level != locking.LEVEL_CLUSTER) or
231
                self.do_locking or self.use_locking)
232

    
233
    if query.CQ_CONFIG in self.requested_data:
234
      cluster = lu.cfg.GetClusterInfo()
235
      nodes = lu.cfg.GetAllNodesInfo()
236
    else:
237
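      # NotImplemented serves as the placeholder for data that was not
      # requested and therefore not collected.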
      cluster = NotImplemented
238
      nodes = NotImplemented
239

    
240
    if query.CQ_QUEUE_DRAINED in self.requested_data:
241
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
242
    else:
243
      drain_flag = NotImplemented
244

    
245
    if query.CQ_WATCHER_PAUSE in self.requested_data:
246
      master_node_uuid = lu.cfg.GetMasterNode()
247

    
248
      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
249
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
250
                   lu.cfg.GetMasterNodeName())
251

    
252
      watcher_pause = result.payload
253
    else:
254
      watcher_pause = NotImplemented
255

    
256
    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
257

    
258

    
259
class LUClusterQuery(NoHooksLU):
260
  """Query cluster configuration.
261

262
  """
263
  REQ_BGL = False
264

    
265
  def ExpandNames(self):
266
    self.needed_locks = {}
267

    
268
  def Exec(self, feedback_fn):
269
    """Return cluster config.
270

271
    """
272
    cluster = self.cfg.GetClusterInfo()
273
    os_hvp = {}
274

    
275
    # Filter just for enabled hypervisors
276
    for os_name, hv_dict in cluster.os_hvp.items():
277
      os_hvp[os_name] = {}
278
      for hv_name, hv_params in hv_dict.items():
279
        if hv_name in cluster.enabled_hypervisors:
280
          os_hvp[os_name][hv_name] = hv_params
281

    
282
    # Convert ip_family to ip_version
283
    primary_ip_version = constants.IP4_VERSION
284
    if cluster.primary_ip_family == netutils.IP6Address.family:
285
      primary_ip_version = constants.IP6_VERSION
286

    
287
    result = {
288
      "software_version": constants.RELEASE_VERSION,
289
      "protocol_version": constants.PROTOCOL_VERSION,
290
      "config_version": constants.CONFIG_VERSION,
291
      "os_api_version": max(constants.OS_API_VERSIONS),
292
      "export_version": constants.EXPORT_VERSION,
293
      "architecture": runtime.GetArchInfo(),
294
      "name": cluster.cluster_name,
295
      "master": self.cfg.GetMasterNodeName(),
296
      "default_hypervisor": cluster.primary_hypervisor,
297
      "enabled_hypervisors": cluster.enabled_hypervisors,
298
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
299
                        for hypervisor_name in cluster.enabled_hypervisors]),
300
      "os_hvp": os_hvp,
301
      "beparams": cluster.beparams,
302
      "osparams": cluster.osparams,
303
      "ipolicy": cluster.ipolicy,
304
      "nicparams": cluster.nicparams,
305
      "ndparams": cluster.ndparams,
306
      "diskparams": cluster.diskparams,
307
      "candidate_pool_size": cluster.candidate_pool_size,
308
      "master_netdev": cluster.master_netdev,
309
      "master_netmask": cluster.master_netmask,
310
      "use_external_mip_script": cluster.use_external_mip_script,
311
      "volume_group_name": cluster.volume_group_name,
312
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
313
      "file_storage_dir": cluster.file_storage_dir,
314
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
315
      "maintain_node_health": cluster.maintain_node_health,
316
      "ctime": cluster.ctime,
317
      "mtime": cluster.mtime,
318
      "uuid": cluster.uuid,
319
      "tags": list(cluster.GetTags()),
320
      "uid_pool": cluster.uid_pool,
321
      "default_iallocator": cluster.default_iallocator,
322
      "reserved_lvs": cluster.reserved_lvs,
323
      "primary_ip_version": primary_ip_version,
324
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
325
      "hidden_os": cluster.hidden_os,
326
      "blacklisted_os": cluster.blacklisted_os,
327
      "enabled_disk_templates": cluster.enabled_disk_templates,
328
      }
329

    
330
    return result
331

    
332

    
333
class LUClusterRedistConf(NoHooksLU):
334
  """Force the redistribution of cluster configuration.
335

336
  This is a very simple LU.
337

338
  """
339
  REQ_BGL = False
340

    
341
  def ExpandNames(self):
342
    self.needed_locks = {
343
      locking.LEVEL_NODE: locking.ALL_SET,
344
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
345
    }
346
    self.share_locks = ShareAll()
347

    
348
  def Exec(self, feedback_fn):
349
    """Redistribute the configuration.
350

351
    """
352
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
353
    RedistributeAncillaryFiles(self)
354

    
355

    
356
class LUClusterRename(LogicalUnit):
357
  """Rename the cluster.
358

359
  """
360
  HPATH = "cluster-rename"
361
  HTYPE = constants.HTYPE_CLUSTER
362

    
363
  def BuildHooksEnv(self):
364
    """Build hooks env.
365

366
    """
367
    return {
368
      "OP_TARGET": self.cfg.GetClusterName(),
369
      "NEW_NAME": self.op.name,
370
      }
371

    
372
  def BuildHooksNodes(self):
373
    """Build hooks nodes.
374

375
    """
376
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
377

    
378
  def CheckPrereq(self):
379
    """Verify that the passed name is a valid one.
380

381
    """
382
    hostname = netutils.GetHostname(name=self.op.name,
383
                                    family=self.cfg.GetPrimaryIPFamily())
384

    
385
    new_name = hostname.name
386
    self.ip = new_ip = hostname.ip
387
    old_name = self.cfg.GetClusterName()
388
    old_ip = self.cfg.GetMasterIP()
389
    if new_name == old_name and new_ip == old_ip:
390
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
391
                                 " cluster has changed",
392
                                 errors.ECODE_INVAL)
393
    if new_ip != old_ip:
394
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
395
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
396
                                   " reachable on the network" %
397
                                   new_ip, errors.ECODE_NOTUNIQUE)
398

    
399
    self.op.name = new_name
400

    
401
  def Exec(self, feedback_fn):
402
    """Rename the cluster.
403

404
    """
405
    clustername = self.op.name
406
    new_ip = self.ip
407

    
408
    # shutdown the master IP
409
    master_params = self.cfg.GetMasterNetworkParameters()
410
    ems = self.cfg.GetUseExternalMipScript()
411
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
412
                                                     master_params, ems)
413
    result.Raise("Could not disable the master role")
414

    
415
    try:
416
      cluster = self.cfg.GetClusterInfo()
417
      cluster.cluster_name = clustername
418
      cluster.master_ip = new_ip
419
      self.cfg.Update(cluster, feedback_fn)
420

    
421
      # update the known hosts file
422
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
423
      node_list = self.cfg.GetOnlineNodeList()
424
      try:
425
        node_list.remove(master_params.uuid)
426
      except ValueError:
427
        pass
428
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
429
    finally:
430
      master_params.ip = new_ip
431
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
432
                                                     master_params, ems)
433
      result.Warn("Could not re-enable the master role on the master,"
434
                  " please restart manually", self.LogWarning)
435

    
436
    return clustername
437

    
438

    
439
class LUClusterRepairDiskSizes(NoHooksLU):
440
  """Verifies the cluster disks sizes.
441

442
  """
443
  REQ_BGL = False
444

    
445
  def ExpandNames(self):
446
    if self.op.instances:
447
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
448
      # Not getting the node allocation lock as only a specific set of
449
      # instances (and their nodes) is going to be acquired
450
      self.needed_locks = {
451
        locking.LEVEL_NODE_RES: [],
452
        locking.LEVEL_INSTANCE: self.wanted_names,
453
        }
454
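      # The empty node-res lock list above is replaced (LOCKS_REPLACE) in
      # DeclareLocks with the primary nodes of the selected instances.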
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
455
    else:
456
      self.wanted_names = None
457
      self.needed_locks = {
458
        locking.LEVEL_NODE_RES: locking.ALL_SET,
459
        locking.LEVEL_INSTANCE: locking.ALL_SET,
460

    
461
        # This opcode acquires the node locks for all instances
462
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
463
        }
464

    
465
    self.share_locks = {
466
      locking.LEVEL_NODE_RES: 1,
467
      locking.LEVEL_INSTANCE: 0,
468
      locking.LEVEL_NODE_ALLOC: 1,
469
      }
470

    
471
  def DeclareLocks(self, level):
472
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
473
      self._LockInstancesNodes(primary_only=True, level=level)
474

    
475
  def CheckPrereq(self):
476
    """Check prerequisites.
477

478
    This only checks the optional instance list against the existing names.
479

480
    """
481
    if self.wanted_names is None:
482
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
483

    
484
    self.wanted_instances = \
485
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
486

    
487
  def _EnsureChildSizes(self, disk):
488
    """Ensure children of the disk have the needed disk size.
489

490
    This is valid mainly for DRBD8 and fixes an issue where the
491
    children have smaller disk size.
492

493
    @param disk: an L{ganeti.objects.Disk} object
494

495
    """
496
    if disk.dev_type == constants.LD_DRBD8:
497
      assert disk.children, "Empty children for DRBD8?"
498
      fchild = disk.children[0]
499
      mismatch = fchild.size < disk.size
500
      if mismatch:
501
        self.LogInfo("Child disk has size %d, parent %d, fixing",
502
                     fchild.size, disk.size)
503
        fchild.size = disk.size
504

    
505
      # and we recurse on this child only, not on the metadev
506
      return self._EnsureChildSizes(fchild) or mismatch
507
    else:
508
      return False
509

    
510
  def Exec(self, feedback_fn):
511
    """Verify the size of cluster disks.
512

513
    """
514
    # TODO: check child disks too
515
    # TODO: check differences in size between primary/secondary nodes
516
    per_node_disks = {}
517
    for instance in self.wanted_instances:
518
      pnode = instance.primary_node
519
      if pnode not in per_node_disks:
520
        per_node_disks[pnode] = []
521
      for idx, disk in enumerate(instance.disks):
522
        per_node_disks[pnode].append((instance, idx, disk))
523

    
524
    assert not (frozenset(per_node_disks.keys()) -
525
                self.owned_locks(locking.LEVEL_NODE_RES)), \
526
      "Not owning correct locks"
527
    assert not self.owned_locks(locking.LEVEL_NODE)
528

    
529
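    # Nodes using exclusive storage also report spindle counts; remember
    # which ones they are for the spindle checks further down.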
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
530
                                               per_node_disks.keys())
531

    
532
    changed = []
533
    for node_uuid, dskl in per_node_disks.items():
534
      newl = [v[2].Copy() for v in dskl]
535
      for dsk in newl:
536
        self.cfg.SetDiskID(dsk, node_uuid)
537
      node_name = self.cfg.GetNodeName(node_uuid)
538
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
539
      if result.fail_msg:
540
        self.LogWarning("Failure in blockdev_getdimensions call to node"
541
                        " %s, ignoring", node_name)
542
        continue
543
      if len(result.payload) != len(dskl):
544
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
545
                        " result.payload=%s", node_name, len(dskl),
546
                        result.payload)
547
        self.LogWarning("Invalid result from node %s, ignoring node results",
548
                        node_name)
549
        continue
550
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
551
        if dimensions is None:
552
          self.LogWarning("Disk %d of instance %s did not return size"
553
                          " information, ignoring", idx, instance.name)
554
          continue
555
        if not isinstance(dimensions, (tuple, list)):
556
          self.LogWarning("Disk %d of instance %s did not return valid"
557
                          " dimension information, ignoring", idx,
558
                          instance.name)
559
          continue
560
        (size, spindles) = dimensions
561
        if not isinstance(size, (int, long)):
562
          self.LogWarning("Disk %d of instance %s did not return valid"
563
                          " size information, ignoring", idx, instance.name)
564
          continue
565
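        # The node reports the size in bytes; shift by 20 bits to compare it
        # in MiB, the unit in which disk.size is recorded.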
        size = size >> 20
566
        if size != disk.size:
567
          self.LogInfo("Disk %d of instance %s has mismatched size,"
568
                       " correcting: recorded %d, actual %d", idx,
569
                       instance.name, disk.size, size)
570
          disk.size = size
571
          self.cfg.Update(instance, feedback_fn)
572
          changed.append((instance.name, idx, "size", size))
573
        if es_flags[node_uuid]:
574
          if spindles is None:
575
            self.LogWarning("Disk %d of instance %s did not return valid"
576
                            " spindles information, ignoring", idx,
577
                            instance.name)
578
          elif disk.spindles is None or disk.spindles != spindles:
579
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
580
                         " correcting: recorded %s, actual %s",
581
                         idx, instance.name, disk.spindles, spindles)
582
            disk.spindles = spindles
583
            self.cfg.Update(instance, feedback_fn)
584
            changed.append((instance.name, idx, "spindles", disk.spindles))
585
        if self._EnsureChildSizes(disk):
586
          self.cfg.Update(instance, feedback_fn)
587
          changed.append((instance.name, idx, "size", disk.size))
588
    return changed
589

    
590

    
591
def _ValidateNetmask(cfg, netmask):
592
  """Checks if a netmask is valid.
593

594
  @type cfg: L{config.ConfigWriter}
595
  @param cfg: The cluster configuration
596
  @type netmask: int
597
  @param netmask: the netmask to be verified
598
  @raise errors.OpPrereqError: if the validation fails
599

600
  """
601
  ip_family = cfg.GetPrimaryIPFamily()
602
  try:
603
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
604
  except errors.ProgrammerError:
605
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
606
                               ip_family, errors.ECODE_INVAL)
607
  if not ipcls.ValidateNetmask(netmask):
608
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
609
                               (netmask), errors.ECODE_INVAL)
610

    
611

    
612
def CheckFileBasedStoragePathVsEnabledDiskTemplates(
613
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
614
    file_disk_template):
615
  """Checks whether the given file-based storage directory is acceptable.
616

617
  Note: This function is public, because it is also used in bootstrap.py.
618

619
  @type logging_warn_fn: function
620
  @param logging_warn_fn: function which accepts a string and logs it
621
  @type file_storage_dir: string
622
  @param file_storage_dir: the directory to be used for file-based instances
623
  @type enabled_disk_templates: list of string
624
  @param enabled_disk_templates: the list of enabled disk templates
625
  @type file_disk_template: string
626
  @param file_disk_template: the file-based disk template for which the
627
      path should be checked
628

629
  """
630
  assert (file_disk_template in
631
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
632
  file_storage_enabled = file_disk_template in enabled_disk_templates
633
  if file_storage_dir is not None:
634
    if file_storage_dir == "":
635
      if file_storage_enabled:
636
        raise errors.OpPrereqError(
637
            "Unsetting the '%s' storage directory while having '%s' storage"
638
            " enabled is not permitted." %
639
            (file_disk_template, file_disk_template))
640
    else:
641
      if not file_storage_enabled:
642
        logging_warn_fn(
643
            "Specified a %s storage directory, although %s storage is not"
644
            " enabled." % (file_disk_template, file_disk_template))
645
  else:
646
    raise errors.ProgrammerError("Received %s storage dir with value"
647
                                 " 'None'." % file_disk_template)
648

    
649

    
650
def CheckFileStoragePathVsEnabledDiskTemplates(
651
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
652
  """Checks whether the given file storage directory is acceptable.
653

654
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
655

656
  """
657
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
658
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
659
      constants.DT_FILE)
660

    
661

    
662
def CheckSharedFileStoragePathVsEnabledDiskTemplates(
663
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
664
  """Checks whether the given shared file storage directory is acceptable.
665

666
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
667

668
  """
669
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
670
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
671
      constants.DT_SHARED_FILE)
672

    
673

    
674
class LUClusterSetParams(LogicalUnit):
675
  """Change the parameters of the cluster.
676

677
  """
678
  HPATH = "cluster-modify"
679
  HTYPE = constants.HTYPE_CLUSTER
680
  REQ_BGL = False
681

    
682
  def CheckArguments(self):
683
    """Check parameters
684

685
    """
686
    if self.op.uid_pool:
687
      uidpool.CheckUidPool(self.op.uid_pool)
688

    
689
    if self.op.add_uids:
690
      uidpool.CheckUidPool(self.op.add_uids)
691

    
692
    if self.op.remove_uids:
693
      uidpool.CheckUidPool(self.op.remove_uids)
694

    
695
    if self.op.master_netmask is not None:
696
      _ValidateNetmask(self.cfg, self.op.master_netmask)
697

    
698
    if self.op.diskparams:
699
      for dt_params in self.op.diskparams.values():
700
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
701
      try:
702
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
703
      except errors.OpPrereqError, err:
704
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
705
                                   errors.ECODE_INVAL)
706

    
707
  def ExpandNames(self):
708
    # FIXME: in the future maybe other cluster params won't require checking on
709
    # all nodes to be modified.
710
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
711
    # resource locks the right thing, shouldn't it be the BGL instead?
712
    self.needed_locks = {
713
      locking.LEVEL_NODE: locking.ALL_SET,
714
      locking.LEVEL_INSTANCE: locking.ALL_SET,
715
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
716
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
717
    }
718
    self.share_locks = ShareAll()
719

    
720
  def BuildHooksEnv(self):
721
    """Build hooks env.
722

723
    """
724
    return {
725
      "OP_TARGET": self.cfg.GetClusterName(),
726
      "NEW_VG_NAME": self.op.vg_name,
727
      }
728

    
729
  def BuildHooksNodes(self):
730
    """Build hooks nodes.
731

732
    """
733
    mn = self.cfg.GetMasterNode()
734
    return ([mn], [mn])
735

    
736
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
737
                   new_enabled_disk_templates):
738
    """Check the consistency of the vg name on all nodes and in case it gets
739
       unset whether there are instances still using it.
740

741
    """
742
    if self.op.vg_name is not None and not self.op.vg_name:
743
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
744
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
745
                                   " instances exist", errors.ECODE_INVAL)
746

    
747
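    # Contact the nodes only if a volume group name is given while LVM-based
    # templates are enabled, or if a volume group is already configured and
    # an LVM-based template is being newly enabled.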
    if (self.op.vg_name is not None and
748
        utils.IsLvmEnabled(enabled_disk_templates)) or \
749
           (self.cfg.GetVGName() is not None and
750
            utils.LvmGetsEnabled(enabled_disk_templates,
751
                                 new_enabled_disk_templates)):
752
      self._CheckVgNameOnNodes(node_uuids)
753

    
754
  def _CheckVgNameOnNodes(self, node_uuids):
755
    """Check the status of the volume group on each node.
756

757
    """
758
    vglist = self.rpc.call_vg_list(node_uuids)
759
    for node_uuid in node_uuids:
760
      msg = vglist[node_uuid].fail_msg
761
      if msg:
762
        # ignoring down node
763
        self.LogWarning("Error while gathering data on node %s"
764
                        " (ignoring node): %s",
765
                        self.cfg.GetNodeName(node_uuid), msg)
766
        continue
767
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
768
                                            self.op.vg_name,
769
                                            constants.MIN_VG_SIZE)
770
      if vgstatus:
771
        raise errors.OpPrereqError("Error on node '%s': %s" %
772
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
773
                                   errors.ECODE_ENVIRON)
774

    
775
  def _GetEnabledDiskTemplates(self, cluster):
776
    """Determines the enabled disk templates and the subset of disk templates
777
       that are newly enabled by this operation.
778

779
    """
780
    enabled_disk_templates = None
781
    new_enabled_disk_templates = []
782
    if self.op.enabled_disk_templates:
783
      enabled_disk_templates = self.op.enabled_disk_templates
784
      new_enabled_disk_templates = \
785
        list(set(enabled_disk_templates)
786
             - set(cluster.enabled_disk_templates))
787
    else:
788
      enabled_disk_templates = cluster.enabled_disk_templates
789
    return (enabled_disk_templates, new_enabled_disk_templates)
790

    
791
  def CheckPrereq(self):
792
    """Check prerequisites.
793

794
    This checks whether the given params don't conflict and
795
    if the given volume group is valid.
796

797
    """
798
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
799
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
800
        raise errors.OpPrereqError("Cannot disable drbd helper while"
801
                                   " drbd-based instances exist",
802
                                   errors.ECODE_INVAL)
803

    
804
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
805
    self.cluster = cluster = self.cfg.GetClusterInfo()
806

    
807
    vm_capable_node_uuids = [node.uuid
808
                             for node in self.cfg.GetAllNodesInfo().values()
809
                             if node.uuid in node_uuids and node.vm_capable]
810

    
811
    (enabled_disk_templates, new_enabled_disk_templates) = \
812
      self._GetEnabledDiskTemplates(cluster)
813

    
814
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
815
                      new_enabled_disk_templates)
816

    
817
    if self.op.file_storage_dir is not None:
818
      CheckFileStoragePathVsEnabledDiskTemplates(
819
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
820

    
821
    if self.op.shared_file_storage_dir is not None:
822
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
823
          self.LogWarning, self.op.shared_file_storage_dir,
824
          enabled_disk_templates)
825

    
826
    if self.op.drbd_helper:
827
      # checks given drbd helper on all nodes
828
      helpers = self.rpc.call_drbd_helper(node_uuids)
829
      for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
830
        if ninfo.offline:
831
          self.LogInfo("Not checking drbd helper on offline node %s",
832
                       ninfo.name)
833
          continue
834
        msg = helpers[ninfo.uuid].fail_msg
835
        if msg:
836
          raise errors.OpPrereqError("Error checking drbd helper on node"
837
                                     " '%s': %s" % (ninfo.name, msg),
838
                                     errors.ECODE_ENVIRON)
839
        node_helper = helpers[ninfo.uuid].payload
840
        if node_helper != self.op.drbd_helper:
841
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
842
                                     (ninfo.name, node_helper),
843
                                     errors.ECODE_ENVIRON)
844

    
845
    # validate params changes
846
    if self.op.beparams:
847
      objects.UpgradeBeParams(self.op.beparams)
848
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
849
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
850

    
851
    if self.op.ndparams:
852
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
853
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
854

    
855
      # TODO: we need a more general way to handle resetting
856
      # cluster-level parameters to default values
857
      if self.new_ndparams["oob_program"] == "":
858
        self.new_ndparams["oob_program"] = \
859
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
860

    
861
    if self.op.hv_state:
862
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
863
                                           self.cluster.hv_state_static)
864
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
865
                               for hv, values in new_hv_state.items())
866

    
867
    if self.op.disk_state:
868
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
869
                                               self.cluster.disk_state_static)
870
      self.new_disk_state = \
871
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
872
                            for name, values in svalues.items()))
873
             for storage, svalues in new_disk_state.items())
874

    
875
    if self.op.ipolicy:
876
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
877
                                           group_policy=False)
878

    
879
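      # Evaluate the new policy per node group and warn (without failing)
      # about instances that would violate it.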
      all_instances = self.cfg.GetAllInstancesInfo().values()
880
      violations = set()
881
      for group in self.cfg.GetAllNodeGroupsInfo().values():
882
        instances = frozenset([inst for inst in all_instances
883
                               if compat.any(nuuid in group.members
884
                                             for nuuid in inst.all_nodes)])
885
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
886
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
887
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
888
                                           self.cfg)
889
        if new:
890
          violations.update(new)
891

    
892
      if violations:
893
        self.LogWarning("After the ipolicy change the following instances"
894
                        " violate them: %s",
895
                        utils.CommaJoin(utils.NiceSort(violations)))
896

    
897
    if self.op.nicparams:
898
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
899
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
900
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
901
      nic_errors = []
902

    
903
      # check all instances for consistency
904
      for instance in self.cfg.GetAllInstancesInfo().values():
905
        for nic_idx, nic in enumerate(instance.nics):
906
          params_copy = copy.deepcopy(nic.nicparams)
907
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
908

    
909
          # check parameter syntax
910
          try:
911
            objects.NIC.CheckParameterSyntax(params_filled)
912
          except errors.ConfigurationError, err:
913
            nic_errors.append("Instance %s, nic/%d: %s" %
914
                              (instance.name, nic_idx, err))
915

    
916
          # if we're moving instances to routed, check that they have an ip
917
          target_mode = params_filled[constants.NIC_MODE]
918
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
919
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
920
                              " address" % (instance.name, nic_idx))
921
      if nic_errors:
922
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
923
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
924

    
925
    # hypervisor list/parameters
926
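    # FillDict with an empty override dict returns a copy of the current
    # hvparams which can be modified without touching the cluster object.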
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
927
    if self.op.hvparams:
928
      for hv_name, hv_dict in self.op.hvparams.items():
929
        if hv_name not in self.new_hvparams:
930
          self.new_hvparams[hv_name] = hv_dict
931
        else:
932
          self.new_hvparams[hv_name].update(hv_dict)
933

    
934
    # disk template parameters
935
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
936
    if self.op.diskparams:
937
      for dt_name, dt_params in self.op.diskparams.items():
938
        if dt_name not in self.new_diskparams:
939
          self.new_diskparams[dt_name] = dt_params
940
        else:
941
          self.new_diskparams[dt_name].update(dt_params)
942

    
943
    # os hypervisor parameters
944
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
945
    if self.op.os_hvp:
946
      for os_name, hvs in self.op.os_hvp.items():
947
        if os_name not in self.new_os_hvp:
948
          self.new_os_hvp[os_name] = hvs
949
        else:
950
          for hv_name, hv_dict in hvs.items():
951
            if hv_dict is None:
952
              # Delete if it exists
953
              self.new_os_hvp[os_name].pop(hv_name, None)
954
            elif hv_name not in self.new_os_hvp[os_name]:
955
              self.new_os_hvp[os_name][hv_name] = hv_dict
956
            else:
957
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
958

    
959
    # os parameters
960
    self.new_osp = objects.FillDict(cluster.osparams, {})
961
    if self.op.osparams:
962
      for os_name, osp in self.op.osparams.items():
963
        if os_name not in self.new_osp:
964
          self.new_osp[os_name] = {}
965

    
966
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
967
                                                 use_none=True)
968

    
969
        if not self.new_osp[os_name]:
970
          # we removed all parameters
971
          del self.new_osp[os_name]
972
        else:
973
          # check the parameter validity (remote check)
974
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
975
                        os_name, self.new_osp[os_name])
976

    
977
    # changes to the hypervisor list
978
    if self.op.enabled_hypervisors is not None:
979
      self.hv_list = self.op.enabled_hypervisors
980
      for hv in self.hv_list:
981
        # if the hypervisor doesn't already exist in the cluster
982
        # hvparams, we initialize it to empty, and then (in both
983
        # cases) we make sure to fill the defaults, as we might not
984
        # have a complete defaults list if the hypervisor wasn't
985
        # enabled before
986
        if hv not in new_hvp:
987
          new_hvp[hv] = {}
988
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
989
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
990
    else:
991
      self.hv_list = cluster.enabled_hypervisors
992

    
993
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
994
      # either the enabled list has changed, or the parameters have, validate
995
      for hv_name, hv_params in self.new_hvparams.items():
996
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
997
            (self.op.enabled_hypervisors and
998
             hv_name in self.op.enabled_hypervisors)):
999
          # either this is a new hypervisor, or its parameters have changed
1000
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1001
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1002
          hv_class.CheckParameterSyntax(hv_params)
1003
          CheckHVParams(self, node_uuids, hv_name, hv_params)
1004

    
1005
    self._CheckDiskTemplateConsistency()
1006

    
1007
    if self.op.os_hvp:
1008
      # no need to check any newly-enabled hypervisors, since the
1009
      # defaults have already been checked in the above code-block
1010
      for os_name, os_hvp in self.new_os_hvp.items():
1011
        for hv_name, hv_params in os_hvp.items():
1012
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1013
          # we need to fill in the new os_hvp on top of the actual hv_p
1014
          cluster_defaults = self.new_hvparams.get(hv_name, {})
1015
          new_osp = objects.FillDict(cluster_defaults, hv_params)
1016
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1017
          hv_class.CheckParameterSyntax(new_osp)
1018
          CheckHVParams(self, node_uuids, hv_name, new_osp)
1019

    
1020
    if self.op.default_iallocator:
1021
      alloc_script = utils.FindFile(self.op.default_iallocator,
1022
                                    constants.IALLOCATOR_SEARCH_PATH,
1023
                                    os.path.isfile)
1024
      if alloc_script is None:
1025
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1026
                                   " specified" % self.op.default_iallocator,
1027
                                   errors.ECODE_INVAL)
1028

    
1029
  def _CheckDiskTemplateConsistency(self):
1030
    """Check whether the disk templates that are going to be disabled
1031
       are still in use by some instances.
1032

1033
    """
1034
    if self.op.enabled_disk_templates:
1035
      cluster = self.cfg.GetClusterInfo()
1036
      instances = self.cfg.GetAllInstancesInfo()
1037

    
1038
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1039
        - set(self.op.enabled_disk_templates)
1040
      for instance in instances.itervalues():
1041
        if instance.disk_template in disk_templates_to_remove:
1042
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
1043
                                     " because instance '%s' is using it." %
1044
                                     (instance.disk_template, instance.name))
1045

    
1046
  def _SetVgName(self, feedback_fn):
1047
    """Determines and sets the new volume group name.
1048

1049
    """
1050
    if self.op.vg_name is not None:
1051
      if self.op.vg_name and not \
1052
           utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
1053
        feedback_fn("Note that you specified a volume group, but did not"
1054
                    " enable any lvm disk template.")
1055
      new_volume = self.op.vg_name
1056
      if not new_volume:
1057
        if utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
1058
          raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
1059
                                     " disk templates are enabled.")
1060
        new_volume = None
1061
      if new_volume != self.cfg.GetVGName():
1062
        self.cfg.SetVGName(new_volume)
1063
      else:
1064
        feedback_fn("Cluster LVM configuration already in desired"
1065
                    " state, not changing")
1066
    else:
1067
      if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
1068
          not self.cfg.GetVGName():
1069
        raise errors.OpPrereqError("Please specify a volume group when"
1070
                                   " enabling lvm-based disk-templates.")
1071

    
1072
  def _SetFileStorageDir(self, feedback_fn):
1073
    """Set the file storage directory.
1074

1075
    """
1076
    if self.op.file_storage_dir is not None:
1077
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
1078
        feedback_fn("Global file storage dir already set to value '%s'"
1079
                    % self.cluster.file_storage_dir)
1080
      else:
1081
        self.cluster.file_storage_dir = self.op.file_storage_dir
1082

    
1083
  def Exec(self, feedback_fn):
1084
    """Change the parameters of the cluster.
1085

1086
    """
1087
    if self.op.enabled_disk_templates:
1088
      self.cluster.enabled_disk_templates = \
1089
        list(set(self.op.enabled_disk_templates))
1090

    
1091
    self._SetVgName(feedback_fn)
1092
    self._SetFileStorageDir(feedback_fn)
1093

    
1094
    if self.op.drbd_helper is not None:
1095
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1096
        feedback_fn("Note that you specified a drbd user helper, but did"
1097
                    " enabled the drbd disk template.")
1098
      new_helper = self.op.drbd_helper
1099
      if not new_helper:
1100
        new_helper = None
1101
      if new_helper != self.cfg.GetDRBDHelper():
1102
        self.cfg.SetDRBDHelper(new_helper)
1103
      else:
1104
        feedback_fn("Cluster DRBD helper already in desired state,"
1105
                    " not changing")
1106
    if self.op.hvparams:
1107
      self.cluster.hvparams = self.new_hvparams
1108
    if self.op.os_hvp:
1109
      self.cluster.os_hvp = self.new_os_hvp
1110
    if self.op.enabled_hypervisors is not None:
1111
      self.cluster.hvparams = self.new_hvparams
1112
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1113
    if self.op.beparams:
1114
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1115
    if self.op.nicparams:
1116
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1117
    if self.op.ipolicy:
1118
      self.cluster.ipolicy = self.new_ipolicy
1119
    if self.op.osparams:
1120
      self.cluster.osparams = self.new_osp
1121
    if self.op.ndparams:
1122
      self.cluster.ndparams = self.new_ndparams
1123
    if self.op.diskparams:
1124
      self.cluster.diskparams = self.new_diskparams
1125
    if self.op.hv_state:
1126
      self.cluster.hv_state_static = self.new_hv_state
1127
    if self.op.disk_state:
1128
      self.cluster.disk_state_static = self.new_disk_state
1129

    
1130
    if self.op.candidate_pool_size is not None:
1131
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1132
      # we need to update the pool size here, otherwise the save will fail
1133
      AdjustCandidatePool(self, [])
1134

    
1135
    if self.op.maintain_node_health is not None:
1136
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1137
        feedback_fn("Note: CONFD was disabled at build time, node health"
1138
                    " maintenance is not useful (still enabling it)")
1139
      self.cluster.maintain_node_health = self.op.maintain_node_health
1140

    
1141
    if self.op.modify_etc_hosts is not None:
1142
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1143

    
1144
    if self.op.prealloc_wipe_disks is not None:
1145
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1146

    
1147
    if self.op.add_uids is not None:
1148
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1149

    
1150
    if self.op.remove_uids is not None:
1151
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1152

    
1153
    if self.op.uid_pool is not None:
1154
      self.cluster.uid_pool = self.op.uid_pool
1155

    
1156
    if self.op.default_iallocator is not None:
1157
      self.cluster.default_iallocator = self.op.default_iallocator
1158

    
1159
    if self.op.reserved_lvs is not None:
1160
      self.cluster.reserved_lvs = self.op.reserved_lvs
1161

    
1162
    if self.op.use_external_mip_script is not None:
1163
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
1164

    
1165
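    # Applies DDM_ADD/DDM_REMOVE modifications to one of the cluster's OS
    # status lists (hidden or blacklisted).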
    def helper_os(aname, mods, desc):
1166
      desc += " OS list"
1167
      lst = getattr(self.cluster, aname)
1168
      for key, val in mods:
1169
        if key == constants.DDM_ADD:
1170
          if val in lst:
1171
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1172
          else:
1173
            lst.append(val)
1174
        elif key == constants.DDM_REMOVE:
1175
          if val in lst:
1176
            lst.remove(val)
1177
          else:
1178
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1179
        else:
1180
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
1181

    
1182
    if self.op.hidden_os:
1183
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1184

    
1185
    if self.op.blacklisted_os:
1186
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1187

    
1188
    if self.op.master_netdev:
1189
      master_params = self.cfg.GetMasterNetworkParameters()
1190
      ems = self.cfg.GetUseExternalMipScript()
1191
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1192
                  self.cluster.master_netdev)
1193
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1194
                                                       master_params, ems)
1195
      if not self.op.force:
1196
        result.Raise("Could not disable the master ip")
1197
      else:
1198
        if result.fail_msg:
1199
          msg = ("Could not disable the master ip (continuing anyway): %s" %
1200
                 result.fail_msg)
1201
          feedback_fn(msg)
1202
      feedback_fn("Changing master_netdev from %s to %s" %
1203
                  (master_params.netdev, self.op.master_netdev))
1204
      self.cluster.master_netdev = self.op.master_netdev
1205

    
1206
    if self.op.master_netmask:
1207
      master_params = self.cfg.GetMasterNetworkParameters()
1208
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1209
      result = self.rpc.call_node_change_master_netmask(
1210
                 master_params.uuid, master_params.netmask,
1211
                 self.op.master_netmask, master_params.ip,
1212
                 master_params.netdev)
1213
      result.Warn("Could not change the master IP netmask", feedback_fn)
1214
      self.cluster.master_netmask = self.op.master_netmask
1215

    
1216
    self.cfg.Update(self.cluster, feedback_fn)
1217

    
1218
    if self.op.master_netdev:
1219
      master_params = self.cfg.GetMasterNetworkParameters()
1220
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
1221
                  self.op.master_netdev)
1222
      ems = self.cfg.GetUseExternalMipScript()
1223
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1224
                                                     master_params, ems)
1225
      result.Warn("Could not re-enable the master ip on the master,"
1226
                  " please restart manually", self.LogWarning)
1227

    
1228

    
1229
class LUClusterVerify(NoHooksLU):
1230
  """Submits all jobs necessary to verify the cluster.
1231

1232
  """
1233
  REQ_BGL = False
1234

    
1235
  def ExpandNames(self):
1236
    self.needed_locks = {}
1237

    
1238
  def Exec(self, feedback_fn):
1239
    jobs = []
1240

    
1241
    if self.op.group_name:
1242
      groups = [self.op.group_name]
1243
      depends_fn = lambda: None
1244
    else:
1245
      groups = self.cfg.GetNodeGroupList()
1246

    
1247
      # Verify global configuration
1248
      jobs.append([
1249
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1250
        ])
1251

    
1252
      # Always depend on global verification
1253
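      # (-len(jobs), []) is a relative dependency: the negative offset points
      # back at the config-verification job queued above.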
      depends_fn = lambda: [(-len(jobs), [])]
1254

    
1255
    jobs.extend(
1256
      [opcodes.OpClusterVerifyGroup(group_name=group,
1257
                                    ignore_errors=self.op.ignore_errors,
1258
                                    depends=depends_fn())]
1259
      for group in groups)
1260

    
1261
    # Fix up all parameters
1262
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1263
      op.debug_simulate_errors = self.op.debug_simulate_errors
1264
      op.verbose = self.op.verbose
1265
      op.error_codes = self.op.error_codes
1266
      try:
1267
        op.skip_checks = self.op.skip_checks
1268
      except AttributeError:
1269
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1270

    
1271
    return ResultWithJobs(jobs)
1272

    
1273

    
1274
class _VerifyErrors(object):
1275
  """Mix-in for cluster/group verify LUs.
1276

1277
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1278
  self.op and self._feedback_fn to be available.)
1279

1280
  """
1281

    
1282
  ETYPE_FIELD = "code"
1283
  ETYPE_ERROR = "ERROR"
1284
  ETYPE_WARNING = "WARNING"
1285

    
1286
  def _Error(self, ecode, item, msg, *args, **kwargs):
1287
    """Format an error message.
1288

1289
    Based on the opcode's error_codes parameter, either format a
1290
    parseable error code, or a simpler error string.
1291

1292
    This must be called only from Exec and functions called from Exec.
1293

1294
    """
1295
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1296
    itype, etxt, _ = ecode
1297
    # If the error code is in the list of ignored errors, demote the error to a
1298
    # warning
1299
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1300
      ltype = self.ETYPE_WARNING
1301
    # first complete the msg
1302
    if args:
1303
      msg = msg % args
1304
    # then format the whole message
1305
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1306
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1307
    else:
1308
      if item:
1309
        item = " " + item
1310
      else:
1311
        item = ""
1312
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1313
    # and finally report it via the feedback_fn
1314
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1315
    # do not mark the operation as failed for WARN cases only
1316
    if ltype == self.ETYPE_ERROR:
1317
      self.bad = True
1318

    
1319
  def _ErrorIf(self, cond, *args, **kwargs):
1320
    """Log an error message if the passed condition is True.
1321

1322
    """
1323
    if (bool(cond)
1324
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1325
      self._Error(*args, **kwargs)
1326

    
1327

    
1328
def _VerifyCertificate(filename):
1329
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1330

1331
  @type filename: string
1332
  @param filename: Path to PEM file
1333

1334
  """
1335
  try:
1336
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1337
                                           utils.ReadFile(filename))
1338
  except Exception, err: # pylint: disable=W0703
1339
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1340
            "Failed to load X509 certificate %s: %s" % (filename, err))
1341

    
1342
  (errcode, msg) = \
1343
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1344
                                constants.SSL_CERT_EXPIRATION_ERROR)
1345

    
1346
  if msg:
1347
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1348
  else:
1349
    fnamemsg = None
1350

    
1351
  if errcode is None:
1352
    return (None, fnamemsg)
1353
  elif errcode == utils.CERT_WARNING:
1354
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1355
  elif errcode == utils.CERT_ERROR:
1356
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1357

    
1358
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1359

    
1360

    
1361
def _GetAllHypervisorParameters(cluster, instances):
1362
  """Compute the set of all hypervisor parameters.
1363

1364
  @type cluster: L{objects.Cluster}
1365
  @param cluster: the cluster object
1366
  @type instances: list of L{objects.Instance}
1367
  @param instances: additional instances from which to obtain parameters
1368
  @rtype: list of (origin, hypervisor, parameters)
1369
  @return: a list with all parameters found, indicating the hypervisor they
1370
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1371

1372
  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (hv_name, item))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.CONFD_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                    constants.CONFD_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(inst.name for
                         inst in dangling_instances.get(node.uuid, [])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(inst.name for
                                         inst in no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        instance = self.cfg.GetInstanceInfoByName(inst_name)
        if instance.disk_template in constants.DTS_INT_MIRROR:
          nodes.update(instance.secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_node_uuids = set(self.group_info.members)
    group_inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_node_uuids = \
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_inst_uuids = \
        group_inst_uuids.difference(
          [self.cfg.GetInstanceInfoByName(name).uuid
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])

    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_inst_uuids:
      raise errors.OpPrereqError(
        "Missing lock for instances: %s" %
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
        errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_uuids = group_node_uuids
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
                             for node_uuid in group_node_uuids)

    self.my_inst_uuids = group_inst_uuids
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
                             for inst_uuid in group_inst_uuids)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nuuid in inst.all_nodes:
          if self.all_node_info[nuuid].group != self.group_uuid:
            extra_lv_nodes.add(nuuid)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, ninfo.name,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s parameter verify failure (source %s): %s",
                      hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
                    "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
                  "Node time diverges by at least %s from master node time",
                  ntime_diff)

  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                  "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupDRBDVersion(self, node_verify_infos):
    """Check cross-node DRBD version consistency.

    @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node_verify call.

    """
    node_versions = {}
    for node_uuid, ndata in node_verify_infos.items():
      nresult = ndata.payload
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
      node_versions[node_uuid] = version

    if len(set(node_versions.values())) > 1:
      for node_uuid, version in sorted(node_versions.items()):
        msg = "DRBD version mismatch: %s" % version
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
                    code=self.ETYPE_WARNING)

  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping from node UUIDs to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of checks
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "did not return valid bridge information")
    if not test:
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = constants.NV_NODELIST not in nresult
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
                  "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
                        "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
                        "tcp communication with node '%s': %s",
                        anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if ninfo.uuid == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)

  def _VerifyInstance(self, instance, node_image, diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the nodes are in the correct
    state.

    """
    pnode_uuid = instance.primary_node
    pnode_img = node_image[pnode_uuid]
    groupinfo = self.cfg.GetAllNodeGroupsInfo()

    node_vol_should = {}
    instance.MapLVsByNode(node_vol_should)

    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            self.group_info)
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)

    for node_uuid in node_vol_should:
      n_img = node_image[node_uuid]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node_uuid]:
        test = volume not in n_img.volumes
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
                      "volume %s missing on node %s", volume,
                      self.cfg.GetNodeName(node_uuid))

    if instance.admin_state == constants.ADMINST_UP:
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
                    "instance not running on its primary node %s",
                     self.cfg.GetNodeName(pnode_uuid))
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance is marked as running and lives on"
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      self._ErrorIf(instance.disks_active and
                    not success and not bad_snode,
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
                    "couldn't retrieve status for disk/%s on %s: %s",
                    idx, self.cfg.GetNodeName(nname), bdev_status)

      if instance.disks_active and success and \
         (bdev_status.is_degraded or
          bdev_status.ldisk_status != constants.LDS_OKAY):
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
        if bdev_status.is_degraded:
          msg += " is degraded"
        if bdev_status.ldisk_status != constants.LDS_OKAY:
          msg += "; state is '%s'" % \
                 constants.LDS_NAMES[bdev_status.ldisk_status]

        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)

    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
                  "instance %s, connection to primary node failed",
                  instance.name)

    self._ErrorIf(len(instance.secondary_nodes) > 1,
                  constants.CV_EINSTANCELAYOUT, instance.name,
                  "instance has multiple secondary nodes: %s",
                  utils.CommaJoin(instance.secondary_nodes),
                  code=self.ETYPE_WARNING)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
    if any(es_flags.values()):
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
        # Disk template not compatible with exclusive_storage: no instance
        # node should have the flag set
        es_nodes = [n
                    for (n, es) in es_flags.items()
                    if es]
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
                    "instance has template %s, which is not supported on nodes"
                    " that have exclusive storage set: %s",
                    instance.disk_template,
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
      for (idx, disk) in enumerate(instance.disks):
        self._ErrorIf(disk.spindles is None,
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
                      "number of spindles not configured for disk %s while"
                      " exclusive storage is enabled, try running"
                      " gnt-cluster repair-disk-sizes", idx)

    if instance.disk_template in constants.DTS_INT_MIRROR:
      instance_nodes = utils.NiceSort(instance.all_nodes)
      instance_groups = {}

      for node_uuid in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
                                   []).append(node_uuid)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
                           groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode_uuid in nodes,
                                   reverse=True)]

      self._ErrorIf(len(instance_groups) > 1,
                    constants.CV_EINSTANCESPLITGROUPS,
                    instance.name, "instance has primary and secondary nodes in"
                    " different groups: %s", utils.CommaJoin(pretty_list),
                    code=self.ETYPE_WARNING)

    inst_nodes_offline = []
    for snode in instance.secondary_nodes:
      s_img = node_image[snode]
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
                    self.cfg.GetNodeName(snode),
                    "instance %s, connection to secondary node failed",
                    instance.name)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
                  instance.name, "instance has offline secondary node(s) %s",
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
    # ... or ghost/non-vm_capable nodes
    for node_uuid in instance.all_nodes:
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance lives on ghost node %s",
                    self.cfg.GetNodeName(node_uuid))
      self._ErrorIf(not node_image[node_uuid].vm_capable,
                    constants.CV_EINSTANCEBADNODE, instance.name,
                    "instance lives on non-vm_capable node %s",
                    self.cfg.GetNodeName(node_uuid))

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node_uuid, n_img in node_image.items():
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
          self.all_node_info[node_uuid].group != self.group_uuid):
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node_uuid not in node_vol_should or
                volume not in node_vol_should[node_uuid]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
                      self.cfg.GetNodeName(node_uuid),
                      "volume %s is unknown", volume)

  def _VerifyNPlusOneMemory(self, node_image, all_insts):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node_uuid, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline or \
         self.all_node_info[node_uuid].group != self.group_uuid:
        # we're skipping nodes marked offline and nodes in other groups from
        # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      #TODO(dynmem): also consider ballooning out other instances
      for prinode, inst_uuids in n_img.sbp.items():
        needed_mem = 0
        for inst_uuid in inst_uuids:
          bep = cluster_info.FillBE(all_insts[inst_uuid])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MINMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1,
                      self.cfg.GetNodeName(node_uuid),
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)

  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param nodes: List of L{objects.Node} objects
    @param master_node_uuid: UUID of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.uuid == master_node_uuid)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodes
      else:
        filenodes = filter(fn, nodes)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodes:
      if node.offline:
        ignore_nodes.add(node.uuid)
        continue

      nresult = all_nvinfo[node.uuid]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
                    "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.uuid)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_uuid
                            for node_uuids in fileinfo[filename].values()
                            for node_uuid in node_uuids) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        self._ErrorIf(missing_file and missing_file != expected_nodes,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is optional, but it must exist on all or no"
                      " nodes (not found on %s)",
                      filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))
      else:
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is missing from node(s) %s", filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        self._ErrorIf(unexpected,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s should not exist on node(s) %s",
                      filename, utils.CommaJoin(
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1,
                     utils.CommaJoin(utils.NiceSort(
                       map(self.cfg.GetNodeName, node_uuids))))
                    for (idx, (checksum, node_uuids)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
                    "File %s found with %s different checksums (%s)",
                    filename, len(checksums), "; ".join(variants))

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                    "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "wrong drbd usermode helper: %s", payload)

    # compute the DRBD minors
    node_drbd = {}
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
      test = inst_uuid not in instanceinfo
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
      if test:
        node_drbd[minor] = (inst_uuid, False)
      else:
        instance = instanceinfo[inst_uuid]
        node_drbd[minor] = (inst_uuid, instance.disks_active)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                  "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (inst_uuid, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "drbd minor %d of instance %s is not active", minor,
                    self.cfg.GetInstanceName(inst_uuid))
    for minor in used_minors:
      test = minor not in node_drbd
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                  "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
                    "Invalid OS %s (located at %s): %s",
                    os_name, f_path, f_diag)
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
                    "OS '%s' has multiple entries"
                    " (first one shadows the rest): %s",
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                    "Extra OS %s not present on reference node (%s)",
                    os_name, self.cfg.GetNodeName(base.uuid))
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
                      "OS %s for %s differs from reference node %s:"
                      " [%s] vs. [%s]", kind, os_name,
                      self.cfg.GetNodeName(base.uuid),
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
                  "OSes present on reference node %s"
                  " but missing on this node: %s",
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))

  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    cluster = self.cfg.GetClusterInfo()
    if (is_master and
        (cluster.IsFileStorageEnabled() or
         cluster.IsSharedFileStorageEnabled())):
      try:
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
                          verify_key, error_key):
    """Verifies (file) storage paths.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type file_disk_template: string
    @param file_disk_template: file-based disk template, whose directory
        is supposed to be verified
    @type verify_key: string
    @param verify_key: key for the verification map of this file
        verification step
    @param error_key: error key to be added to the verification results
        in case something goes wrong in this verification step

    """
    assert (file_disk_template in
            utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
    cluster = self.cfg.GetClusterInfo()
    if cluster.IsDiskTemplateEnabled(file_disk_template):
      self._ErrorIf(
          verify_key in nresult,
          error_key, ninfo.name,
          "The configured %s storage path is unusable: %s" %
          (file_disk_template, nresult.get(verify_key)))

  def _VerifyFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_FILE,
        constants.NV_FILE_STORAGE_PATH,
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)

  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_SHARED_FILE,
        constants.NV_SHARED_FILE_STORAGE_PATH,
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_devonly = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams makes already copies of the disks
      devonly = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        self.cfg.SetDiskID(anno_disk, nuuid)
        devonly.append(anno_disk)

      node_disks_devonly[nuuid] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
2675
  def _SshNodeSelector(group_uuid, all_nodes):
2676
    """Create endless iterators for all potential SSH check hosts.
2677

2678
    """
2679
    nodes = [node for node in all_nodes
2680
             if (node.group != group_uuid and
2681
                 not node.offline)]
2682
    keyfunc = operator.attrgetter("group")
2683

    
2684
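    # Group the candidate nodes by node group, sort each group's names and
    # wrap them in an endless (cycled) iterator, so callers can repeatedly
    # draw one target node per foreign group.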
    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

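    # Each online node of this group gets one node name drawn from every
    # foreign-group iterator; advancing the shared iterators spreads the
    # cross-group SSH checks over different target nodes.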
    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are only run in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non-redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

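    # Scripts whose presence is verified on every node (via NV_USERSCRIPTS
    # below); currently only the external master IP setup script, when one
    # is configured.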
    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

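    # Parameters for the node_verify RPC; each NV_* key requests one class
    # of checks from the nodes, and the results are evaluated further down.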
    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if drbd_helper:
      node_verify_param[constants.NV_DRBDVERSION] = None
      node_verify_param[constants.NV_DRBDLIST] = None
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
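    # (Despite its name, the loop variable holds the Instance objects from
    # my_inst_info.values(), not UUIDs.)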
    for inst_uuid in self.my_inst_info.values():
      for nic in inst_uuid.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

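    # Fill in the expected state per node: which instances it is primary or
    # secondary for and the logical volumes it should hold; nodes referenced
    # by instances but missing from the configuration become "ghost" images.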
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so
    # if it's True for at least one node, we act as if it were set for all
    # the nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams)
    nvinfo_endtime = time.time()

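    # Nodes outside this group that hold LVs of our instances
    # (self.extra_lv_nodes, prepared earlier in this LU) only need the
    # LV list check.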
    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

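    # Per-node verification: classify each node (master, candidate, drained,
    # regular), check that its RPC answer arrived, and run the individual
    # checks on the returned payload.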
    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.name not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

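    # The optional N+1 check verifies that each node would have enough free
    # memory to host the primary instances of the nodes it is secondary for,
    # should those fail over to it.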
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyzes the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

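      # Each node's payload is a list of (script, status, output) tuples; a
      # failed script is reported, its output echoed indented, and the
      # overall LU result forced to failure.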
      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])