#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
  CheckIpolicyVsDiskTemplates

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "vcs_version": constants.VCS_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disk sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.DT_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node_uuid)
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        # the reported size is in bytes; shift by 20 bits to convert it to
        # MiB, the unit used for disk sizes in the configuration
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)


def CheckFileBasedStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
    file_disk_template):
  """Checks whether the given file-based storage directory is acceptable.

  Note: This function is public, because it is also used in bootstrap.py.

  @type logging_warn_fn: function
  @param logging_warn_fn: function which accepts a string and logs it
  @type file_storage_dir: string
  @param file_storage_dir: the directory to be used for file-based instances
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of enabled disk templates
  @type file_disk_template: string
  @param file_disk_template: the file-based disk template for which the
      path should be checked

  """
  assert (file_disk_template in
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_dir is not None:
    if file_storage_dir == "":
      if file_storage_enabled:
        raise errors.OpPrereqError(
            "Unsetting the '%s' storage directory while having '%s' storage"
            " enabled is not permitted." %
            (file_disk_template, file_disk_template))
    else:
      if not file_storage_enabled:
        logging_warn_fn(
            "Specified a %s storage directory, although %s storage is not"
            " enabled." % (file_disk_template, file_disk_template))
  else:
    raise errors.ProgrammerError("Received %s storage dir with value"
                                 " 'None'." % file_disk_template)


def CheckFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_FILE)


def CheckSharedFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given shared file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_SHARED_FILE)


class LUClusterSetParams(LogicalUnit):
677
  """Change the parameters of the cluster.
678

679
  """
680
  HPATH = "cluster-modify"
681
  HTYPE = constants.HTYPE_CLUSTER
682
  REQ_BGL = False
683

    
684
  def CheckArguments(self):
685
    """Check parameters
686

687
    """
688
    if self.op.uid_pool:
689
      uidpool.CheckUidPool(self.op.uid_pool)
690

    
691
    if self.op.add_uids:
692
      uidpool.CheckUidPool(self.op.add_uids)
693

    
694
    if self.op.remove_uids:
695
      uidpool.CheckUidPool(self.op.remove_uids)
696

    
697
    if self.op.master_netmask is not None:
698
      _ValidateNetmask(self.cfg, self.op.master_netmask)
699

    
700
    if self.op.diskparams:
701
      for dt_params in self.op.diskparams.values():
702
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
703
      try:
704
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
705
      except errors.OpPrereqError, err:
706
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
707
                                   errors.ECODE_INVAL)
708

    
709
  def ExpandNames(self):
710
    # FIXME: in the future maybe other cluster params won't require checking on
711
    # all nodes to be modified.
712
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
713
    # resource locks the right thing, shouldn't it be the BGL instead?
714
    self.needed_locks = {
715
      locking.LEVEL_NODE: locking.ALL_SET,
716
      locking.LEVEL_INSTANCE: locking.ALL_SET,
717
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
718
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
719
    }
720
    self.share_locks = ShareAll()
721

    
722
  def BuildHooksEnv(self):
723
    """Build hooks env.
724

725
    """
726
    return {
727
      "OP_TARGET": self.cfg.GetClusterName(),
728
      "NEW_VG_NAME": self.op.vg_name,
729
      }
730

    
731
  def BuildHooksNodes(self):
732
    """Build hooks nodes.
733

734
    """
735
    mn = self.cfg.GetMasterNode()
736
    return ([mn], [mn])
737

    
738
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
739
                   new_enabled_disk_templates):
740
    """Check the consistency of the vg name on all nodes and in case it gets
741
       unset whether there are instances still using it.
742

743
    """
744
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
745
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
746
                                            new_enabled_disk_templates)
747
    current_vg_name = self.cfg.GetVGName()
748

    
749
    if self.op.vg_name == '':
750
      if lvm_is_enabled:
751
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
752
                                   " disk templates are or get enabled.")
753

    
754
    if self.op.vg_name is None:
755
      if current_vg_name is None and lvm_is_enabled:
756
        raise errors.OpPrereqError("Please specify a volume group when"
757
                                   " enabling lvm-based disk-templates.")
758

    
759
    if self.op.vg_name is not None and not self.op.vg_name:
760
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
761
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
762
                                   " instances exist", errors.ECODE_INVAL)
763

    
764
    if (self.op.vg_name is not None and lvm_is_enabled) or \
765
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
766
      self._CheckVgNameOnNodes(node_uuids)
767

    
768
  def _CheckVgNameOnNodes(self, node_uuids):
769
    """Check the status of the volume group on each node.
770

771
    """
772
    vglist = self.rpc.call_vg_list(node_uuids)
773
    for node_uuid in node_uuids:
774
      msg = vglist[node_uuid].fail_msg
775
      if msg:
776
        # ignoring down node
777
        self.LogWarning("Error while gathering data on node %s"
778
                        " (ignoring node): %s",
779
                        self.cfg.GetNodeName(node_uuid), msg)
780
        continue
781
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
782
                                            self.op.vg_name,
783
                                            constants.MIN_VG_SIZE)
784
      if vgstatus:
785
        raise errors.OpPrereqError("Error on node '%s': %s" %
786
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
787
                                   errors.ECODE_ENVIRON)
788

    
789
  @staticmethod
790
  def _GetEnabledDiskTemplatesInner(op_enabled_disk_templates,
791
                                    old_enabled_disk_templates):
792
    """Determines the enabled disk templates and the subset of disk templates
793
       that are newly enabled by this operation.
794

795
    """
796
    enabled_disk_templates = None
797
    new_enabled_disk_templates = []
798
    if op_enabled_disk_templates:
799
      enabled_disk_templates = op_enabled_disk_templates
800
      new_enabled_disk_templates = \
801
        list(set(enabled_disk_templates)
802
             - set(old_enabled_disk_templates))
803
    else:
804
      enabled_disk_templates = old_enabled_disk_templates
805
    return (enabled_disk_templates, new_enabled_disk_templates)
806

    
807
  def _GetEnabledDiskTemplates(self, cluster):
808
    """Determines the enabled disk templates and the subset of disk templates
809
       that are newly enabled by this operation.
810

811
    """
812
    return self._GetEnabledDiskTemplatesInner(self.op.enabled_disk_templates,
813
                                              cluster.enabled_disk_templates)
814

    
815
  def _CheckIpolicy(self, cluster, enabled_disk_templates):
816
    """Checks the ipolicy.
817

818
    @type cluster: C{objects.Cluster}
819
    @param cluster: the cluster's configuration
820
    @type enabled_disk_templates: list of string
821
    @param enabled_disk_templates: list of (possibly newly) enabled disk
822
      templates
823

824
    """
825
    # FIXME: write unit tests for this
826
    if self.op.ipolicy:
827
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
828
                                           group_policy=False)
829

    
830
      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
831
                                  enabled_disk_templates)
832

    
833
      all_instances = self.cfg.GetAllInstancesInfo().values()
834
      violations = set()
835
      for group in self.cfg.GetAllNodeGroupsInfo().values():
836
        instances = frozenset([inst for inst in all_instances
837
                               if compat.any(nuuid in group.members
838
                                             for nuuid in inst.all_nodes)])
839
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
840
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
841
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
842
                                           self.cfg)
843
        if new:
844
          violations.update(new)
845

    
846
      if violations:
847
        self.LogWarning("After the ipolicy change the following instances"
848
                        " violate them: %s",
849
                        utils.CommaJoin(utils.NiceSort(violations)))
850
    else:
851
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
852
                                  enabled_disk_templates)
853

    
854
  def CheckPrereq(self):
855
    """Check prerequisites.
856

857
    This checks whether the given params don't conflict and
858
    if the given volume group is valid.
859

860
    """
861
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
862
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
863
        raise errors.OpPrereqError("Cannot disable drbd helper while"
864
                                   " drbd-based instances exist",
865
                                   errors.ECODE_INVAL)
866

    
867
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
868
    self.cluster = cluster = self.cfg.GetClusterInfo()
869

    
870
    vm_capable_node_uuids = [node.uuid
871
                             for node in self.cfg.GetAllNodesInfo().values()
872
                             if node.uuid in node_uuids and node.vm_capable]
873

    
874
    (enabled_disk_templates, new_enabled_disk_templates) = \
875
      self._GetEnabledDiskTemplates(cluster)
876

    
877
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
878
                      new_enabled_disk_templates)
879

    
880
    if self.op.file_storage_dir is not None:
881
      CheckFileStoragePathVsEnabledDiskTemplates(
882
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
883

    
884
    if self.op.shared_file_storage_dir is not None:
885
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
886
          self.LogWarning, self.op.shared_file_storage_dir,
887
          enabled_disk_templates)
888

    
889
    if self.op.drbd_helper:
890
      # checks given drbd helper on all nodes
891
      helpers = self.rpc.call_drbd_helper(node_uuids)
892
      for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
893
        if ninfo.offline:
894
          self.LogInfo("Not checking drbd helper on offline node %s",
895
                       ninfo.name)
896
          continue
897
        msg = helpers[ninfo.uuid].fail_msg
898
        if msg:
899
          raise errors.OpPrereqError("Error checking drbd helper on node"
900
                                     " '%s': %s" % (ninfo.name, msg),
901
                                     errors.ECODE_ENVIRON)
902
        node_helper = helpers[ninfo.uuid].payload
903
        if node_helper != self.op.drbd_helper:
904
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
905
                                     (ninfo.name, node_helper),
906
                                     errors.ECODE_ENVIRON)
907

    
908
    # validate params changes
909
    if self.op.beparams:
910
      objects.UpgradeBeParams(self.op.beparams)
911
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
912
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
913

    
914
    if self.op.ndparams:
915
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
916
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
917

    
918
      # TODO: we need a more general way to handle resetting
919
      # cluster-level parameters to default values
920
      if self.new_ndparams["oob_program"] == "":
921
        self.new_ndparams["oob_program"] = \
922
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
923

    
924
    if self.op.hv_state:
925
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
926
                                           self.cluster.hv_state_static)
927
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
928
                               for hv, values in new_hv_state.items())
929

    
930
    if self.op.disk_state:
931
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
932
                                               self.cluster.disk_state_static)
933
      self.new_disk_state = \
934
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
935
                            for name, values in svalues.items()))
936
             for storage, svalues in new_disk_state.items())
937

    
938
    self._CheckIpolicy(cluster, enabled_disk_templates)
939

    
940
    if self.op.nicparams:
941
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
942
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
943
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
944
      nic_errors = []
945

    
946
      # check all instances for consistency
947
      for instance in self.cfg.GetAllInstancesInfo().values():
948
        for nic_idx, nic in enumerate(instance.nics):
949
          params_copy = copy.deepcopy(nic.nicparams)
950
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
951

    
952
          # check parameter syntax
953
          try:
954
            objects.NIC.CheckParameterSyntax(params_filled)
955
          except errors.ConfigurationError, err:
956
            nic_errors.append("Instance %s, nic/%d: %s" %
957
                              (instance.name, nic_idx, err))
958

    
959
          # if we're moving instances to routed, check that they have an ip
960
          target_mode = params_filled[constants.NIC_MODE]
961
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
962
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
963
                              " address" % (instance.name, nic_idx))
964
      if nic_errors:
965
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
966
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
967

    
968
    # hypervisor list/parameters
969
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
970
    if self.op.hvparams:
971
      for hv_name, hv_dict in self.op.hvparams.items():
972
        if hv_name not in self.new_hvparams:
973
          self.new_hvparams[hv_name] = hv_dict
974
        else:
975
          self.new_hvparams[hv_name].update(hv_dict)
976

    
977
    # disk template parameters
978
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
979
    if self.op.diskparams:
980
      for dt_name, dt_params in self.op.diskparams.items():
981
        if dt_name not in self.new_diskparams:
982
          self.new_diskparams[dt_name] = dt_params
983
        else:
984
          self.new_diskparams[dt_name].update(dt_params)
985

    
986
    # os hypervisor parameters
987
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
988
    if self.op.os_hvp:
989
      for os_name, hvs in self.op.os_hvp.items():
990
        if os_name not in self.new_os_hvp:
991
          self.new_os_hvp[os_name] = hvs
992
        else:
993
          for hv_name, hv_dict in hvs.items():
994
            if hv_dict is None:
995
              # Delete if it exists
996
              self.new_os_hvp[os_name].pop(hv_name, None)
997
            elif hv_name not in self.new_os_hvp[os_name]:
998
              self.new_os_hvp[os_name][hv_name] = hv_dict
999
            else:
1000
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
1001

    
1002
    # os parameters
1003
    self.new_osp = objects.FillDict(cluster.osparams, {})
1004
    if self.op.osparams:
1005
      for os_name, osp in self.op.osparams.items():
1006
        if os_name not in self.new_osp:
1007
          self.new_osp[os_name] = {}
1008

    
1009
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
1010
                                                 use_none=True)
1011

    
1012
        if not self.new_osp[os_name]:
1013
          # we removed all parameters
1014
          del self.new_osp[os_name]
1015
        else:
1016
          # check the parameter validity (remote check)
1017
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
1018
                        os_name, self.new_osp[os_name])
1019

    
1020
    # changes to the hypervisor list
1021
    if self.op.enabled_hypervisors is not None:
1022
      self.hv_list = self.op.enabled_hypervisors
1023
      for hv in self.hv_list:
1024
        # if the hypervisor doesn't already exist in the cluster
1025
        # hvparams, we initialize it to empty, and then (in both
1026
        # cases) we make sure to fill the defaults, as we might not
1027
        # have a complete defaults list if the hypervisor wasn't
1028
        # enabled before
1029
        if hv not in new_hvp:
1030
          new_hvp[hv] = {}
1031
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1032
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1033
    else:
1034
      self.hv_list = cluster.enabled_hypervisors
1035

    
1036
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1037
      # either the enabled list has changed, or the parameters have, validate
1038
      for hv_name, hv_params in self.new_hvparams.items():
1039
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1040
            (self.op.enabled_hypervisors and
1041
             hv_name in self.op.enabled_hypervisors)):
1042
          # either this is a new hypervisor, or its parameters have changed
1043
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1044
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1045
          hv_class.CheckParameterSyntax(hv_params)
1046
          CheckHVParams(self, node_uuids, hv_name, hv_params)
1047

    
1048
    self._CheckDiskTemplateConsistency()
1049

    
1050
    if self.op.os_hvp:
1051
      # no need to check any newly-enabled hypervisors, since the
1052
      # defaults have already been checked in the above code-block
1053
      for os_name, os_hvp in self.new_os_hvp.items():
1054
        for hv_name, hv_params in os_hvp.items():
1055
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1056
          # we need to fill in the new os_hvp on top of the actual hv_p
1057
          cluster_defaults = self.new_hvparams.get(hv_name, {})
1058
          new_osp = objects.FillDict(cluster_defaults, hv_params)
1059
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1060
          hv_class.CheckParameterSyntax(new_osp)
1061
          CheckHVParams(self, node_uuids, hv_name, new_osp)
1062

    
1063
    if self.op.default_iallocator:
1064
      alloc_script = utils.FindFile(self.op.default_iallocator,
1065
                                    constants.IALLOCATOR_SEARCH_PATH,
1066
                                    os.path.isfile)
1067
      if alloc_script is None:
1068
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1069
                                   " specified" % self.op.default_iallocator,
1070
                                   errors.ECODE_INVAL)
1071

    
1072
  def _CheckDiskTemplateConsistency(self):
1073
    """Check whether the disk templates that are going to be disabled
1074
       are still in use by some instances.
1075

1076
    """
1077
    if self.op.enabled_disk_templates:
1078
      cluster = self.cfg.GetClusterInfo()
1079
      instances = self.cfg.GetAllInstancesInfo()
1080

    
1081
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1082
        - set(self.op.enabled_disk_templates)
1083
      for instance in instances.itervalues():
1084
        if instance.disk_template in disk_templates_to_remove:
1085
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
1086
                                     " because instance '%s' is using it." %
1087
                                     (instance.disk_template, instance.name))
1088

    
1089
  def _SetVgName(self, feedback_fn):
1090
    """Determines and sets the new volume group name.
1091

1092
    """
1093
    if self.op.vg_name is not None:
1094
      new_volume = self.op.vg_name
1095
      if not new_volume:
1096
        new_volume = None
1097
      if new_volume != self.cfg.GetVGName():
1098
        self.cfg.SetVGName(new_volume)
1099
      else:
1100
        feedback_fn("Cluster LVM configuration already in desired"
1101
                    " state, not changing")
1102

    
1103
  def _SetFileStorageDir(self, feedback_fn):
1104
    """Set the file storage directory.
1105

1106
    """
1107
    if self.op.file_storage_dir is not None:
1108
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
1109
        feedback_fn("Global file storage dir already set to value '%s'"
1110
                    % self.cluster.file_storage_dir)
1111
      else:
1112
        self.cluster.file_storage_dir = self.op.file_storage_dir
1113

    
1114
  def Exec(self, feedback_fn):
1115
    """Change the parameters of the cluster.
1116

1117
    """
1118
    if self.op.enabled_disk_templates:
1119
      self.cluster.enabled_disk_templates = \
1120
        list(set(self.op.enabled_disk_templates))
1121

    
1122
    self._SetVgName(feedback_fn)
1123
    self._SetFileStorageDir(feedback_fn)
1124

    
1125
    if self.op.drbd_helper is not None:
1126
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1127
        feedback_fn("Note that you specified a drbd user helper, but did"
1128
                    " enabled the drbd disk template.")
1129
      new_helper = self.op.drbd_helper
1130
      if not new_helper:
1131
        new_helper = None
1132
      if new_helper != self.cfg.GetDRBDHelper():
1133
        self.cfg.SetDRBDHelper(new_helper)
1134
      else:
1135
        feedback_fn("Cluster DRBD helper already in desired state,"
1136
                    " not changing")
1137
    if self.op.hvparams:
1138
      self.cluster.hvparams = self.new_hvparams
1139
    if self.op.os_hvp:
1140
      self.cluster.os_hvp = self.new_os_hvp
1141
    if self.op.enabled_hypervisors is not None:
1142
      self.cluster.hvparams = self.new_hvparams
1143
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1144
    if self.op.beparams:
1145
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1146
    if self.op.nicparams:
1147
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1148
    if self.op.ipolicy:
1149
      self.cluster.ipolicy = self.new_ipolicy
1150
    if self.op.osparams:
1151
      self.cluster.osparams = self.new_osp
1152
    if self.op.ndparams:
1153
      self.cluster.ndparams = self.new_ndparams
1154
    if self.op.diskparams:
1155
      self.cluster.diskparams = self.new_diskparams
1156
    if self.op.hv_state:
1157
      self.cluster.hv_state_static = self.new_hv_state
1158
    if self.op.disk_state:
1159
      self.cluster.disk_state_static = self.new_disk_state
1160

    
1161
    if self.op.candidate_pool_size is not None:
1162
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1163
      # we need to update the pool size here, otherwise the save will fail
1164
      AdjustCandidatePool(self, [])
1165

    
1166
    if self.op.maintain_node_health is not None:
1167
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1168
        feedback_fn("Note: CONFD was disabled at build time, node health"
1169
                    " maintenance is not useful (still enabling it)")
1170
      self.cluster.maintain_node_health = self.op.maintain_node_health
1171

    
1172
    if self.op.modify_etc_hosts is not None:
1173
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1174

    
1175
    if self.op.prealloc_wipe_disks is not None:
1176
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1177

    
1178
    if self.op.add_uids is not None:
1179
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1180

    
1181
    if self.op.remove_uids is not None:
1182
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1183

    
1184
    if self.op.uid_pool is not None:
1185
      self.cluster.uid_pool = self.op.uid_pool
1186

    
1187
    if self.op.default_iallocator is not None:
1188
      self.cluster.default_iallocator = self.op.default_iallocator
1189

    
1190
    if self.op.reserved_lvs is not None:
1191
      self.cluster.reserved_lvs = self.op.reserved_lvs
1192

    
1193
    if self.op.use_external_mip_script is not None:
1194
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
1195

    
1196
    def helper_os(aname, mods, desc):
1197
      desc += " OS list"
1198
      lst = getattr(self.cluster, aname)
1199
      for key, val in mods:
1200
        if key == constants.DDM_ADD:
1201
          if val in lst:
1202
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1203
          else:
1204
            lst.append(val)
1205
        elif key == constants.DDM_REMOVE:
1206
          if val in lst:
1207
            lst.remove(val)
1208
          else:
1209
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1210
        else:
1211
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
1212

    
1213
    if self.op.hidden_os:
1214
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1215

    
1216
    if self.op.blacklisted_os:
1217
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1218

    
1219
    if self.op.master_netdev:
1220
      master_params = self.cfg.GetMasterNetworkParameters()
1221
      ems = self.cfg.GetUseExternalMipScript()
1222
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1223
                  self.cluster.master_netdev)
1224
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1225
                                                       master_params, ems)
1226
      if not self.op.force:
1227
        result.Raise("Could not disable the master ip")
1228
      else:
1229
        if result.fail_msg:
1230
          msg = ("Could not disable the master ip (continuing anyway): %s" %
1231
                 result.fail_msg)
1232
          feedback_fn(msg)
1233
      feedback_fn("Changing master_netdev from %s to %s" %
1234
                  (master_params.netdev, self.op.master_netdev))
1235
      self.cluster.master_netdev = self.op.master_netdev
1236

    
1237
    if self.op.master_netmask:
1238
      master_params = self.cfg.GetMasterNetworkParameters()
1239
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1240
      result = self.rpc.call_node_change_master_netmask(
1241
                 master_params.uuid, master_params.netmask,
1242
                 self.op.master_netmask, master_params.ip,
1243
                 master_params.netdev)
1244
      result.Warn("Could not change the master IP netmask", feedback_fn)
1245
      self.cluster.master_netmask = self.op.master_netmask
1246

    
1247
    self.cfg.Update(self.cluster, feedback_fn)
1248

    
1249
    if self.op.master_netdev:
1250
      master_params = self.cfg.GetMasterNetworkParameters()
1251
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
1252
                  self.op.master_netdev)
1253
      ems = self.cfg.GetUseExternalMipScript()
1254
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1255
                                                     master_params, ems)
1256
      result.Warn("Could not re-enable the master ip on the master,"
1257
                  " please restart manually", self.LogWarning)
1258

    
1259

    
1260
class LUClusterVerify(NoHooksLU):
1261
  """Submits all jobs necessary to verify the cluster.
1262

1263
  """
1264
  REQ_BGL = False
1265

    
1266
  def ExpandNames(self):
1267
    self.needed_locks = {}
1268

    
1269
  def Exec(self, feedback_fn):
1270
    jobs = []
1271

    
1272
    if self.op.group_name:
1273
      groups = [self.op.group_name]
1274
      depends_fn = lambda: None
1275
    else:
1276
      groups = self.cfg.GetNodeGroupList()
1277

    
1278
      # Verify global configuration
1279
      jobs.append([
1280
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1281
        ])
1282

    
1283
      # Always depend on global verification
1284
      depends_fn = lambda: [(-len(jobs), [])]
1285

    
1286
    jobs.extend(
1287
      [opcodes.OpClusterVerifyGroup(group_name=group,
1288
                                    ignore_errors=self.op.ignore_errors,
1289
                                    depends=depends_fn())]
1290
      for group in groups)
1291

    
1292
    # Fix up all parameters
1293
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1294
      op.debug_simulate_errors = self.op.debug_simulate_errors
1295
      op.verbose = self.op.verbose
1296
      op.error_codes = self.op.error_codes
1297
      try:
1298
        op.skip_checks = self.op.skip_checks
1299
      except AttributeError:
1300
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1301

    
1302
    return ResultWithJobs(jobs)
1303

    
1304

    
1305
class _VerifyErrors(object):
1306
  """Mix-in for cluster/group verify LUs.
1307

1308
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1309
  self.op and self._feedback_fn to be available.)
1310

1311
  """
1312

    
1313
  ETYPE_FIELD = "code"
1314
  ETYPE_ERROR = "ERROR"
1315
  ETYPE_WARNING = "WARNING"
1316

    
1317
  def _Error(self, ecode, item, msg, *args, **kwargs):
1318
    """Format an error message.
1319

1320
    Based on the opcode's error_codes parameter, either format a
1321
    parseable error code, or a simpler error string.
1322

1323
    This must be called only from Exec and functions called from Exec.
1324

1325
    """
1326
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1327
    itype, etxt, _ = ecode
1328
    # If the error code is in the list of ignored errors, demote the error to a
1329
    # warning
1330
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1331
      ltype = self.ETYPE_WARNING
1332
    # first complete the msg
1333
    if args:
1334
      msg = msg % args
1335
    # then format the whole message
1336
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1337
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1338
    else:
1339
      if item:
1340
        item = " " + item
1341
      else:
1342
        item = ""
1343
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1344
    # and finally report it via the feedback_fn
1345
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1346
    # do not mark the operation as failed for WARN cases only
1347
    if ltype == self.ETYPE_ERROR:
1348
      self.bad = True
1349

    
1350
  def _ErrorIf(self, cond, *args, **kwargs):
1351
    """Log an error message if the passed condition is True.
1352

1353
    """
1354
    if (bool(cond)
1355
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1356
      self._Error(*args, **kwargs)
1357

    
1358

    
1359
def _VerifyCertificate(filename):
1360
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1361

1362
  @type filename: string
1363
  @param filename: Path to PEM file
1364

1365
  """
1366
  try:
1367
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1368
                                           utils.ReadFile(filename))
1369
  except Exception, err: # pylint: disable=W0703
1370
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1371
            "Failed to load X509 certificate %s: %s" % (filename, err))
1372

    
1373
  (errcode, msg) = \
1374
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1375
                                constants.SSL_CERT_EXPIRATION_ERROR)
1376

    
1377
  if msg:
1378
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1379
  else:
1380
    fnamemsg = None
1381

    
1382
  if errcode is None:
1383
    return (None, fnamemsg)
1384
  elif errcode == utils.CERT_WARNING:
1385
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1386
  elif errcode == utils.CERT_ERROR:
1387
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1388

    
1389
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
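# Illustrative sketch (not from the original code): the helper above returns
# an (error-type, message) pair whose first element is one of the ETYPE_*
# markers of _VerifyErrors, or None if the certificate is healthy:
#
#   (etype, msg) = _VerifyCertificate(pathutils.NODED_CERT_FILE)
#   # e.g. (None, None) for a valid certificate, or
#   #      ("WARNING", "While verifying ...: certificate about to expire")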
1390

    
1391

    
1392
def _GetAllHypervisorParameters(cluster, instances):
1393
  """Compute the set of all hypervisor parameters.
1394

1395
  @type cluster: L{objects.Cluster}
1396
  @param cluster: the cluster object
1397
  @type instances: list of L{objects.Instance}
1398
  @param instances: additional instances from which to obtain parameters
1399
  @rtype: list of (origin, hypervisor, parameters)
1400
  @return: a list with all parameters found, indicating the hypervisor they
1401
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1402

1403
  """
1404
  hvp_data = []
1405

    
1406
  for hv_name in cluster.enabled_hypervisors:
1407
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1408

    
1409
  for os_name, os_hvp in cluster.os_hvp.items():
1410
    for hv_name, hv_params in os_hvp.items():
1411
      if hv_params:
1412
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1413
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1414

    
1415
  # TODO: collapse identical parameter values in a single one
1416
  for instance in instances:
1417
    if instance.hvparams:
1418
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1419
                       cluster.FillHV(instance)))
1420

    
1421
  return hvp_data
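# Illustrative sketch (not from the original code): the returned "hvp_data" is
# a flat list of (origin, hypervisor, parameters) tuples, e.g.
#
#   [("cluster", "kvm", {...}),
#    ("os debootstrap", "kvm", {...}),
#    ("instance web1.example.com", "kvm", {...})]
#
# which can be regrouped per hypervisor when needed:
#
#   by_hv = {}
#   for (origin, hv_name, hv_params) in hvp_data:
#     by_hv.setdefault(hv_name, []).append((origin, hv_params))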
1422

    
1423

    
1424
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1425
  """Verifies the cluster config.
1426

1427
  """
1428
  REQ_BGL = False
1429

    
1430
  def _VerifyHVP(self, hvp_data):
1431
    """Verifies locally the syntax of the hypervisor parameters.
1432

1433
    """
1434
    for item, hv_name, hv_params in hvp_data:
1435
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1436
             (hv_name, item))
1437
      try:
1438
        hv_class = hypervisor.GetHypervisorClass(hv_name)
1439
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1440
        hv_class.CheckParameterSyntax(hv_params)
1441
      except errors.GenericError, err:
1442
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1443

    
1444
  def ExpandNames(self):
1445
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1446
    self.share_locks = ShareAll()
1447

    
1448
  def CheckPrereq(self):
1449
    """Check prerequisites.
1450

1451
    """
1452
    # Retrieve all information
1453
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1454
    self.all_node_info = self.cfg.GetAllNodesInfo()
1455
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1456

    
1457
  def Exec(self, feedback_fn):
1458
    """Verify integrity of cluster, performing various test on nodes.
1459

1460
    """
1461
    self.bad = False
1462
    self._feedback_fn = feedback_fn
1463

    
1464
    feedback_fn("* Verifying cluster config")
1465

    
1466
    for msg in self.cfg.VerifyConfig():
1467
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1468

    
1469
    feedback_fn("* Verifying cluster certificate files")
1470

    
1471
    for cert_filename in pathutils.ALL_CERT_FILES:
1472
      (errcode, msg) = _VerifyCertificate(cert_filename)
1473
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1474

    
1475
    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
1476
                                    pathutils.NODED_CERT_FILE),
1477
                  constants.CV_ECLUSTERCERT,
1478
                  None,
1479
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
1480
                    constants.LUXID_USER + " user")
1481

    
1482
    feedback_fn("* Verifying hypervisor parameters")
1483

    
1484
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1485
                                                self.all_inst_info.values()))
1486

    
1487
    feedback_fn("* Verifying all nodes belong to an existing group")
1488

    
1489
    # We do this verification here because, should this bogus circumstance
1490
    # occur, it would never be caught by VerifyGroup, which only acts on
1491
    # nodes/instances reachable from existing node groups.
1492

    
1493
    dangling_nodes = set(node for node in self.all_node_info.values()
1494
                         if node.group not in self.all_group_info)
1495

    
1496
    dangling_instances = {}
1497
    no_node_instances = []
1498

    
1499
    for inst in self.all_inst_info.values():
1500
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
1501
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
1502
      elif inst.primary_node not in self.all_node_info:
1503
        no_node_instances.append(inst)
1504

    
1505
    pretty_dangling = [
1506
        "%s (%s)" %
1507
        (node.name,
1508
         utils.CommaJoin(inst.name for
1509
                         inst in dangling_instances.get(node.uuid, [])))
1510
        for node in dangling_nodes]
1511

    
1512
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1513
                  None,
1514
                  "the following nodes (and their instances) belong to a non"
1515
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1516

    
1517
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1518
                  None,
1519
                  "the following instances have a non-existing primary-node:"
1520
                  " %s", utils.CommaJoin(inst.name for
1521
                                         inst in no_node_instances))
1522

    
1523
    return not self.bad
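  # Illustrative sketch (not from the original code): a node is "dangling"
  # when its group is unknown, an instance when its primary node is such a
  # node, so "pretty_dangling" entries look like
  #
  #   "node3.example.com (web1.example.com, db1.example.com)"
  #
  # i.e. the dangling node followed by the instances whose primary it is.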
1524

    
1525

    
1526
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1527
  """Verifies the status of a node group.
1528

1529
  """
1530
  HPATH = "cluster-verify"
1531
  HTYPE = constants.HTYPE_CLUSTER
1532
  REQ_BGL = False
1533

    
1534
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1535

    
1536
  class NodeImage(object):
1537
    """A class representing the logical and physical status of a node.
1538

1539
    @type uuid: string
1540
    @ivar uuid: the node UUID to which this object refers
1541
    @ivar volumes: a structure as returned from
1542
        L{ganeti.backend.GetVolumeList} (runtime)
1543
    @ivar instances: a list of running instances (runtime)
1544
    @ivar pinst: list of configured primary instances (config)
1545
    @ivar sinst: list of configured secondary instances (config)
1546
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1547
        instances for which this node is secondary (config)
1548
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1549
    @ivar dfree: free disk, as reported by the node (runtime)
1550
    @ivar offline: the offline status (config)
1551
    @type rpc_fail: boolean
1552
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1553
        not whether the individual keys were correct) (runtime)
1554
    @type lvm_fail: boolean
1555
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1556
    @type hyp_fail: boolean
1557
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1558
    @type ghost: boolean
1559
    @ivar ghost: whether this node is unknown to the configuration (config)
1560
    @type os_fail: boolean
1561
    @ivar os_fail: whether the RPC call didn't return valid OS data
1562
    @type oslist: list
1563
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1564
    @type vm_capable: boolean
1565
    @ivar vm_capable: whether the node can host instances
1566
    @type pv_min: float
1567
    @ivar pv_min: size in MiB of the smallest PVs
1568
    @type pv_max: float
1569
    @ivar pv_max: size in MiB of the biggest PVs
1570

1571
    """
1572
    def __init__(self, offline=False, uuid=None, vm_capable=True):
1573
      self.uuid = uuid
1574
      self.volumes = {}
1575
      self.instances = []
1576
      self.pinst = []
1577
      self.sinst = []
1578
      self.sbp = {}
1579
      self.mfree = 0
1580
      self.dfree = 0
1581
      self.offline = offline
1582
      self.vm_capable = vm_capable
1583
      self.rpc_fail = False
1584
      self.lvm_fail = False
1585
      self.hyp_fail = False
1586
      self.ghost = False
1587
      self.os_fail = False
1588
      self.oslist = {}
1589
      self.pv_min = None
1590
      self.pv_max = None
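    # Illustrative sketch (not from the original code): only config-derived
    # fields are set at construction time, runtime fields are filled in later
    # from the node-verify RPC results, e.g. (hypothetical values):
    #
    #   nimg = self.NodeImage(offline=node.offline, uuid=node.uuid,
    #                         vm_capable=node.vm_capable)
    #   nimg.mfree = 2048       # from NV_HVINFO (_UpdateNodeInfo)
    #   nimg.volumes = {...}    # from NV_LVLIST (_UpdateNodeVolumes)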
1591

    
1592
  def ExpandNames(self):
1593
    # This raises errors.OpPrereqError on its own:
1594
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1595

    
1596
    # Get instances in node group; this is unsafe and needs verification later
1597
    inst_uuids = \
1598
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1599

    
1600
    self.needed_locks = {
1601
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1602
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1603
      locking.LEVEL_NODE: [],
1604

    
1605
      # This opcode is run by watcher every five minutes and acquires all nodes
1606
      # for a group. It doesn't run for a long time, so it's better to acquire
1607
      # the node allocation lock as well.
1608
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1609
      }
1610

    
1611
    self.share_locks = ShareAll()
1612

    
1613
  def DeclareLocks(self, level):
1614
    if level == locking.LEVEL_NODE:
1615
      # Get members of node group; this is unsafe and needs verification later
1616
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1617

    
1618
      # In Exec(), we warn about mirrored instances that have primary and
1619
      # secondary living in separate node groups. To fully verify that
1620
      # volumes for these instances are healthy, we will need to do an
1621
      # extra call to their secondaries. We ensure here those nodes will
1622
      # be locked.
1623
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1624
        # Important: access only the instances whose lock is owned
1625
        instance = self.cfg.GetInstanceInfoByName(inst_name)
1626
        if instance.disk_template in constants.DTS_INT_MIRROR:
1627
          nodes.update(instance.secondary_nodes)
1628

    
1629
      self.needed_locks[locking.LEVEL_NODE] = nodes
1630

    
1631
  def CheckPrereq(self):
1632
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1633
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1634

    
1635
    group_node_uuids = set(self.group_info.members)
1636
    group_inst_uuids = \
1637
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1638

    
1639
    unlocked_node_uuids = \
1640
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1641

    
1642
    unlocked_inst_uuids = \
1643
        group_inst_uuids.difference(
1644
          [self.cfg.GetInstanceInfoByName(name).uuid
1645
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1646

    
1647
    if unlocked_node_uuids:
1648
      raise errors.OpPrereqError(
1649
        "Missing lock for nodes: %s" %
1650
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1651
        errors.ECODE_STATE)
1652

    
1653
    if unlocked_inst_uuids:
1654
      raise errors.OpPrereqError(
1655
        "Missing lock for instances: %s" %
1656
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1657
        errors.ECODE_STATE)
1658

    
1659
    self.all_node_info = self.cfg.GetAllNodesInfo()
1660
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1661

    
1662
    self.my_node_uuids = group_node_uuids
1663
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1664
                             for node_uuid in group_node_uuids)
1665

    
1666
    self.my_inst_uuids = group_inst_uuids
1667
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1668
                             for inst_uuid in group_inst_uuids)
1669

    
1670
    # We detect here the nodes that will need the extra RPC calls for verifying
1671
    # split LV volumes; they should be locked.
1672
    extra_lv_nodes = set()
1673

    
1674
    for inst in self.my_inst_info.values():
1675
      if inst.disk_template in constants.DTS_INT_MIRROR:
1676
        for nuuid in inst.all_nodes:
1677
          if self.all_node_info[nuuid].group != self.group_uuid:
1678
            extra_lv_nodes.add(nuuid)
1679

    
1680
    unlocked_lv_nodes = \
1681
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1682

    
1683
    if unlocked_lv_nodes:
1684
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1685
                                 utils.CommaJoin(unlocked_lv_nodes),
1686
                                 errors.ECODE_STATE)
1687
    self.extra_lv_nodes = list(extra_lv_nodes)
1688

    
1689
  def _VerifyNode(self, ninfo, nresult):
1690
    """Perform some basic validation on data returned from a node.
1691

1692
      - check the result data structure is well formed and has all the
1693
        mandatory fields
1694
      - check ganeti version
1695

1696
    @type ninfo: L{objects.Node}
1697
    @param ninfo: the node to check
1698
    @param nresult: the results from the node
1699
    @rtype: boolean
1700
    @return: whether overall this call was successful (and we can expect
1701
         reasonable values in the response)
1702

1703
    """
1704
    # main result, nresult should be a non-empty dict
1705
    test = not nresult or not isinstance(nresult, dict)
1706
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1707
                  "unable to verify node: no data returned")
1708
    if test:
1709
      return False
1710

    
1711
    # compares ganeti version
1712
    local_version = constants.PROTOCOL_VERSION
1713
    remote_version = nresult.get("version", None)
1714
    test = not (remote_version and
1715
                isinstance(remote_version, (list, tuple)) and
1716
                len(remote_version) == 2)
1717
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1718
                  "connection to node returned invalid data")
1719
    if test:
1720
      return False
1721

    
1722
    test = local_version != remote_version[0]
1723
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1724
                  "incompatible protocol versions: master %s,"
1725
                  " node %s", local_version, remote_version[0])
1726
    if test:
1727
      return False
1728

    
1729
    # node seems compatible, we can actually try to look into its results
1730

    
1731
    # full package version
1732
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1733
                  constants.CV_ENODEVERSION, ninfo.name,
1734
                  "software version mismatch: master %s, node %s",
1735
                  constants.RELEASE_VERSION, remote_version[1],
1736
                  code=self.ETYPE_WARNING)
1737

    
1738
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1739
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1740
      for hv_name, hv_result in hyp_result.iteritems():
1741
        test = hv_result is not None
1742
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1743
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1744

    
1745
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1746
    if ninfo.vm_capable and isinstance(hvp_result, list):
1747
      for item, hv_name, hv_result in hvp_result:
1748
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1749
                      "hypervisor %s parameter verify failure (source %s): %s",
1750
                      hv_name, item, hv_result)
1751

    
1752
    test = nresult.get(constants.NV_NODESETUP,
1753
                       ["Missing NODESETUP results"])
1754
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1755
                  "node setup error: %s", "; ".join(test))
1756

    
1757
    return True
1758

    
1759
  def _VerifyNodeTime(self, ninfo, nresult,
1760
                      nvinfo_starttime, nvinfo_endtime):
1761
    """Check the node time.
1762

1763
    @type ninfo: L{objects.Node}
1764
    @param ninfo: the node to check
1765
    @param nresult: the remote results for the node
1766
    @param nvinfo_starttime: the start time of the RPC call
1767
    @param nvinfo_endtime: the end time of the RPC call
1768

1769
    """
1770
    ntime = nresult.get(constants.NV_TIME, None)
1771
    try:
1772
      ntime_merged = utils.MergeTime(ntime)
1773
    except (ValueError, TypeError):
1774
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1775
                    "Node returned invalid time")
1776
      return
1777

    
1778
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1779
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1780
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1781
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1782
    else:
1783
      ntime_diff = None
1784

    
1785
    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1786
                  "Node time diverges by at least %s from master node time",
1787
                  ntime_diff)
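  # Worked example (not from the original code), assuming a clock skew
  # allowance of 150 seconds: with an RPC window of [1000.0, 1002.5] the node
  # time must fall within [850.0, 1152.5].  A node reporting a merged time of
  # 1100.0 passes; one reporting 800.0 is flagged with a divergence of
  # abs(1000.0 - 800.0) = 200.0 seconds.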
1788

    
1789
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1790
    """Check the node LVM results and update info for cross-node checks.
1791

1792
    @type ninfo: L{objects.Node}
1793
    @param ninfo: the node to check
1794
    @param nresult: the remote results for the node
1795
    @param vg_name: the configured VG name
1796
    @type nimg: L{NodeImage}
1797
    @param nimg: node image
1798

1799
    """
1800
    if vg_name is None:
1801
      return
1802

    
1803
    # checks vg existence and size > 20G
1804
    vglist = nresult.get(constants.NV_VGLIST, None)
1805
    test = not vglist
1806
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1807
                  "unable to check volume groups")
1808
    if not test:
1809
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1810
                                            constants.MIN_VG_SIZE)
1811
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1812

    
1813
    # Check PVs
1814
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1815
    for em in errmsgs:
1816
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
1817
    if pvminmax is not None:
1818
      (nimg.pv_min, nimg.pv_max) = pvminmax
1819

    
1820
  def _VerifyGroupDRBDVersion(self, node_verify_infos):
1821
    """Check cross-node DRBD version consistency.
1822

1823
    @type node_verify_infos: dict
1824
    @param node_verify_infos: infos about nodes as returned from the
1825
      node_verify call.
1826

1827
    """
1828
    node_versions = {}
1829
    for node_uuid, ndata in node_verify_infos.items():
1830
      nresult = ndata.payload
1831
      if nresult:
1832
        version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1833
        node_versions[node_uuid] = version
1834

    
1835
    if len(set(node_versions.values())) > 1:
1836
      for node_uuid, version in sorted(node_versions.items()):
1837
        msg = "DRBD version mismatch: %s" % version
1838
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1839
                    code=self.ETYPE_WARNING)
1840

    
1841
  def _VerifyGroupLVM(self, node_image, vg_name):
1842
    """Check cross-node consistency in LVM.
1843

1844
    @type node_image: dict
1845
    @param node_image: info about nodes, mapping from node to names to
1846
      L{NodeImage} objects
1847
    @param vg_name: the configured VG name
1848

1849
    """
1850
    if vg_name is None:
1851
      return
1852

    
1853
    # Only exclusive storage needs this kind of checks
1854
    if not self._exclusive_storage:
1855
      return
1856

    
1857
    # exclusive_storage wants all PVs to have the same size (approximately),
1858
    # if the smallest and the biggest ones are okay, everything is fine.
1859
    # pv_min is None iff pv_max is None
1860
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1861
    if not vals:
1862
      return
1863
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1864
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1865
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1866
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1867
                  "PV sizes differ too much in the group; smallest (%s MB) is"
1868
                  " on %s, biggest (%s MB) is on %s",
1869
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
1870
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))
1871

    
1872
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1873
    """Check the node bridges.
1874

1875
    @type ninfo: L{objects.Node}
1876
    @param ninfo: the node to check
1877
    @param nresult: the remote results for the node
1878
    @param bridges: the expected list of bridges
1879

1880
    """
1881
    if not bridges:
1882
      return
1883

    
1884
    missing = nresult.get(constants.NV_BRIDGES, None)
1885
    test = not isinstance(missing, list)
1886
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1887
                  "did not return valid bridge information")
1888
    if not test:
1889
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1890
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1891

    
1892
  def _VerifyNodeUserScripts(self, ninfo, nresult):
1893
    """Check the results of user scripts presence and executability on the node
1894

1895
    @type ninfo: L{objects.Node}
1896
    @param ninfo: the node to check
1897
    @param nresult: the remote results for the node
1898

1899
    """
1900
    test = constants.NV_USERSCRIPTS not in nresult
1901
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1902
                  "did not return user scripts information")
1903

    
1904
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1905
    if not test:
1906
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1907
                    "user scripts not present or not executable: %s" %
1908
                    utils.CommaJoin(sorted(broken_scripts)))
1909

    
1910
  def _VerifyNodeNetwork(self, ninfo, nresult):
1911
    """Check the node network connectivity results.
1912

1913
    @type ninfo: L{objects.Node}
1914
    @param ninfo: the node to check
1915
    @param nresult: the remote results for the node
1916

1917
    """
1918
    test = constants.NV_NODELIST not in nresult
1919
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
1920
                  "node hasn't returned node ssh connectivity data")
1921
    if not test:
1922
      if nresult[constants.NV_NODELIST]:
1923
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1924
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
1925
                        "ssh communication with node '%s': %s", a_node, a_msg)
1926

    
1927
    test = constants.NV_NODENETTEST not in nresult
1928
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1929
                  "node hasn't returned node tcp connectivity data")
1930
    if not test:
1931
      if nresult[constants.NV_NODENETTEST]:
1932
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1933
        for anode in nlist:
1934
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
1935
                        "tcp communication with node '%s': %s",
1936
                        anode, nresult[constants.NV_NODENETTEST][anode])
1937

    
1938
    test = constants.NV_MASTERIP not in nresult
1939
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1940
                  "node hasn't returned node master IP reachability data")
1941
    if not test:
1942
      if not nresult[constants.NV_MASTERIP]:
1943
        if ninfo.uuid == self.master_node:
1944
          msg = "the master node cannot reach the master IP (not configured?)"
1945
        else:
1946
          msg = "cannot reach the master IP"
1947
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
1948

    
1949
  def _VerifyInstance(self, instance, node_image, diskstatus):
1950
    """Verify an instance.
1951

1952
    This function checks to see if the required block devices are
1953
    available on the instance's node, and that the nodes are in the correct
1954
    state.
1955

1956
    """
1957
    pnode_uuid = instance.primary_node
1958
    pnode_img = node_image[pnode_uuid]
1959
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
1960

    
1961
    node_vol_should = {}
1962
    instance.MapLVsByNode(node_vol_should)
1963

    
1964
    cluster = self.cfg.GetClusterInfo()
1965
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1966
                                                            self.group_info)
1967
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
1968
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
1969
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)
1970

    
1971
    for node_uuid in node_vol_should:
1972
      n_img = node_image[node_uuid]
1973
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1974
        # ignore missing volumes on offline or broken nodes
1975
        continue
1976
      for volume in node_vol_should[node_uuid]:
1977
        test = volume not in n_img.volumes
1978
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
1979
                      "volume %s missing on node %s", volume,
1980
                      self.cfg.GetNodeName(node_uuid))
1981

    
1982
    if instance.admin_state == constants.ADMINST_UP:
1983
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
1984
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
1985
                    "instance not running on its primary node %s",
1986
                     self.cfg.GetNodeName(pnode_uuid))
1987
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
1988
                    instance.name, "instance is marked as running and lives on"
1989
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))
1990

    
1991
    diskdata = [(nname, success, status, idx)
1992
                for (nname, disks) in diskstatus.items()
1993
                for idx, (success, status) in enumerate(disks)]
1994

    
1995
    for nname, success, bdev_status, idx in diskdata:
1996
      # the 'ghost node' construction in Exec() ensures that we have a
1997
      # node here
1998
      snode = node_image[nname]
1999
      bad_snode = snode.ghost or snode.offline
2000
      self._ErrorIf(instance.disks_active and
2001
                    not success and not bad_snode,
2002
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
2003
                    "couldn't retrieve status for disk/%s on %s: %s",
2004
                    idx, self.cfg.GetNodeName(nname), bdev_status)
2005

    
2006
      if instance.disks_active and success and \
2007
         (bdev_status.is_degraded or
2008
          bdev_status.ldisk_status != constants.LDS_OKAY):
2009
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2010
        if bdev_status.is_degraded:
2011
          msg += " is degraded"
2012
        if bdev_status.ldisk_status != constants.LDS_OKAY:
2013
          msg += "; state is '%s'" % \
2014
                 constants.LDS_NAMES[bdev_status.ldisk_status]
2015

    
2016
        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2017

    
2018
    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2019
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2020
                  "instance %s, connection to primary node failed",
2021
                  instance.name)
2022

    
2023
    self._ErrorIf(len(instance.secondary_nodes) > 1,
2024
                  constants.CV_EINSTANCELAYOUT, instance.name,
2025
                  "instance has multiple secondary nodes: %s",
2026
                  utils.CommaJoin(instance.secondary_nodes),
2027
                  code=self.ETYPE_WARNING)
2028

    
2029
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2030
    if any(es_flags.values()):
2031
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2032
        # Disk template not compatible with exclusive_storage: no instance
2033
        # node should have the flag set
2034
        es_nodes = [n
2035
                    for (n, es) in es_flags.items()
2036
                    if es]
2037
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2038
                    "instance has template %s, which is not supported on nodes"
2039
                    " that have exclusive storage set: %s",
2040
                    instance.disk_template,
2041
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2042
      for (idx, disk) in enumerate(instance.disks):
2043
        self._ErrorIf(disk.spindles is None,
2044
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2045
                      "number of spindles not configured for disk %s while"
2046
                      " exclusive storage is enabled, try running"
2047
                      " gnt-cluster repair-disk-sizes", idx)
2048

    
2049
    if instance.disk_template in constants.DTS_INT_MIRROR:
2050
      instance_nodes = utils.NiceSort(instance.all_nodes)
2051
      instance_groups = {}
2052

    
2053
      for node_uuid in instance_nodes:
2054
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
2055
                                   []).append(node_uuid)
2056

    
2057
      pretty_list = [
2058
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2059
                           groupinfo[group].name)
2060
        # Sort so that we always list the primary node first.
2061
        for group, nodes in sorted(instance_groups.items(),
2062
                                   key=lambda (_, nodes): pnode_uuid in nodes,
2063
                                   reverse=True)]
2064

    
2065
      self._ErrorIf(len(instance_groups) > 1,
2066
                    constants.CV_EINSTANCESPLITGROUPS,
2067
                    instance.name, "instance has primary and secondary nodes in"
2068
                    " different groups: %s", utils.CommaJoin(pretty_list),
2069
                    code=self.ETYPE_WARNING)
2070

    
2071
    inst_nodes_offline = []
2072
    for snode in instance.secondary_nodes:
2073
      s_img = node_image[snode]
2074
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2075
                    self.cfg.GetNodeName(snode),
2076
                    "instance %s, connection to secondary node failed",
2077
                    instance.name)
2078

    
2079
      if s_img.offline:
2080
        inst_nodes_offline.append(snode)
2081

    
2082
    # warn that the instance lives on offline nodes
2083
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2084
                  instance.name, "instance has offline secondary node(s) %s",
2085
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2086
    # ... or ghost/non-vm_capable nodes
2087
    for node_uuid in instance.all_nodes:
2088
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2089
                    instance.name, "instance lives on ghost node %s",
2090
                    self.cfg.GetNodeName(node_uuid))
2091
      self._ErrorIf(not node_image[node_uuid].vm_capable,
2092
                    constants.CV_EINSTANCEBADNODE, instance.name,
2093
                    "instance lives on non-vm_capable node %s",
2094
                    self.cfg.GetNodeName(node_uuid))
2095

    
2096
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2097
    """Verify if there are any unknown volumes in the cluster.
2098

2099
    The .os, .swap and backup volumes are ignored. All other volumes are
2100
    reported as unknown.
2101

2102
    @type reserved: L{ganeti.utils.FieldSet}
2103
    @param reserved: a FieldSet of reserved volume names
2104

2105
    """
2106
    for node_uuid, n_img in node_image.items():
2107
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2108
          self.all_node_info[node_uuid].group != self.group_uuid):
2109
        # skip non-healthy nodes
2110
        continue
2111
      for volume in n_img.volumes:
2112
        test = ((node_uuid not in node_vol_should or
2113
                volume not in node_vol_should[node_uuid]) and
2114
                not reserved.Matches(volume))
2115
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2116
                      self.cfg.GetNodeName(node_uuid),
2117
                      "volume %s is unknown", volume)
2118

    
2119
  def _VerifyNPlusOneMemory(self, node_image, all_insts):
2120
    """Verify N+1 Memory Resilience.
2121

2122
    Check that if one single node dies we can still start all the
2123
    instances it was primary for.
2124

2125
    """
2126
    cluster_info = self.cfg.GetClusterInfo()
2127
    for node_uuid, n_img in node_image.items():
2128
      # This code checks that every node which is now listed as
2129
      # secondary has enough memory to host all instances it is
2130
      # supposed to should a single other node in the cluster fail.
2131
      # FIXME: not ready for failover to an arbitrary node
2132
      # FIXME: does not support file-backed instances
2133
      # WARNING: we currently take into account down instances as well
2134
      # as up ones, considering that even if they're down someone
2135
      # might want to start them even in the event of a node failure.
2136
      if n_img.offline or \
2137
         self.all_node_info[node_uuid].group != self.group_uuid:
2138
        # we're skipping nodes marked offline and nodes in other groups from
2139
        # the N+1 warning, since most likely we don't have good memory
2140
        # information from them; we already list instances living on such
2141
        # nodes, and that's enough warning
2142
        continue
2143
      #TODO(dynmem): also consider ballooning out other instances
2144
      for prinode, inst_uuids in n_img.sbp.items():
2145
        needed_mem = 0
2146
        for inst_uuid in inst_uuids:
2147
          bep = cluster_info.FillBE(all_insts[inst_uuid])
2148
          if bep[constants.BE_AUTO_BALANCE]:
2149
            needed_mem += bep[constants.BE_MINMEM]
2150
        test = n_img.mfree < needed_mem
2151
        self._ErrorIf(test, constants.CV_ENODEN1,
2152
                      self.cfg.GetNodeName(node_uuid),
2153
                      "not enough memory to accomodate instance failovers"
2154
                      " should node %s fail (%dMiB needed, %dMiB available)",
2155
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
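  # Illustrative sketch (not from the original code): n_img.sbp maps a primary
  # node P to the instances that have P as primary and this node as secondary,
  # so the check above amounts to, per P:
  #
  #   insts = [all_insts[u] for u in n_img.sbp[P]]
  #   needed = sum(cluster_info.FillBE(i)[constants.BE_MINMEM] for i in insts
  #                if cluster_info.FillBE(i)[constants.BE_AUTO_BALANCE])
  #   ok = n_img.mfree >= needed
  #
  # e.g. two auto-balanced instances with minmem 1024 and 2048 MiB need
  # 3072 MiB free on this node to survive the failure of their primary P.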
2156

    
2157
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2158
                   (files_all, files_opt, files_mc, files_vm)):
2159
    """Verifies file checksums collected from all nodes.
2160

2161
    @param nodes: List of L{objects.Node} objects
2162
    @param master_node_uuid: UUID of master node
2163
    @param all_nvinfo: RPC results
2164

2165
    """
2166
    # Define functions determining which nodes to consider for a file
2167
    files2nodefn = [
2168
      (files_all, None),
2169
      (files_mc, lambda node: (node.master_candidate or
2170
                               node.uuid == master_node_uuid)),
2171
      (files_vm, lambda node: node.vm_capable),
2172
      ]
2173

    
2174
    # Build mapping from filename to list of nodes which should have the file
2175
    nodefiles = {}
2176
    for (files, fn) in files2nodefn:
2177
      if fn is None:
2178
        filenodes = nodes
2179
      else:
2180
        filenodes = filter(fn, nodes)
2181
      nodefiles.update((filename,
2182
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
2183
                       for filename in files)
2184

    
2185
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2186

    
2187
    fileinfo = dict((filename, {}) for filename in nodefiles)
2188
    ignore_nodes = set()
2189

    
2190
    for node in nodes:
2191
      if node.offline:
2192
        ignore_nodes.add(node.uuid)
2193
        continue
2194

    
2195
      nresult = all_nvinfo[node.uuid]
2196

    
2197
      if nresult.fail_msg or not nresult.payload:
2198
        node_files = None
2199
      else:
2200
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2201
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2202
                          for (key, value) in fingerprints.items())
2203
        del fingerprints
2204

    
2205
      test = not (node_files and isinstance(node_files, dict))
2206
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2207
                    "Node did not return file checksum data")
2208
      if test:
2209
        ignore_nodes.add(node.uuid)
2210
        continue
2211

    
2212
      # Build per-checksum mapping from filename to nodes having it
2213
      for (filename, checksum) in node_files.items():
2214
        assert filename in nodefiles
2215
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2216

    
2217
    for (filename, checksums) in fileinfo.items():
2218
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2219

    
2220
      # Nodes having the file
2221
      with_file = frozenset(node_uuid
2222
                            for node_uuids in fileinfo[filename].values()
2223
                            for node_uuid in node_uuids) - ignore_nodes
2224

    
2225
      expected_nodes = nodefiles[filename] - ignore_nodes
2226

    
2227
      # Nodes missing file
2228
      missing_file = expected_nodes - with_file
2229

    
2230
      if filename in files_opt:
2231
        # All or no nodes
2232
        self._ErrorIf(missing_file and missing_file != expected_nodes,
2233
                      constants.CV_ECLUSTERFILECHECK, None,
2234
                      "File %s is optional, but it must exist on all or no"
2235
                      " nodes (not found on %s)",
2236
                      filename,
2237
                      utils.CommaJoin(
2238
                        utils.NiceSort(
2239
                          map(self.cfg.GetNodeName, missing_file))))
2240
      else:
2241
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2242
                      "File %s is missing from node(s) %s", filename,
2243
                      utils.CommaJoin(
2244
                        utils.NiceSort(
2245
                          map(self.cfg.GetNodeName, missing_file))))
2246

    
2247
        # Warn if a node has a file it shouldn't
2248
        unexpected = with_file - expected_nodes
2249
        self._ErrorIf(unexpected,
2250
                      constants.CV_ECLUSTERFILECHECK, None,
2251
                      "File %s should not exist on node(s) %s",
2252
                      filename, utils.CommaJoin(
2253
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2254

    
2255
      # See if there are multiple versions of the file
2256
      test = len(checksums) > 1
2257
      if test:
2258
        variants = ["variant %s on %s" %
2259
                    (idx + 1,
2260
                     utils.CommaJoin(utils.NiceSort(
2261
                       map(self.cfg.GetNodeName, node_uuids))))
2262
                    for (idx, (checksum, node_uuids)) in
2263
                      enumerate(sorted(checksums.items()))]
2264
      else:
2265
        variants = []
2266

    
2267
      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2268
                    "File %s found with %s different checksums (%s)",
2269
                    filename, len(checksums), "; ".join(variants))
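  # Illustrative sketch (not from the original code): "fileinfo" maps each
  # distributed file to {checksum: set(node uuids)}, so a consistent cluster
  # has exactly one checksum per file, e.g.
  #
  #   fileinfo["/var/lib/ganeti/config.data"] = {
  #     "0fa1c5...": set(["node-uuid-1", "node-uuid-2"]),
  #     }
  #
  # More than one key for the same file means diverging copies and triggers
  # the "different checksums" error above.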
2270

    
2271
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2272
                      drbd_map):
2273
    """Verifies and the node DRBD status.
2274

2275
    @type ninfo: L{objects.Node}
2276
    @param ninfo: the node to check
2277
    @param nresult: the remote results for the node
2278
    @param instanceinfo: the dict of instances
2279
    @param drbd_helper: the configured DRBD usermode helper
2280
    @param drbd_map: the DRBD map as returned by
2281
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2282

2283
    """
2284
    if drbd_helper:
2285
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2286
      test = (helper_result is None)
2287
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2288
                    "no drbd usermode helper returned")
2289
      if helper_result:
2290
        status, payload = helper_result
2291
        test = not status
2292
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2293
                      "drbd usermode helper check unsuccessful: %s", payload)
2294
        test = status and (payload != drbd_helper)
2295
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2296
                      "wrong drbd usermode helper: %s", payload)
2297

    
2298
    # compute the DRBD minors
2299
    node_drbd = {}
2300
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2301
      test = inst_uuid not in instanceinfo
2302
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2303
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
2304
        # ghost instance should not be running, but otherwise we
2305
        # don't give double warnings (both ghost instance and
2306
        # unallocated minor in use)
2307
      if test:
2308
        node_drbd[minor] = (inst_uuid, False)
2309
      else:
2310
        instance = instanceinfo[inst_uuid]
2311
        node_drbd[minor] = (inst_uuid, instance.disks_active)
2312

    
2313
    # and now check them
2314
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2315
    test = not isinstance(used_minors, (tuple, list))
2316
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2317
                  "cannot parse drbd status file: %s", str(used_minors))
2318
    if test:
2319
      # we cannot check drbd status
2320
      return
2321

    
2322
    for minor, (inst_uuid, must_exist) in node_drbd.items():
2323
      test = minor not in used_minors and must_exist
2324
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2325
                    "drbd minor %d of instance %s is not active", minor,
2326
                    self.cfg.GetInstanceName(inst_uuid))
2327
    for minor in used_minors:
2328
      test = minor not in node_drbd
2329
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2330
                    "unallocated drbd minor %d is in use", minor)
2331

    
2332
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2333
    """Builds the node OS structures.
2334

2335
    @type ninfo: L{objects.Node}
2336
    @param ninfo: the node to check
2337
    @param nresult: the remote results for the node
2338
    @param nimg: the node image object
2339

2340
    """
2341
    remote_os = nresult.get(constants.NV_OSLIST, None)
2342
    test = (not isinstance(remote_os, list) or
2343
            not compat.all(isinstance(v, list) and len(v) == 7
2344
                           for v in remote_os))
2345

    
2346
    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2347
                  "node hasn't returned valid OS data")
2348

    
2349
    nimg.os_fail = test
2350

    
2351
    if test:
2352
      return
2353

    
2354
    os_dict = {}
2355

    
2356
    for (name, os_path, status, diagnose,
2357
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2358

    
2359
      if name not in os_dict:
2360
        os_dict[name] = []
2361

    
2362
      # parameters is a list of lists instead of list of tuples due to
2363
      # JSON lacking a real tuple type, fix it:
2364
      parameters = [tuple(v) for v in parameters]
2365
      os_dict[name].append((os_path, status, diagnose,
2366
                            set(variants), set(parameters), set(api_ver)))
2367

    
2368
    nimg.oslist = os_dict
2369

    
2370
  def _VerifyNodeOS(self, ninfo, nimg, base):
2371
    """Verifies the node OS list.
2372

2373
    @type ninfo: L{objects.Node}
2374
    @param ninfo: the node to check
2375
    @param nimg: the node image object
2376
    @param base: the 'template' node we match against (e.g. from the master)
2377

2378
    """
2379
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2380

    
2381
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2382
    for os_name, os_data in nimg.oslist.items():
2383
      assert os_data, "Empty OS status for OS %s?!" % os_name
2384
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2385
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2386
                    "Invalid OS %s (located at %s): %s",
2387
                    os_name, f_path, f_diag)
2388
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2389
                    "OS '%s' has multiple entries"
2390
                    " (first one shadows the rest): %s",
2391
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
2392
      # comparisons with the 'base' image
2393
      test = os_name not in base.oslist
2394
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2395
                    "Extra OS %s not present on reference node (%s)",
2396
                    os_name, self.cfg.GetNodeName(base.uuid))
2397
      if test:
2398
        continue
2399
      assert base.oslist[os_name], "Base node has empty OS status?"
2400
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2401
      if not b_status:
2402
        # base OS is invalid, skipping
2403
        continue
2404
      for kind, a, b in [("API version", f_api, b_api),
2405
                         ("variants list", f_var, b_var),
2406
                         ("parameters", beautify_params(f_param),
2407
                          beautify_params(b_param))]:
2408
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2409
                      "OS %s for %s differs from reference node %s:"
2410
                      " [%s] vs. [%s]", kind, os_name,
2411
                      self.cfg.GetNodeName(base.uuid),
2412
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2413

    
2414
    # check any missing OSes
2415
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2416
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2417
                  "OSes present on reference node %s"
2418
                  " but missing on this node: %s",
2419
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2420

    
2421
  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2422
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2423

2424
    @type ninfo: L{objects.Node}
2425
    @param ninfo: the node to check
2426
    @param nresult: the remote results for the node
2427
    @type is_master: bool
2428
    @param is_master: Whether node is the master node
2429

2430
    """
2431
    cluster = self.cfg.GetClusterInfo()
2432
    if (is_master and
2433
        (cluster.IsFileStorageEnabled() or
2434
         cluster.IsSharedFileStorageEnabled())):
2435
      try:
2436
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2437
      except KeyError:
2438
        # This should never happen
2439
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2440
                      "Node did not return forbidden file storage paths")
2441
      else:
2442
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2443
                      "Found forbidden file storage paths: %s",
2444
                      utils.CommaJoin(fspaths))
2445
    else:
2446
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2447
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2448
                    "Node should not have returned forbidden file storage"
2449
                    " paths")
2450

    
2451
  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2452
                          verify_key, error_key):
2453
    """Verifies (file) storage paths.
2454

2455
    @type ninfo: L{objects.Node}
2456
    @param ninfo: the node to check
2457
    @param nresult: the remote results for the node
2458
    @type file_disk_template: string
2459
    @param file_disk_template: file-based disk template, whose directory
2460
        is supposed to be verified
2461
    @type verify_key: string
2462
    @param verify_key: key for the verification map of this file
2463
        verification step
2464
    @param error_key: error key to be added to the verification results
2465
        in case something goes wrong in this verification step
2466

2467
    """
2468
    assert (file_disk_template in
2469
            utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2470
    cluster = self.cfg.GetClusterInfo()
2471
    if cluster.IsDiskTemplateEnabled(file_disk_template):
2472
      self._ErrorIf(
2473
          verify_key in nresult,
2474
          error_key, ninfo.name,
2475
          "The configured %s storage path is unusable: %s" %
2476
          (file_disk_template, nresult.get(verify_key)))
2477

    
2478
  def _VerifyFileStoragePaths(self, ninfo, nresult):
2479
    """Verifies (file) storage paths.
2480

2481
    @see: C{_VerifyStoragePaths}
2482

2483
    """
2484
    self._VerifyStoragePaths(
2485
        ninfo, nresult, constants.DT_FILE,
2486
        constants.NV_FILE_STORAGE_PATH,
2487
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2488

    
2489
  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2490
    """Verifies (file) storage paths.
2491

2492
    @see: C{_VerifyStoragePaths}
2493

2494
    """
2495
    self._VerifyStoragePaths(
2496
        ninfo, nresult, constants.DT_SHARED_FILE,
2497
        constants.NV_SHARED_FILE_STORAGE_PATH,
2498
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2499

    
2500
  def _VerifyOob(self, ninfo, nresult):
2501
    """Verifies out of band functionality of a node.
2502

2503
    @type ninfo: L{objects.Node}
2504
    @param ninfo: the node to check
2505
    @param nresult: the remote results for the node
2506

2507
    """
2508
    # We just have to verify the paths on master and/or master candidates
2509
    # as the oob helper is invoked on the master
2510
    if ((ninfo.master_candidate or ninfo.master_capable) and
2511
        constants.NV_OOB_PATHS in nresult):
2512
      for path_result in nresult[constants.NV_OOB_PATHS]:
2513
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2514
                      ninfo.name, path_result)
2515

    
2516
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2517
    """Verifies and updates the node volume data.
2518

2519
    This function will update a L{NodeImage}'s internal structures
2520
    with data from the remote call.
2521

2522
    @type ninfo: L{objects.Node}
2523
    @param ninfo: the node to check
2524
    @param nresult: the remote results for the node
2525
    @param nimg: the node image object
2526
    @param vg_name: the configured VG name
2527

2528
    """
2529
    nimg.lvm_fail = True
2530
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2531
    if vg_name is None:
2532
      pass
2533
    elif isinstance(lvdata, basestring):
2534
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2535
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
2536
    elif not isinstance(lvdata, dict):
2537
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2538
                    "rpc call to node failed (lvlist)")
2539
    else:
2540
      nimg.volumes = lvdata
2541
      nimg.lvm_fail = False
2542

    
2543
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2544
    """Verifies and updates the node instance list.
2545

2546
    If the listing was successful, then updates this node's instance
2547
    list. Otherwise, it marks the RPC call as failed for the instance
2548
    list key.
2549

2550
    @type ninfo: L{objects.Node}
2551
    @param ninfo: the node to check
2552
    @param nresult: the remote results for the node
2553
    @param nimg: the node image object
2554

2555
    """
2556
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2557
    test = not isinstance(idata, list)
2558
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2559
                  "rpc call to node failed (instancelist): %s",
2560
                  utils.SafeEncode(str(idata)))
2561
    if test:
2562
      nimg.hyp_fail = True
2563
    else:
2564
      nimg.instances = [inst.uuid for (_, inst) in
2565
                        self.cfg.GetMultiInstanceInfoByName(idata)]
2566

    
2567
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2568
    """Verifies and computes a node information map
2569

2570
    @type ninfo: L{objects.Node}
2571
    @param ninfo: the node to check
2572
    @param nresult: the remote results for the node
2573
    @param nimg: the node image object
2574
    @param vg_name: the configured VG name
2575

2576
    """
2577
    # try to read free memory (from the hypervisor)
2578
    hv_info = nresult.get(constants.NV_HVINFO, None)
2579
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2580
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2581
                  "rpc call to node failed (hvinfo)")
2582
    if not test:
2583
      try:
2584
        nimg.mfree = int(hv_info["memory_free"])
2585
      except (ValueError, TypeError):
2586
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2587
                      "node returned invalid nodeinfo, check hypervisor")
2588

    
2589
    # FIXME: devise a free space model for file based instances as well
2590
    if vg_name is not None:
2591
      test = (constants.NV_VGLIST not in nresult or
2592
              vg_name not in nresult[constants.NV_VGLIST])
2593
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2594
                    "node didn't return data for the volume group '%s'"
2595
                    " - it is either missing or broken", vg_name)
2596
      if not test:
2597
        try:
2598
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2599
        except (ValueError, TypeError):
2600
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2601
                        "node returned invalid LVM info, check LVM status")
2602

    
2603
  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2604
    """Gets per-disk status information for all instances.
2605

2606
    @type node_uuids: list of strings
2607
    @param node_uuids: Node UUIDs
2608
    @type node_image: dict of (UUID, L{NodeImage})
2609
    @param node_image: Node images
2610
    @type instanceinfo: dict of (UUID, L{objects.Instance})
2611
    @param instanceinfo: Instance objects
2612
    @rtype: {instance: {node: [(success, payload)]}}
2613
    @return: a dictionary of per-instance dictionaries with nodes as
2614
        keys and disk information as values; the disk information is a
2615
        list of tuples (success, payload)
2616

2617
    """
2618
    node_disks = {}
2619
    node_disks_devonly = {}
2620
    diskless_instances = set()
2621
    diskless = constants.DT_DISKLESS
2622

    
2623
    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      devonly = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        self.cfg.SetDiskID(anno_disk, nuuid)
        devonly.append(anno_disk)

      node_disks_devonly[nuuid] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

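    # Fold the per-node RPC results into instdisk: instance UUID -> node UUID
    # -> list of (success, payload) tuples, one per disk of the instance.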
    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

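    # Group the remaining nodes by node group and return one endless
    # (cycling) iterator of sorted node names per foreign group.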
    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

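    # For each online node of this group, pick one name from every other
    # group's cycling iterator (i.next() is the Python 2 iterator protocol).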
    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

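    # node_verify_param lists the checks every node should run for the
    # node_verify RPC; keys are NV_* constants, values are their arguments.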
    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if drbd_helper:
      node_verify_param[constants.NV_DRBDVERSION] = None
      node_verify_param[constants.NV_DRBDLIST] = None
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for inst_uuid in self.my_inst_info.values():
      for nic in inst_uuid.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

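    # Link the instances of this group into the expected cluster state:
    # count offline instances, create ghost node images for unknown nodes
    # and record primary/secondary relations on each node image.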
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so
    # if it's True for at least one node, we act as if it were set for all
    # the nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

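    # Walk every node of this group: skip offline nodes, flag RPC failures
    # and feed the verification results into the node images.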
    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
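    # Check each instance of this group against the node images and the
    # per-disk status collected above.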
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.name not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
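        # res.payload is a list of (script, status, output) tuples returned
        # by the hooks RPC for this node.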
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])