Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / cluster.py @ c89eb67d

History | View | Annotate | Download (117.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with the cluster."""
23

    
24
import OpenSSL
25

    
26
import copy
27
import itertools
28
import logging
29
import operator
30
import os
31
import re
32
import time
33

    
34
from ganeti import compat
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import hypervisor
38
from ganeti import locking
39
from ganeti import masterd
40
from ganeti import netutils
41
from ganeti import objects
42
from ganeti import opcodes
43
from ganeti import pathutils
44
from ganeti import query
45
from ganeti import rpc
46
from ganeti import runtime
47
from ganeti import ssh
48
from ganeti import uidpool
49
from ganeti import utils
50
from ganeti import vcluster
51

    
52
from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53
  ResultWithJobs
54
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
60
  CheckIpolicyVsDiskTemplates
61

    
62
import ganeti.masterd.instance
63

    
64

    
65
class LUClusterActivateMasterIp(NoHooksLU):
66
  """Activate the master IP on the master node.
67

68
  """
69
  def Exec(self, feedback_fn):
70
    """Activate the master IP.
71

72
    """
73
    master_params = self.cfg.GetMasterNetworkParameters()
74
    ems = self.cfg.GetUseExternalMipScript()
75
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
76
                                                   master_params, ems)
77
    result.Raise("Could not activate the master IP")
78

    
79

    
80
class LUClusterDeactivateMasterIp(NoHooksLU):
81
  """Deactivate the master IP on the master node.
82

83
  """
84
  def Exec(self, feedback_fn):
85
    """Deactivate the master IP.
86

87
    """
88
    master_params = self.cfg.GetMasterNetworkParameters()
89
    ems = self.cfg.GetUseExternalMipScript()
90
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
91
                                                     master_params, ems)
92
    result.Raise("Could not deactivate the master IP")
93

    
94

    
95
class LUClusterConfigQuery(NoHooksLU):
96
  """Return configuration values.
97

98
  """
99
  REQ_BGL = False
100

    
101
  def CheckArguments(self):
102
    self.cq = ClusterQuery(None, self.op.output_fields, False)
103

    
104
  def ExpandNames(self):
105
    self.cq.ExpandNames(self)
106

    
107
  def DeclareLocks(self, level):
108
    self.cq.DeclareLocks(self, level)
109

    
110
  def Exec(self, feedback_fn):
111
    result = self.cq.OldStyleQuery(self)
112

    
113
    assert len(result) == 1
114

    
115
    return result[0]
116

    
117

    
118
class LUClusterDestroy(LogicalUnit):
119
  """Logical unit for destroying the cluster.
120

121
  """
122
  HPATH = "cluster-destroy"
123
  HTYPE = constants.HTYPE_CLUSTER
124

    
125
  def BuildHooksEnv(self):
126
    """Build hooks env.
127

128
    """
129
    return {
130
      "OP_TARGET": self.cfg.GetClusterName(),
131
      }
132

    
133
  def BuildHooksNodes(self):
134
    """Build hooks nodes.
135

136
    """
137
    return ([], [])
138

    
139
  def CheckPrereq(self):
140
    """Check prerequisites.
141

142
    This checks whether the cluster is empty.
143

144
    Any errors are signaled by raising errors.OpPrereqError.
145

146
    """
147
    master = self.cfg.GetMasterNode()
148

    
149
    nodelist = self.cfg.GetNodeList()
150
    if len(nodelist) != 1 or nodelist[0] != master:
151
      raise errors.OpPrereqError("There are still %d node(s) in"
152
                                 " this cluster." % (len(nodelist) - 1),
153
                                 errors.ECODE_INVAL)
154
    instancelist = self.cfg.GetInstanceList()
155
    if instancelist:
156
      raise errors.OpPrereqError("There are still %d instance(s) in"
157
                                 " this cluster." % len(instancelist),
158
                                 errors.ECODE_INVAL)
159

    
160
  def Exec(self, feedback_fn):
161
    """Destroys the cluster.
162

163
    """
164
    master_params = self.cfg.GetMasterNetworkParameters()
165

    
166
    # Run post hooks on master node before it's removed
167
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
168

    
169
    ems = self.cfg.GetUseExternalMipScript()
170
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
171
                                                     master_params, ems)
172
    result.Warn("Error disabling the master IP address", self.LogWarning)
173
    return master_params.uuid
174

    
175

    
176
class LUClusterPostInit(LogicalUnit):
177
  """Logical unit for running hooks after cluster initialization.
178

179
  """
180
  HPATH = "cluster-init"
181
  HTYPE = constants.HTYPE_CLUSTER
182

    
183
  def BuildHooksEnv(self):
184
    """Build hooks env.
185

186
    """
187
    return {
188
      "OP_TARGET": self.cfg.GetClusterName(),
189
      }
190

    
191
  def BuildHooksNodes(self):
192
    """Build hooks nodes.
193

194
    """
195
    return ([], [self.cfg.GetMasterNode()])
196

    
197
  def Exec(self, feedback_fn):
198
    """Nothing to do.
199

200
    """
201
    return True
202

    
203

    
204
class ClusterQuery(QueryBase):
205
  FIELDS = query.CLUSTER_FIELDS
206

    
207
  #: Do not sort (there is only one item)
208
  SORT_FIELD = None
209

    
210
  def ExpandNames(self, lu):
211
    lu.needed_locks = {}
212

    
213
    # The following variables interact with _QueryBase._GetNames
214
    self.wanted = locking.ALL_SET
215
    self.do_locking = self.use_locking
216

    
217
    if self.do_locking:
218
      raise errors.OpPrereqError("Can not use locking for cluster queries",
219
                                 errors.ECODE_INVAL)
220

    
221
  def DeclareLocks(self, lu, level):
222
    pass
223

    
224
  def _GetQueryData(self, lu):
225
    """Computes the list of nodes and their attributes.
226

227
    """
228
    # Locking is not used
229
    assert not (compat.any(lu.glm.is_owned(level)
230
                           for level in locking.LEVELS
231
                           if level != locking.LEVEL_CLUSTER) or
232
                self.do_locking or self.use_locking)
233

    
234
    if query.CQ_CONFIG in self.requested_data:
235
      cluster = lu.cfg.GetClusterInfo()
236
      nodes = lu.cfg.GetAllNodesInfo()
237
    else:
238
      cluster = NotImplemented
239
      nodes = NotImplemented
240

    
241
    if query.CQ_QUEUE_DRAINED in self.requested_data:
242
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
243
    else:
244
      drain_flag = NotImplemented
245

    
246
    if query.CQ_WATCHER_PAUSE in self.requested_data:
247
      master_node_uuid = lu.cfg.GetMasterNode()
248

    
249
      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
250
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
251
                   lu.cfg.GetMasterNodeName())
252

    
253
      watcher_pause = result.payload
254
    else:
255
      watcher_pause = NotImplemented
256

    
257
    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
258

    
259

    
260
class LUClusterQuery(NoHooksLU):
261
  """Query cluster configuration.
262

263
  """
264
  REQ_BGL = False
265

    
266
  def ExpandNames(self):
267
    self.needed_locks = {}
268

    
269
  def Exec(self, feedback_fn):
270
    """Return cluster config.
271

272
    """
273
    cluster = self.cfg.GetClusterInfo()
274
    os_hvp = {}
275

    
276
    # Filter just for enabled hypervisors
277
    for os_name, hv_dict in cluster.os_hvp.items():
278
      os_hvp[os_name] = {}
279
      for hv_name, hv_params in hv_dict.items():
280
        if hv_name in cluster.enabled_hypervisors:
281
          os_hvp[os_name][hv_name] = hv_params
282

    
283
    # Convert ip_family to ip_version
284
    primary_ip_version = constants.IP4_VERSION
285
    if cluster.primary_ip_family == netutils.IP6Address.family:
286
      primary_ip_version = constants.IP6_VERSION
287

    
288
    result = {
289
      "software_version": constants.RELEASE_VERSION,
290
      "protocol_version": constants.PROTOCOL_VERSION,
291
      "config_version": constants.CONFIG_VERSION,
292
      "os_api_version": max(constants.OS_API_VERSIONS),
293
      "export_version": constants.EXPORT_VERSION,
294
      "vcs_version": constants.VCS_VERSION,
295
      "architecture": runtime.GetArchInfo(),
296
      "name": cluster.cluster_name,
297
      "master": self.cfg.GetMasterNodeName(),
298
      "default_hypervisor": cluster.primary_hypervisor,
299
      "enabled_hypervisors": cluster.enabled_hypervisors,
300
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
301
                        for hypervisor_name in cluster.enabled_hypervisors]),
302
      "os_hvp": os_hvp,
303
      "beparams": cluster.beparams,
304
      "osparams": cluster.osparams,
305
      "ipolicy": cluster.ipolicy,
306
      "nicparams": cluster.nicparams,
307
      "ndparams": cluster.ndparams,
308
      "diskparams": cluster.diskparams,
309
      "candidate_pool_size": cluster.candidate_pool_size,
310
      "master_netdev": cluster.master_netdev,
311
      "master_netmask": cluster.master_netmask,
312
      "use_external_mip_script": cluster.use_external_mip_script,
313
      "volume_group_name": cluster.volume_group_name,
314
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
315
      "file_storage_dir": cluster.file_storage_dir,
316
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
317
      "maintain_node_health": cluster.maintain_node_health,
318
      "ctime": cluster.ctime,
319
      "mtime": cluster.mtime,
320
      "uuid": cluster.uuid,
321
      "tags": list(cluster.GetTags()),
322
      "uid_pool": cluster.uid_pool,
323
      "default_iallocator": cluster.default_iallocator,
324
      "reserved_lvs": cluster.reserved_lvs,
325
      "primary_ip_version": primary_ip_version,
326
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
327
      "hidden_os": cluster.hidden_os,
328
      "blacklisted_os": cluster.blacklisted_os,
329
      "enabled_disk_templates": cluster.enabled_disk_templates,
330
      }
331

    
332
    return result
333

    
334

    
335
class LUClusterRedistConf(NoHooksLU):
336
  """Force the redistribution of cluster configuration.
337

338
  This is a very simple LU.
339

340
  """
341
  REQ_BGL = False
342

    
343
  def ExpandNames(self):
344
    self.needed_locks = {
345
      locking.LEVEL_NODE: locking.ALL_SET,
346
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
347
    }
348
    self.share_locks = ShareAll()
349

    
350
  def Exec(self, feedback_fn):
351
    """Redistribute the configuration.
352

353
    """
354
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
355
    RedistributeAncillaryFiles(self)
356

    
357

    
358
class LUClusterRename(LogicalUnit):
359
  """Rename the cluster.
360

361
  """
362
  HPATH = "cluster-rename"
363
  HTYPE = constants.HTYPE_CLUSTER
364

    
365
  def BuildHooksEnv(self):
366
    """Build hooks env.
367

368
    """
369
    return {
370
      "OP_TARGET": self.cfg.GetClusterName(),
371
      "NEW_NAME": self.op.name,
372
      }
373

    
374
  def BuildHooksNodes(self):
375
    """Build hooks nodes.
376

377
    """
378
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
379

    
380
  def CheckPrereq(self):
381
    """Verify that the passed name is a valid one.
382

383
    """
384
    hostname = netutils.GetHostname(name=self.op.name,
385
                                    family=self.cfg.GetPrimaryIPFamily())
386

    
387
    new_name = hostname.name
388
    self.ip = new_ip = hostname.ip
389
    old_name = self.cfg.GetClusterName()
390
    old_ip = self.cfg.GetMasterIP()
391
    if new_name == old_name and new_ip == old_ip:
392
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
393
                                 " cluster has changed",
394
                                 errors.ECODE_INVAL)
395
    if new_ip != old_ip:
396
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
397
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
398
                                   " reachable on the network" %
399
                                   new_ip, errors.ECODE_NOTUNIQUE)
400

    
401
    self.op.name = new_name
402

    
403
  def Exec(self, feedback_fn):
404
    """Rename the cluster.
405

406
    """
407
    clustername = self.op.name
408
    new_ip = self.ip
409

    
410
    # shutdown the master IP
411
    master_params = self.cfg.GetMasterNetworkParameters()
412
    ems = self.cfg.GetUseExternalMipScript()
413
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
414
                                                     master_params, ems)
415
    result.Raise("Could not disable the master role")
416

    
417
    try:
418
      cluster = self.cfg.GetClusterInfo()
419
      cluster.cluster_name = clustername
420
      cluster.master_ip = new_ip
421
      self.cfg.Update(cluster, feedback_fn)
422

    
423
      # update the known hosts file
424
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
425
      node_list = self.cfg.GetOnlineNodeList()
426
      try:
427
        node_list.remove(master_params.uuid)
428
      except ValueError:
429
        pass
430
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
431
    finally:
432
      master_params.ip = new_ip
433
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
434
                                                     master_params, ems)
435
      result.Warn("Could not re-enable the master role on the master,"
436
                  " please restart manually", self.LogWarning)
437

    
438
    return clustername
439

    
440

    
441
class LUClusterRepairDiskSizes(NoHooksLU):
442
  """Verifies the cluster disks sizes.
443

444
  """
445
  REQ_BGL = False
446

    
447
  def ExpandNames(self):
448
    if self.op.instances:
449
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
450
      # Not getting the node allocation lock as only a specific set of
451
      # instances (and their nodes) is going to be acquired
452
      self.needed_locks = {
453
        locking.LEVEL_NODE_RES: [],
454
        locking.LEVEL_INSTANCE: self.wanted_names,
455
        }
456
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
457
    else:
458
      self.wanted_names = None
459
      self.needed_locks = {
460
        locking.LEVEL_NODE_RES: locking.ALL_SET,
461
        locking.LEVEL_INSTANCE: locking.ALL_SET,
462

    
463
        # This opcode is acquires the node locks for all instances
464
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
465
        }
466

    
467
    self.share_locks = {
468
      locking.LEVEL_NODE_RES: 1,
469
      locking.LEVEL_INSTANCE: 0,
470
      locking.LEVEL_NODE_ALLOC: 1,
471
      }
472

    
473
  def DeclareLocks(self, level):
474
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
475
      self._LockInstancesNodes(primary_only=True, level=level)
476

    
477
  def CheckPrereq(self):
478
    """Check prerequisites.
479

480
    This only checks the optional instance list against the existing names.
481

482
    """
483
    if self.wanted_names is None:
484
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
485

    
486
    self.wanted_instances = \
487
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
488

    
489
  def _EnsureChildSizes(self, disk):
490
    """Ensure children of the disk have the needed disk size.
491

492
    This is valid mainly for DRBD8 and fixes an issue where the
493
    children have smaller disk size.
494

495
    @param disk: an L{ganeti.objects.Disk} object
496

497
    """
498
    if disk.dev_type == constants.LD_DRBD8:
499
      assert disk.children, "Empty children for DRBD8?"
500
      fchild = disk.children[0]
501
      mismatch = fchild.size < disk.size
502
      if mismatch:
503
        self.LogInfo("Child disk has size %d, parent %d, fixing",
504
                     fchild.size, disk.size)
505
        fchild.size = disk.size
506

    
507
      # and we recurse on this child only, not on the metadev
508
      return self._EnsureChildSizes(fchild) or mismatch
509
    else:
510
      return False
511

    
512
  def Exec(self, feedback_fn):
513
    """Verify the size of cluster disks.
514

515
    """
516
    # TODO: check child disks too
517
    # TODO: check differences in size between primary/secondary nodes
518
    per_node_disks = {}
519
    for instance in self.wanted_instances:
520
      pnode = instance.primary_node
521
      if pnode not in per_node_disks:
522
        per_node_disks[pnode] = []
523
      for idx, disk in enumerate(instance.disks):
524
        per_node_disks[pnode].append((instance, idx, disk))
525

    
526
    assert not (frozenset(per_node_disks.keys()) -
527
                self.owned_locks(locking.LEVEL_NODE_RES)), \
528
      "Not owning correct locks"
529
    assert not self.owned_locks(locking.LEVEL_NODE)
530

    
531
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
532
                                               per_node_disks.keys())
533

    
534
    changed = []
535
    for node_uuid, dskl in per_node_disks.items():
536
      newl = [v[2].Copy() for v in dskl]
537
      for dsk in newl:
538
        self.cfg.SetDiskID(dsk, node_uuid)
539
      node_name = self.cfg.GetNodeName(node_uuid)
540
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
541
      if result.fail_msg:
542
        self.LogWarning("Failure in blockdev_getdimensions call to node"
543
                        " %s, ignoring", node_name)
544
        continue
545
      if len(result.payload) != len(dskl):
546
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
547
                        " result.payload=%s", node_name, len(dskl),
548
                        result.payload)
549
        self.LogWarning("Invalid result from node %s, ignoring node results",
550
                        node_name)
551
        continue
552
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
553
        if dimensions is None:
554
          self.LogWarning("Disk %d of instance %s did not return size"
555
                          " information, ignoring", idx, instance.name)
556
          continue
557
        if not isinstance(dimensions, (tuple, list)):
558
          self.LogWarning("Disk %d of instance %s did not return valid"
559
                          " dimension information, ignoring", idx,
560
                          instance.name)
561
          continue
562
        (size, spindles) = dimensions
563
        if not isinstance(size, (int, long)):
564
          self.LogWarning("Disk %d of instance %s did not return valid"
565
                          " size information, ignoring", idx, instance.name)
566
          continue
567
        size = size >> 20
568
        if size != disk.size:
569
          self.LogInfo("Disk %d of instance %s has mismatched size,"
570
                       " correcting: recorded %d, actual %d", idx,
571
                       instance.name, disk.size, size)
572
          disk.size = size
573
          self.cfg.Update(instance, feedback_fn)
574
          changed.append((instance.name, idx, "size", size))
575
        if es_flags[node_uuid]:
576
          if spindles is None:
577
            self.LogWarning("Disk %d of instance %s did not return valid"
578
                            " spindles information, ignoring", idx,
579
                            instance.name)
580
          elif disk.spindles is None or disk.spindles != spindles:
581
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
582
                         " correcting: recorded %s, actual %s",
583
                         idx, instance.name, disk.spindles, spindles)
584
            disk.spindles = spindles
585
            self.cfg.Update(instance, feedback_fn)
586
            changed.append((instance.name, idx, "spindles", disk.spindles))
587
        if self._EnsureChildSizes(disk):
588
          self.cfg.Update(instance, feedback_fn)
589
          changed.append((instance.name, idx, "size", disk.size))
590
    return changed
591

    
592

    
593
def _ValidateNetmask(cfg, netmask):
594
  """Checks if a netmask is valid.
595

596
  @type cfg: L{config.ConfigWriter}
597
  @param cfg: The cluster configuration
598
  @type netmask: int
599
  @param netmask: the netmask to be verified
600
  @raise errors.OpPrereqError: if the validation fails
601

602
  """
603
  ip_family = cfg.GetPrimaryIPFamily()
604
  try:
605
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
606
  except errors.ProgrammerError:
607
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
608
                               ip_family, errors.ECODE_INVAL)
609
  if not ipcls.ValidateNetmask(netmask):
610
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
611
                               (netmask), errors.ECODE_INVAL)
612

    
613

    
614
def CheckFileBasedStoragePathVsEnabledDiskTemplates(
615
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
616
    file_disk_template):
617
  """Checks whether the given file-based storage directory is acceptable.
618

619
  Note: This function is public, because it is also used in bootstrap.py.
620

621
  @type logging_warn_fn: function
622
  @param logging_warn_fn: function which accepts a string and logs it
623
  @type file_storage_dir: string
624
  @param file_storage_dir: the directory to be used for file-based instances
625
  @type enabled_disk_templates: list of string
626
  @param enabled_disk_templates: the list of enabled disk templates
627
  @type file_disk_template: string
628
  @param file_disk_template: the file-based disk template for which the
629
      path should be checked
630

631
  """
632
  assert (file_disk_template in
633
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
634
  file_storage_enabled = file_disk_template in enabled_disk_templates
635
  if file_storage_dir is not None:
636
    if file_storage_dir == "":
637
      if file_storage_enabled:
638
        raise errors.OpPrereqError(
639
            "Unsetting the '%s' storage directory while having '%s' storage"
640
            " enabled is not permitted." %
641
            (file_disk_template, file_disk_template))
642
    else:
643
      if not file_storage_enabled:
644
        logging_warn_fn(
645
            "Specified a %s storage directory, although %s storage is not"
646
            " enabled." % (file_disk_template, file_disk_template))
647
  else:
648
    raise errors.ProgrammerError("Received %s storage dir with value"
649
                                 " 'None'." % file_disk_template)
650

    
651

    
652
def CheckFileStoragePathVsEnabledDiskTemplates(
653
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
654
  """Checks whether the given file storage directory is acceptable.
655

656
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
657

658
  """
659
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
660
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
661
      constants.DT_FILE)
662

    
663

    
664
def CheckSharedFileStoragePathVsEnabledDiskTemplates(
665
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
666
  """Checks whether the given shared file storage directory is acceptable.
667

668
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
669

670
  """
671
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
672
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
673
      constants.DT_SHARED_FILE)
674

    
675

    
676
class LUClusterSetParams(LogicalUnit):
677
  """Change the parameters of the cluster.
678

679
  """
680
  HPATH = "cluster-modify"
681
  HTYPE = constants.HTYPE_CLUSTER
682
  REQ_BGL = False
683

    
684
  def CheckArguments(self):
685
    """Check parameters
686

687
    """
688
    if self.op.uid_pool:
689
      uidpool.CheckUidPool(self.op.uid_pool)
690

    
691
    if self.op.add_uids:
692
      uidpool.CheckUidPool(self.op.add_uids)
693

    
694
    if self.op.remove_uids:
695
      uidpool.CheckUidPool(self.op.remove_uids)
696

    
697
    if self.op.master_netmask is not None:
698
      _ValidateNetmask(self.cfg, self.op.master_netmask)
699

    
700
    if self.op.diskparams:
701
      for dt_params in self.op.diskparams.values():
702
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
703
      try:
704
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
705
      except errors.OpPrereqError, err:
706
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
707
                                   errors.ECODE_INVAL)
708

    
709
  def ExpandNames(self):
710
    # FIXME: in the future maybe other cluster params won't require checking on
711
    # all nodes to be modified.
712
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
713
    # resource locks the right thing, shouldn't it be the BGL instead?
714
    self.needed_locks = {
715
      locking.LEVEL_NODE: locking.ALL_SET,
716
      locking.LEVEL_INSTANCE: locking.ALL_SET,
717
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
718
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
719
    }
720
    self.share_locks = ShareAll()
721

    
722
  def BuildHooksEnv(self):
723
    """Build hooks env.
724

725
    """
726
    return {
727
      "OP_TARGET": self.cfg.GetClusterName(),
728
      "NEW_VG_NAME": self.op.vg_name,
729
      }
730

    
731
  def BuildHooksNodes(self):
732
    """Build hooks nodes.
733

734
    """
735
    mn = self.cfg.GetMasterNode()
736
    return ([mn], [mn])
737

    
738
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
739
                   new_enabled_disk_templates):
740
    """Check the consistency of the vg name on all nodes and in case it gets
741
       unset whether there are instances still using it.
742

743
    """
744
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
745
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
746
                                            new_enabled_disk_templates)
747
    current_vg_name = self.cfg.GetVGName()
748

    
749
    if self.op.vg_name == '':
750
      if lvm_is_enabled:
751
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
752
                                   " disk templates are or get enabled.")
753

    
754
    if self.op.vg_name is None:
755
      if current_vg_name is None and lvm_is_enabled:
756
        raise errors.OpPrereqError("Please specify a volume group when"
757
                                   " enabling lvm-based disk-templates.")
758

    
759
    if self.op.vg_name is not None and not self.op.vg_name:
760
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
761
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
762
                                   " instances exist", errors.ECODE_INVAL)
763

    
764
    if (self.op.vg_name is not None and lvm_is_enabled) or \
765
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
766
      self._CheckVgNameOnNodes(node_uuids)
767

    
768
  def _CheckVgNameOnNodes(self, node_uuids):
769
    """Check the status of the volume group on each node.
770

771
    """
772
    vglist = self.rpc.call_vg_list(node_uuids)
773
    for node_uuid in node_uuids:
774
      msg = vglist[node_uuid].fail_msg
775
      if msg:
776
        # ignoring down node
777
        self.LogWarning("Error while gathering data on node %s"
778
                        " (ignoring node): %s",
779
                        self.cfg.GetNodeName(node_uuid), msg)
780
        continue
781
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
782
                                            self.op.vg_name,
783
                                            constants.MIN_VG_SIZE)
784
      if vgstatus:
785
        raise errors.OpPrereqError("Error on node '%s': %s" %
786
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
787
                                   errors.ECODE_ENVIRON)
788

    
789
  @staticmethod
790
  def _GetEnabledDiskTemplatesInner(op_enabled_disk_templates,
791
                                    old_enabled_disk_templates):
792
    """Determines the enabled disk templates and the subset of disk templates
793
       that are newly enabled by this operation.
794

795
    """
796
    enabled_disk_templates = None
797
    new_enabled_disk_templates = []
798
    if op_enabled_disk_templates:
799
      enabled_disk_templates = op_enabled_disk_templates
800
      new_enabled_disk_templates = \
801
        list(set(enabled_disk_templates)
802
             - set(old_enabled_disk_templates))
803
    else:
804
      enabled_disk_templates = old_enabled_disk_templates
805
    return (enabled_disk_templates, new_enabled_disk_templates)
806

    
807
  def _GetEnabledDiskTemplates(self, cluster):
808
    """Determines the enabled disk templates and the subset of disk templates
809
       that are newly enabled by this operation.
810

811
    """
812
    return self._GetEnabledDiskTemplatesInner(self.op.enabled_disk_templates,
813
                                              cluster.enabled_disk_templates)
814

    
815
  def _CheckIpolicy(self, cluster, enabled_disk_templates):
816
    """Checks the ipolicy.
817

818
    @type cluster: C{objects.Cluster}
819
    @param cluster: the cluster's configuration
820
    @type enabled_disk_templates: list of string
821
    @param enabled_disk_templates: list of (possibly newly) enabled disk
822
      templates
823

824
    """
825
    # FIXME: write unit tests for this
826
    if self.op.ipolicy:
827
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
828
                                           group_policy=False)
829

    
830
      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
831
                                  enabled_disk_templates)
832

    
833
      all_instances = self.cfg.GetAllInstancesInfo().values()
834
      violations = set()
835
      for group in self.cfg.GetAllNodeGroupsInfo().values():
836
        instances = frozenset([inst for inst in all_instances
837
                               if compat.any(nuuid in group.members
838
                                             for nuuid in inst.all_nodes)])
839
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
840
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
841
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
842
                                           self.cfg)
843
        if new:
844
          violations.update(new)
845

    
846
      if violations:
847
        self.LogWarning("After the ipolicy change the following instances"
848
                        " violate them: %s",
849
                        utils.CommaJoin(utils.NiceSort(violations)))
850
    else:
851
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
852
                                  enabled_disk_templates)
853

    
854
  def CheckPrereq(self):
855
    """Check prerequisites.
856

857
    This checks whether the given params don't conflict and
858
    if the given volume group is valid.
859

860
    """
861
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
862
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
863
        raise errors.OpPrereqError("Cannot disable drbd helper while"
864
                                   " drbd-based instances exist",
865
                                   errors.ECODE_INVAL)
866

    
867
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
868
    self.cluster = cluster = self.cfg.GetClusterInfo()
869

    
870
    vm_capable_node_uuids = [node.uuid
871
                             for node in self.cfg.GetAllNodesInfo().values()
872
                             if node.uuid in node_uuids and node.vm_capable]
873

    
874
    (enabled_disk_templates, new_enabled_disk_templates) = \
875
      self._GetEnabledDiskTemplates(cluster)
876

    
877
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
878
                      new_enabled_disk_templates)
879

    
880
    if self.op.file_storage_dir is not None:
881
      CheckFileStoragePathVsEnabledDiskTemplates(
882
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
883

    
884
    if self.op.shared_file_storage_dir is not None:
885
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
886
          self.LogWarning, self.op.shared_file_storage_dir,
887
          enabled_disk_templates)
888

    
889
    if self.op.drbd_helper:
890
      # checks given drbd helper on all nodes
891
      helpers = self.rpc.call_drbd_helper(node_uuids)
892
      for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
893
        if ninfo.offline:
894
          self.LogInfo("Not checking drbd helper on offline node %s",
895
                       ninfo.name)
896
          continue
897
        msg = helpers[ninfo.uuid].fail_msg
898
        if msg:
899
          raise errors.OpPrereqError("Error checking drbd helper on node"
900
                                     " '%s': %s" % (ninfo.name, msg),
901
                                     errors.ECODE_ENVIRON)
902
        node_helper = helpers[ninfo.uuid].payload
903
        if node_helper != self.op.drbd_helper:
904
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
905
                                     (ninfo.name, node_helper),
906
                                     errors.ECODE_ENVIRON)
907

    
908
    # validate params changes
909
    if self.op.beparams:
910
      objects.UpgradeBeParams(self.op.beparams)
911
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
912
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
913

    
914
    if self.op.ndparams:
915
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
916
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
917

    
918
      # TODO: we need a more general way to handle resetting
919
      # cluster-level parameters to default values
920
      if self.new_ndparams["oob_program"] == "":
921
        self.new_ndparams["oob_program"] = \
922
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
923

    
924
    if self.op.hv_state:
925
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
926
                                           self.cluster.hv_state_static)
927
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
928
                               for hv, values in new_hv_state.items())
929

    
930
    if self.op.disk_state:
931
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
932
                                               self.cluster.disk_state_static)
933
      self.new_disk_state = \
934
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
935
                            for name, values in svalues.items()))
936
             for storage, svalues in new_disk_state.items())
937

    
938
    self._CheckIpolicy(cluster, enabled_disk_templates)
939

    
940
    if self.op.nicparams:
941
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
942
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
943
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
944
      nic_errors = []
945

    
946
      # check all instances for consistency
947
      for instance in self.cfg.GetAllInstancesInfo().values():
948
        for nic_idx, nic in enumerate(instance.nics):
949
          params_copy = copy.deepcopy(nic.nicparams)
950
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
951

    
952
          # check parameter syntax
953
          try:
954
            objects.NIC.CheckParameterSyntax(params_filled)
955
          except errors.ConfigurationError, err:
956
            nic_errors.append("Instance %s, nic/%d: %s" %
957
                              (instance.name, nic_idx, err))
958

    
959
          # if we're moving instances to routed, check that they have an ip
960
          target_mode = params_filled[constants.NIC_MODE]
961
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
962
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
963
                              " address" % (instance.name, nic_idx))
964
      if nic_errors:
965
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
966
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
967

    
968
    # hypervisor list/parameters
969
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
970
    if self.op.hvparams:
971
      for hv_name, hv_dict in self.op.hvparams.items():
972
        if hv_name not in self.new_hvparams:
973
          self.new_hvparams[hv_name] = hv_dict
974
        else:
975
          self.new_hvparams[hv_name].update(hv_dict)
976

    
977
    # disk template parameters
978
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
979
    if self.op.diskparams:
980
      for dt_name, dt_params in self.op.diskparams.items():
981
        if dt_name not in self.new_diskparams:
982
          self.new_diskparams[dt_name] = dt_params
983
        else:
984
          self.new_diskparams[dt_name].update(dt_params)
985

    
986
    # os hypervisor parameters
987
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
988
    if self.op.os_hvp:
989
      for os_name, hvs in self.op.os_hvp.items():
990
        if os_name not in self.new_os_hvp:
991
          self.new_os_hvp[os_name] = hvs
992
        else:
993
          for hv_name, hv_dict in hvs.items():
994
            if hv_dict is None:
995
              # Delete if it exists
996
              self.new_os_hvp[os_name].pop(hv_name, None)
997
            elif hv_name not in self.new_os_hvp[os_name]:
998
              self.new_os_hvp[os_name][hv_name] = hv_dict
999
            else:
1000
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
1001

    
1002
    # os parameters
1003
    self.new_osp = objects.FillDict(cluster.osparams, {})
1004
    if self.op.osparams:
1005
      for os_name, osp in self.op.osparams.items():
1006
        if os_name not in self.new_osp:
1007
          self.new_osp[os_name] = {}
1008

    
1009
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
1010
                                                 use_none=True)
1011

    
1012
        if not self.new_osp[os_name]:
1013
          # we removed all parameters
1014
          del self.new_osp[os_name]
1015
        else:
1016
          # check the parameter validity (remote check)
1017
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
1018
                        os_name, self.new_osp[os_name])
1019

    
1020
    # changes to the hypervisor list
1021
    if self.op.enabled_hypervisors is not None:
1022
      self.hv_list = self.op.enabled_hypervisors
1023
      for hv in self.hv_list:
1024
        # if the hypervisor doesn't already exist in the cluster
1025
        # hvparams, we initialize it to empty, and then (in both
1026
        # cases) we make sure to fill the defaults, as we might not
1027
        # have a complete defaults list if the hypervisor wasn't
1028
        # enabled before
1029
        if hv not in new_hvp:
1030
          new_hvp[hv] = {}
1031
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1032
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1033
    else:
1034
      self.hv_list = cluster.enabled_hypervisors
1035

    
1036
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1037
      # either the enabled list has changed, or the parameters have, validate
1038
      for hv_name, hv_params in self.new_hvparams.items():
1039
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1040
            (self.op.enabled_hypervisors and
1041
             hv_name in self.op.enabled_hypervisors)):
1042
          # either this is a new hypervisor, or its parameters have changed
1043
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1044
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1045
          hv_class.CheckParameterSyntax(hv_params)
1046
          CheckHVParams(self, node_uuids, hv_name, hv_params)
1047

    
1048
    self._CheckDiskTemplateConsistency()
1049

    
1050
    if self.op.os_hvp:
1051
      # no need to check any newly-enabled hypervisors, since the
1052
      # defaults have already been checked in the above code-block
1053
      for os_name, os_hvp in self.new_os_hvp.items():
1054
        for hv_name, hv_params in os_hvp.items():
1055
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1056
          # we need to fill in the new os_hvp on top of the actual hv_p
1057
          cluster_defaults = self.new_hvparams.get(hv_name, {})
1058
          new_osp = objects.FillDict(cluster_defaults, hv_params)
1059
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1060
          hv_class.CheckParameterSyntax(new_osp)
1061
          CheckHVParams(self, node_uuids, hv_name, new_osp)
1062

    
1063
    if self.op.default_iallocator:
1064
      alloc_script = utils.FindFile(self.op.default_iallocator,
1065
                                    constants.IALLOCATOR_SEARCH_PATH,
1066
                                    os.path.isfile)
1067
      if alloc_script is None:
1068
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1069
                                   " specified" % self.op.default_iallocator,
1070
                                   errors.ECODE_INVAL)
1071

    
1072
  def _CheckDiskTemplateConsistency(self):
1073
    """Check whether the disk templates that are going to be disabled
1074
       are still in use by some instances.
1075

1076
    """
1077
    if self.op.enabled_disk_templates:
1078
      cluster = self.cfg.GetClusterInfo()
1079
      instances = self.cfg.GetAllInstancesInfo()
1080

    
1081
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1082
        - set(self.op.enabled_disk_templates)
1083
      for instance in instances.itervalues():
1084
        if instance.disk_template in disk_templates_to_remove:
1085
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
1086
                                     " because instance '%s' is using it." %
1087
                                     (instance.disk_template, instance.name))
1088

    
1089
  def _SetVgName(self, feedback_fn):
1090
    """Determines and sets the new volume group name.
1091

1092
    """
1093
    if self.op.vg_name is not None:
1094
      new_volume = self.op.vg_name
1095
      if not new_volume:
1096
        new_volume = None
1097
      if new_volume != self.cfg.GetVGName():
1098
        self.cfg.SetVGName(new_volume)
1099
      else:
1100
        feedback_fn("Cluster LVM configuration already in desired"
1101
                    " state, not changing")
1102

    
1103
  def _SetFileStorageDir(self, feedback_fn):
1104
    """Set the file storage directory.
1105

1106
    """
1107
    if self.op.file_storage_dir is not None:
1108
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
1109
        feedback_fn("Global file storage dir already set to value '%s'"
1110
                    % self.cluster.file_storage_dir)
1111
      else:
1112
        self.cluster.file_storage_dir = self.op.file_storage_dir
1113

    
1114
  def Exec(self, feedback_fn):
1115
    """Change the parameters of the cluster.
1116

1117
    """
1118
    if self.op.enabled_disk_templates:
1119
      self.cluster.enabled_disk_templates = \
1120
        list(set(self.op.enabled_disk_templates))
1121

    
1122
    self._SetVgName(feedback_fn)
1123
    self._SetFileStorageDir(feedback_fn)
1124

    
1125
    if self.op.drbd_helper is not None:
1126
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1127
        feedback_fn("Note that you specified a drbd user helper, but did"
1128
                    " enabled the drbd disk template.")
1129
      new_helper = self.op.drbd_helper
1130
      if not new_helper:
1131
        new_helper = None
1132
      if new_helper != self.cfg.GetDRBDHelper():
1133
        self.cfg.SetDRBDHelper(new_helper)
1134
      else:
1135
        feedback_fn("Cluster DRBD helper already in desired state,"
1136
                    " not changing")
1137
    if self.op.hvparams:
1138
      self.cluster.hvparams = self.new_hvparams
1139
    if self.op.os_hvp:
1140
      self.cluster.os_hvp = self.new_os_hvp
1141
    if self.op.enabled_hypervisors is not None:
1142
      self.cluster.hvparams = self.new_hvparams
1143
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1144
    if self.op.beparams:
1145
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1146
    if self.op.nicparams:
1147
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1148
    if self.op.ipolicy:
1149
      self.cluster.ipolicy = self.new_ipolicy
1150
    if self.op.osparams:
1151
      self.cluster.osparams = self.new_osp
1152
    if self.op.ndparams:
1153
      self.cluster.ndparams = self.new_ndparams
1154
    if self.op.diskparams:
1155
      self.cluster.diskparams = self.new_diskparams
1156
    if self.op.hv_state:
1157
      self.cluster.hv_state_static = self.new_hv_state
1158
    if self.op.disk_state:
1159
      self.cluster.disk_state_static = self.new_disk_state
1160

    
1161
    if self.op.candidate_pool_size is not None:
1162
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1163
      # we need to update the pool size here, otherwise the save will fail
1164
      AdjustCandidatePool(self, [])
1165

    
1166
    if self.op.maintain_node_health is not None:
1167
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1168
        feedback_fn("Note: CONFD was disabled at build time, node health"
1169
                    " maintenance is not useful (still enabling it)")
1170
      self.cluster.maintain_node_health = self.op.maintain_node_health
1171

    
1172
    if self.op.modify_etc_hosts is not None:
1173
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1174

    
1175
    if self.op.prealloc_wipe_disks is not None:
1176
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1177

    
1178
    if self.op.add_uids is not None:
1179
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1180

    
1181
    if self.op.remove_uids is not None:
1182
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1183

    
1184
    if self.op.uid_pool is not None:
1185
      self.cluster.uid_pool = self.op.uid_pool
1186

    
1187
    if self.op.default_iallocator is not None:
1188
      self.cluster.default_iallocator = self.op.default_iallocator
1189

    
1190
    if self.op.reserved_lvs is not None:
1191
      self.cluster.reserved_lvs = self.op.reserved_lvs
1192

    
1193
    if self.op.use_external_mip_script is not None:
1194
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
1195

    
1196
    def helper_os(aname, mods, desc):
1197
      desc += " OS list"
1198
      lst = getattr(self.cluster, aname)
1199
      for key, val in mods:
1200
        if key == constants.DDM_ADD:
1201
          if val in lst:
1202
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1203
          else:
1204
            lst.append(val)
1205
        elif key == constants.DDM_REMOVE:
1206
          if val in lst:
1207
            lst.remove(val)
1208
          else:
1209
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1210
        else:
1211
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
1212

    
1213
    if self.op.hidden_os:
1214
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1215

    
1216
    if self.op.blacklisted_os:
1217
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1218

    
1219
    if self.op.master_netdev:
1220
      master_params = self.cfg.GetMasterNetworkParameters()
1221
      ems = self.cfg.GetUseExternalMipScript()
1222
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1223
                  self.cluster.master_netdev)
1224
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1225
                                                       master_params, ems)
1226
      if not self.op.force:
1227
        result.Raise("Could not disable the master ip")
1228
      else:
1229
        if result.fail_msg:
1230
          msg = ("Could not disable the master ip (continuing anyway): %s" %
1231
                 result.fail_msg)
1232
          feedback_fn(msg)
1233
      feedback_fn("Changing master_netdev from %s to %s" %
1234
                  (master_params.netdev, self.op.master_netdev))
1235
      self.cluster.master_netdev = self.op.master_netdev
1236

    
1237
    if self.op.master_netmask:
1238
      master_params = self.cfg.GetMasterNetworkParameters()
1239
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1240
      result = self.rpc.call_node_change_master_netmask(
1241
                 master_params.uuid, master_params.netmask,
1242
                 self.op.master_netmask, master_params.ip,
1243
                 master_params.netdev)
1244
      result.Warn("Could not change the master IP netmask", feedback_fn)
1245
      self.cluster.master_netmask = self.op.master_netmask
1246

    
1247
    self.cfg.Update(self.cluster, feedback_fn)
1248

    
1249
    if self.op.master_netdev:
1250
      master_params = self.cfg.GetMasterNetworkParameters()
1251
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
1252
                  self.op.master_netdev)
1253
      ems = self.cfg.GetUseExternalMipScript()
1254
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1255
                                                     master_params, ems)
1256
      result.Warn("Could not re-enable the master ip on the master,"
1257
                  " please restart manually", self.LogWarning)
1258

    
1259

    
1260
class LUClusterVerify(NoHooksLU):
1261
  """Submits all jobs necessary to verify the cluster.
1262

1263
  """
1264
  REQ_BGL = False
1265

    
1266
  def ExpandNames(self):
1267
    self.needed_locks = {}
1268

    
1269
  def Exec(self, feedback_fn):
1270
    jobs = []
1271

    
1272
    if self.op.group_name:
1273
      groups = [self.op.group_name]
1274
      depends_fn = lambda: None
1275
    else:
1276
      groups = self.cfg.GetNodeGroupList()
1277

    
1278
      # Verify global configuration
1279
      jobs.append([
1280
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1281
        ])
1282

    
1283
      # Always depend on global verification
1284
      depends_fn = lambda: [(-len(jobs), [])]
1285

    
1286
    jobs.extend(
1287
      [opcodes.OpClusterVerifyGroup(group_name=group,
1288
                                    ignore_errors=self.op.ignore_errors,
1289
                                    depends=depends_fn())]
1290
      for group in groups)
1291

    
1292
    # Fix up all parameters
1293
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1294
      op.debug_simulate_errors = self.op.debug_simulate_errors
1295
      op.verbose = self.op.verbose
1296
      op.error_codes = self.op.error_codes
1297
      try:
1298
        op.skip_checks = self.op.skip_checks
1299
      except AttributeError:
1300
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1301

    
1302
    return ResultWithJobs(jobs)
1303

    
1304

    
1305
class _VerifyErrors(object):
1306
  """Mix-in for cluster/group verify LUs.
1307

1308
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1309
  self.op and self._feedback_fn to be available.)
1310

1311
  """
1312

    
1313
  ETYPE_FIELD = "code"
1314
  ETYPE_ERROR = "ERROR"
1315
  ETYPE_WARNING = "WARNING"
1316

    
1317
  def _Error(self, ecode, item, msg, *args, **kwargs):
1318
    """Format an error message.
1319

1320
    Based on the opcode's error_codes parameter, either format a
1321
    parseable error code, or a simpler error string.
1322

1323
    This must be called only from Exec and functions called from Exec.
1324

1325
    """
1326
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1327
    itype, etxt, _ = ecode
1328
    # If the error code is in the list of ignored errors, demote the error to a
1329
    # warning
1330
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1331
      ltype = self.ETYPE_WARNING
1332
    # first complete the msg
1333
    if args:
1334
      msg = msg % args
1335
    # then format the whole message
1336
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1337
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1338
    else:
1339
      if item:
1340
        item = " " + item
1341
      else:
1342
        item = ""
1343
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1344
    # and finally report it via the feedback_fn
1345
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1346
    # do not mark the operation as failed for WARN cases only
1347
    if ltype == self.ETYPE_ERROR:
1348
      self.bad = True
1349

    
1350
  def _ErrorIf(self, cond, *args, **kwargs):
1351
    """Log an error message if the passed condition is True.
1352

1353
    """
1354
    if (bool(cond)
1355
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1356
      self._Error(*args, **kwargs)
1357

    
1358

    
1359
def _VerifyCertificate(filename):
1360
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1361

1362
  @type filename: string
1363
  @param filename: Path to PEM file
1364

1365
  """
1366
  try:
1367
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1368
                                           utils.ReadFile(filename))
1369
  except Exception, err: # pylint: disable=W0703
1370
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1371
            "Failed to load X509 certificate %s: %s" % (filename, err))
1372

    
1373
  (errcode, msg) = \
1374
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1375
                                constants.SSL_CERT_EXPIRATION_ERROR)
1376

    
1377
  if msg:
1378
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1379
  else:
1380
    fnamemsg = None
1381

    
1382
  if errcode is None:
1383
    return (None, fnamemsg)
1384
  elif errcode == utils.CERT_WARNING:
1385
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1386
  elif errcode == utils.CERT_ERROR:
1387
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1388

    
1389
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1390

    
1391

    
1392
def _GetAllHypervisorParameters(cluster, instances):
1393
  """Compute the set of all hypervisor parameters.
1394

1395
  @type cluster: L{objects.Cluster}
1396
  @param cluster: the cluster object
1397
  @param instances: list of L{objects.Instance}
1398
  @param instances: additional instances from which to obtain parameters
1399
  @rtype: list of (origin, hypervisor, parameters)
1400
  @return: a list with all parameters found, indicating the hypervisor they
1401
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1402

1403
  """
1404
  hvp_data = []
1405

    
1406
  for hv_name in cluster.enabled_hypervisors:
1407
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1408

    
1409
  for os_name, os_hvp in cluster.os_hvp.items():
1410
    for hv_name, hv_params in os_hvp.items():
1411
      if hv_params:
1412
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1413
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1414

    
1415
  # TODO: collapse identical parameter values in a single one
1416
  for instance in instances:
1417
    if instance.hvparams:
1418
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1419
                       cluster.FillHV(instance)))
1420

    
1421
  return hvp_data
1422

    
1423

    
1424
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1425
  """Verifies the cluster config.
1426

1427
  """
1428
  REQ_BGL = False
1429

    
1430
  def _VerifyHVP(self, hvp_data):
1431
    """Verifies locally the syntax of the hypervisor parameters.
1432

1433
    """
1434
    for item, hv_name, hv_params in hvp_data:
1435
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1436
             (item, hv_name))
1437
      try:
1438
        hv_class = hypervisor.GetHypervisorClass(hv_name)
1439
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1440
        hv_class.CheckParameterSyntax(hv_params)
1441
      except errors.GenericError, err:
1442
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1443

    
1444
  def ExpandNames(self):
1445
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1446
    self.share_locks = ShareAll()
1447

    
1448
  def CheckPrereq(self):
1449
    """Check prerequisites.
1450

1451
    """
1452
    # Retrieve all information
1453
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1454
    self.all_node_info = self.cfg.GetAllNodesInfo()
1455
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1456

    
1457
  def Exec(self, feedback_fn):
1458
    """Verify integrity of cluster, performing various test on nodes.
1459

1460
    """
1461
    self.bad = False
1462
    self._feedback_fn = feedback_fn
1463

    
1464
    feedback_fn("* Verifying cluster config")
1465

    
1466
    for msg in self.cfg.VerifyConfig():
1467
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1468

    
1469
    feedback_fn("* Verifying cluster certificate files")
1470

    
1471
    for cert_filename in pathutils.ALL_CERT_FILES:
1472
      (errcode, msg) = _VerifyCertificate(cert_filename)
1473
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1474

    
1475
    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
1476
                                    pathutils.NODED_CERT_FILE),
1477
                  constants.CV_ECLUSTERCERT,
1478
                  None,
1479
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
1480
                    constants.LUXID_USER + " user")
1481

    
1482
    feedback_fn("* Verifying hypervisor parameters")
1483

    
1484
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1485
                                                self.all_inst_info.values()))
1486

    
1487
    feedback_fn("* Verifying all nodes belong to an existing group")
1488

    
1489
    # We do this verification here because, should this bogus circumstance
1490
    # occur, it would never be caught by VerifyGroup, which only acts on
1491
    # nodes/instances reachable from existing node groups.
1492

    
1493
    dangling_nodes = set(node for node in self.all_node_info.values()
1494
                         if node.group not in self.all_group_info)
1495

    
1496
    dangling_instances = {}
1497
    no_node_instances = []
1498

    
1499
    for inst in self.all_inst_info.values():
1500
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
1501
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
1502
      elif inst.primary_node not in self.all_node_info:
1503
        no_node_instances.append(inst)
1504

    
1505
    pretty_dangling = [
1506
        "%s (%s)" %
1507
        (node.name,
1508
         utils.CommaJoin(inst.name for
1509
                         inst in dangling_instances.get(node.uuid, [])))
1510
        for node in dangling_nodes]
1511

    
1512
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1513
                  None,
1514
                  "the following nodes (and their instances) belong to a non"
1515
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1516

    
1517
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1518
                  None,
1519
                  "the following instances have a non-existing primary-node:"
1520
                  " %s", utils.CommaJoin(inst.name for
1521
                                         inst in no_node_instances))
1522

    
1523
    return not self.bad
1524

    
1525

    
1526
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1527
  """Verifies the status of a node group.
1528

1529
  """
1530
  HPATH = "cluster-verify"
1531
  HTYPE = constants.HTYPE_CLUSTER
1532
  REQ_BGL = False
1533

    
1534
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1535

    
1536
  class NodeImage(object):
1537
    """A class representing the logical and physical status of a node.
1538

1539
    @type uuid: string
1540
    @ivar uuid: the node UUID to which this object refers
1541
    @ivar volumes: a structure as returned from
1542
        L{ganeti.backend.GetVolumeList} (runtime)
1543
    @ivar instances: a list of running instances (runtime)
1544
    @ivar pinst: list of configured primary instances (config)
1545
    @ivar sinst: list of configured secondary instances (config)
1546
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1547
        instances for which this node is secondary (config)
1548
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1549
    @ivar dfree: free disk, as reported by the node (runtime)
1550
    @ivar offline: the offline status (config)
1551
    @type rpc_fail: boolean
1552
    @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1553
        not whether the individual keys were correct) (runtime)
1554
    @type lvm_fail: boolean
1555
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1556
    @type hyp_fail: boolean
1557
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1558
    @type ghost: boolean
1559
    @ivar ghost: whether this is a known node or not (config)
1560
    @type os_fail: boolean
1561
    @ivar os_fail: whether the RPC call didn't return valid OS data
1562
    @type oslist: list
1563
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1564
    @type vm_capable: boolean
1565
    @ivar vm_capable: whether the node can host instances
1566
    @type pv_min: float
1567
    @ivar pv_min: size in MiB of the smallest PVs
1568
    @type pv_max: float
1569
    @ivar pv_max: size in MiB of the biggest PVs
1570

1571
    """
1572
    def __init__(self, offline=False, uuid=None, vm_capable=True):
1573
      self.uuid = uuid
1574
      self.volumes = {}
1575
      self.instances = []
1576
      self.pinst = []
1577
      self.sinst = []
1578
      self.sbp = {}
1579
      self.mfree = 0
1580
      self.dfree = 0
1581
      self.offline = offline
1582
      self.vm_capable = vm_capable
1583
      self.rpc_fail = False
1584
      self.lvm_fail = False
1585
      self.hyp_fail = False
1586
      self.ghost = False
1587
      self.os_fail = False
1588
      self.oslist = {}
1589
      self.pv_min = None
1590
      self.pv_max = None
1591

    
1592
  def ExpandNames(self):
1593
    # This raises errors.OpPrereqError on its own:
1594
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1595

    
1596
    # Get instances in node group; this is unsafe and needs verification later
1597
    inst_uuids = \
1598
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1599

    
1600
    self.needed_locks = {
1601
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1602
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1603
      locking.LEVEL_NODE: [],
1604

    
1605
      # This opcode is run by watcher every five minutes and acquires all nodes
1606
      # for a group. It doesn't run for a long time, so it's better to acquire
1607
      # the node allocation lock as well.
1608
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1609
      }
1610

    
1611
    self.share_locks = ShareAll()
1612

    
1613
  def DeclareLocks(self, level):
1614
    if level == locking.LEVEL_NODE:
1615
      # Get members of node group; this is unsafe and needs verification later
1616
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1617

    
1618
      # In Exec(), we warn about mirrored instances that have primary and
1619
      # secondary living in separate node groups. To fully verify that
1620
      # volumes for these instances are healthy, we will need to do an
1621
      # extra call to their secondaries. We ensure here those nodes will
1622
      # be locked.
1623
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1624
        # Important: access only the instances whose lock is owned
1625
        instance = self.cfg.GetInstanceInfoByName(inst_name)
1626
        if instance.disk_template in constants.DTS_INT_MIRROR:
1627
          nodes.update(instance.secondary_nodes)
1628

    
1629
      self.needed_locks[locking.LEVEL_NODE] = nodes
1630

    
1631
  def CheckPrereq(self):
1632
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1633
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1634

    
1635
    group_node_uuids = set(self.group_info.members)
1636
    group_inst_uuids = \
1637
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1638

    
1639
    unlocked_node_uuids = \
1640
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1641

    
1642
    unlocked_inst_uuids = \
1643
        group_inst_uuids.difference(
1644
          [self.cfg.GetInstanceInfoByName(name).uuid
1645
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1646

    
1647
    if unlocked_node_uuids:
1648
      raise errors.OpPrereqError(
1649
        "Missing lock for nodes: %s" %
1650
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1651
        errors.ECODE_STATE)
1652

    
1653
    if unlocked_inst_uuids:
1654
      raise errors.OpPrereqError(
1655
        "Missing lock for instances: %s" %
1656
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1657
        errors.ECODE_STATE)
1658

    
1659
    self.all_node_info = self.cfg.GetAllNodesInfo()
1660
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1661

    
1662
    self.my_node_uuids = group_node_uuids
1663
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1664
                             for node_uuid in group_node_uuids)
1665

    
1666
    self.my_inst_uuids = group_inst_uuids
1667
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1668
                             for inst_uuid in group_inst_uuids)
1669

    
1670
    # We detect here the nodes that will need the extra RPC calls for verifying
1671
    # split LV volumes; they should be locked.
1672
    extra_lv_nodes = set()
1673

    
1674
    for inst in self.my_inst_info.values():
1675
      if inst.disk_template in constants.DTS_INT_MIRROR:
1676
        for nuuid in inst.all_nodes:
1677
          if self.all_node_info[nuuid].group != self.group_uuid:
1678
            extra_lv_nodes.add(nuuid)
1679

    
1680
    unlocked_lv_nodes = \
1681
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1682

    
1683
    if unlocked_lv_nodes:
1684
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1685
                                 utils.CommaJoin(unlocked_lv_nodes),
1686
                                 errors.ECODE_STATE)
1687
    self.extra_lv_nodes = list(extra_lv_nodes)
1688

    
1689
  def _VerifyNode(self, ninfo, nresult):
1690
    """Perform some basic validation on data returned from a node.
1691

1692
      - check the result data structure is well formed and has all the
1693
        mandatory fields
1694
      - check ganeti version
1695

1696
    @type ninfo: L{objects.Node}
1697
    @param ninfo: the node to check
1698
    @param nresult: the results from the node
1699
    @rtype: boolean
1700
    @return: whether overall this call was successful (and we can expect
1701
         reasonable values in the respose)
1702

1703
    """
1704
    # main result, nresult should be a non-empty dict
1705
    test = not nresult or not isinstance(nresult, dict)
1706
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1707
                  "unable to verify node: no data returned")
1708
    if test:
1709
      return False
1710

    
1711
    # compares ganeti version
1712
    local_version = constants.PROTOCOL_VERSION
1713
    remote_version = nresult.get("version", None)
1714
    test = not (remote_version and
1715
                isinstance(remote_version, (list, tuple)) and
1716
                len(remote_version) == 2)
1717
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1718
                  "connection to node returned invalid data")
1719
    if test:
1720
      return False
1721

    
1722
    test = local_version != remote_version[0]
1723
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1724
                  "incompatible protocol versions: master %s,"
1725
                  " node %s", local_version, remote_version[0])
1726
    if test:
1727
      return False
1728

    
1729
    # node seems compatible, we can actually try to look into its results
1730

    
1731
    # full package version
1732
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1733
                  constants.CV_ENODEVERSION, ninfo.name,
1734
                  "software version mismatch: master %s, node %s",
1735
                  constants.RELEASE_VERSION, remote_version[1],
1736
                  code=self.ETYPE_WARNING)
1737

    
1738
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1739
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1740
      for hv_name, hv_result in hyp_result.iteritems():
1741
        test = hv_result is not None
1742
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1743
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1744

    
1745
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1746
    if ninfo.vm_capable and isinstance(hvp_result, list):
1747
      for item, hv_name, hv_result in hvp_result:
1748
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1749
                      "hypervisor %s parameter verify failure (source %s): %s",
1750
                      hv_name, item, hv_result)
1751

    
1752
    test = nresult.get(constants.NV_NODESETUP,
1753
                       ["Missing NODESETUP results"])
1754
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1755
                  "node setup error: %s", "; ".join(test))
1756

    
1757
    return True
1758

    
1759
  def _VerifyNodeTime(self, ninfo, nresult,
1760
                      nvinfo_starttime, nvinfo_endtime):
1761
    """Check the node time.
1762

1763
    @type ninfo: L{objects.Node}
1764
    @param ninfo: the node to check
1765
    @param nresult: the remote results for the node
1766
    @param nvinfo_starttime: the start time of the RPC call
1767
    @param nvinfo_endtime: the end time of the RPC call
1768

1769
    """
1770
    ntime = nresult.get(constants.NV_TIME, None)
1771
    try:
1772
      ntime_merged = utils.MergeTime(ntime)
1773
    except (ValueError, TypeError):
1774
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1775
                    "Node returned invalid time")
1776
      return
1777

    
1778
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1779
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1780
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1781
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1782
    else:
1783
      ntime_diff = None
1784

    
1785
    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1786
                  "Node time diverges by at least %s from master node time",
1787
                  ntime_diff)
1788

    
1789
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1790
    """Check the node LVM results and update info for cross-node checks.
1791

1792
    @type ninfo: L{objects.Node}
1793
    @param ninfo: the node to check
1794
    @param nresult: the remote results for the node
1795
    @param vg_name: the configured VG name
1796
    @type nimg: L{NodeImage}
1797
    @param nimg: node image
1798

1799
    """
1800
    if vg_name is None:
1801
      return
1802

    
1803
    # checks vg existence and size > 20G
1804
    vglist = nresult.get(constants.NV_VGLIST, None)
1805
    test = not vglist
1806
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1807
                  "unable to check volume groups")
1808
    if not test:
1809
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1810
                                            constants.MIN_VG_SIZE)
1811
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1812

    
1813
    # Check PVs
1814
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1815
    for em in errmsgs:
1816
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
1817
    if pvminmax is not None:
1818
      (nimg.pv_min, nimg.pv_max) = pvminmax
1819

    
1820
  def _VerifyGroupDRBDVersion(self, node_verify_infos):
1821
    """Check cross-node DRBD version consistency.
1822

1823
    @type node_verify_infos: dict
1824
    @param node_verify_infos: infos about nodes as returned from the
1825
      node_verify call.
1826

1827
    """
1828
    node_versions = {}
1829
    for node_uuid, ndata in node_verify_infos.items():
1830
      nresult = ndata.payload
1831
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1832
      node_versions[node_uuid] = version
1833

    
1834
    if len(set(node_versions.values())) > 1:
1835
      for node_uuid, version in sorted(node_versions.items()):
1836
        msg = "DRBD version mismatch: %s" % version
1837
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1838
                    code=self.ETYPE_WARNING)
1839

    
1840
  def _VerifyGroupLVM(self, node_image, vg_name):
1841
    """Check cross-node consistency in LVM.
1842

1843
    @type node_image: dict
1844
    @param node_image: info about nodes, mapping from node to names to
1845
      L{NodeImage} objects
1846
    @param vg_name: the configured VG name
1847

1848
    """
1849
    if vg_name is None:
1850
      return
1851

    
1852
    # Only exclusive storage needs this kind of checks
1853
    if not self._exclusive_storage:
1854
      return
1855

    
1856
    # exclusive_storage wants all PVs to have the same size (approximately),
1857
    # if the smallest and the biggest ones are okay, everything is fine.
1858
    # pv_min is None iff pv_max is None
1859
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1860
    if not vals:
1861
      return
1862
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1863
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1864
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1865
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1866
                  "PV sizes differ too much in the group; smallest (%s MB) is"
1867
                  " on %s, biggest (%s MB) is on %s",
1868
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
1869
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))
1870

    
1871
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1872
    """Check the node bridges.
1873

1874
    @type ninfo: L{objects.Node}
1875
    @param ninfo: the node to check
1876
    @param nresult: the remote results for the node
1877
    @param bridges: the expected list of bridges
1878

1879
    """
1880
    if not bridges:
1881
      return
1882

    
1883
    missing = nresult.get(constants.NV_BRIDGES, None)
1884
    test = not isinstance(missing, list)
1885
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1886
                  "did not return valid bridge information")
1887
    if not test:
1888
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1889
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1890

    
1891
  def _VerifyNodeUserScripts(self, ninfo, nresult):
1892
    """Check the results of user scripts presence and executability on the node
1893

1894
    @type ninfo: L{objects.Node}
1895
    @param ninfo: the node to check
1896
    @param nresult: the remote results for the node
1897

1898
    """
1899
    test = not constants.NV_USERSCRIPTS in nresult
1900
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1901
                  "did not return user scripts information")
1902

    
1903
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1904
    if not test:
1905
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1906
                    "user scripts not present or not executable: %s" %
1907
                    utils.CommaJoin(sorted(broken_scripts)))
1908

    
1909
  def _VerifyNodeNetwork(self, ninfo, nresult):
1910
    """Check the node network connectivity results.
1911

1912
    @type ninfo: L{objects.Node}
1913
    @param ninfo: the node to check
1914
    @param nresult: the remote results for the node
1915

1916
    """
1917
    test = constants.NV_NODELIST not in nresult
1918
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
1919
                  "node hasn't returned node ssh connectivity data")
1920
    if not test:
1921
      if nresult[constants.NV_NODELIST]:
1922
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1923
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
1924
                        "ssh communication with node '%s': %s", a_node, a_msg)
1925

    
1926
    test = constants.NV_NODENETTEST not in nresult
1927
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1928
                  "node hasn't returned node tcp connectivity data")
1929
    if not test:
1930
      if nresult[constants.NV_NODENETTEST]:
1931
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1932
        for anode in nlist:
1933
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
1934
                        "tcp communication with node '%s': %s",
1935
                        anode, nresult[constants.NV_NODENETTEST][anode])
1936

    
1937
    test = constants.NV_MASTERIP not in nresult
1938
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1939
                  "node hasn't returned node master IP reachability data")
1940
    if not test:
1941
      if not nresult[constants.NV_MASTERIP]:
1942
        if ninfo.uuid == self.master_node:
1943
          msg = "the master node cannot reach the master IP (not configured?)"
1944
        else:
1945
          msg = "cannot reach the master IP"
1946
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
1947

    
1948
  def _VerifyInstance(self, instance, node_image, diskstatus):
1949
    """Verify an instance.
1950

1951
    This function checks to see if the required block devices are
1952
    available on the instance's node, and that the nodes are in the correct
1953
    state.
1954

1955
    """
1956
    pnode_uuid = instance.primary_node
1957
    pnode_img = node_image[pnode_uuid]
1958
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
1959

    
1960
    node_vol_should = {}
1961
    instance.MapLVsByNode(node_vol_should)
1962

    
1963
    cluster = self.cfg.GetClusterInfo()
1964
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1965
                                                            self.group_info)
1966
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
1967
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
1968
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)
1969

    
1970
    for node_uuid in node_vol_should:
1971
      n_img = node_image[node_uuid]
1972
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1973
        # ignore missing volumes on offline or broken nodes
1974
        continue
1975
      for volume in node_vol_should[node_uuid]:
1976
        test = volume not in n_img.volumes
1977
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
1978
                      "volume %s missing on node %s", volume,
1979
                      self.cfg.GetNodeName(node_uuid))
1980

    
1981
    if instance.admin_state == constants.ADMINST_UP:
1982
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
1983
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
1984
                    "instance not running on its primary node %s",
1985
                     self.cfg.GetNodeName(pnode_uuid))
1986
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
1987
                    instance.name, "instance is marked as running and lives on"
1988
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))
1989

    
1990
    diskdata = [(nname, success, status, idx)
1991
                for (nname, disks) in diskstatus.items()
1992
                for idx, (success, status) in enumerate(disks)]
1993

    
1994
    for nname, success, bdev_status, idx in diskdata:
1995
      # the 'ghost node' construction in Exec() ensures that we have a
1996
      # node here
1997
      snode = node_image[nname]
1998
      bad_snode = snode.ghost or snode.offline
1999
      self._ErrorIf(instance.disks_active and
2000
                    not success and not bad_snode,
2001
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
2002
                    "couldn't retrieve status for disk/%s on %s: %s",
2003
                    idx, self.cfg.GetNodeName(nname), bdev_status)
2004

    
2005
      if instance.disks_active and success and \
2006
         (bdev_status.is_degraded or
2007
          bdev_status.ldisk_status != constants.LDS_OKAY):
2008
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2009
        if bdev_status.is_degraded:
2010
          msg += " is degraded"
2011
        if bdev_status.ldisk_status != constants.LDS_OKAY:
2012
          msg += "; state is '%s'" % \
2013
                 constants.LDS_NAMES[bdev_status.ldisk_status]
2014

    
2015
        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2016

    
2017
    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2018
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2019
                  "instance %s, connection to primary node failed",
2020
                  instance.name)
2021

    
2022
    self._ErrorIf(len(instance.secondary_nodes) > 1,
2023
                  constants.CV_EINSTANCELAYOUT, instance.name,
2024
                  "instance has multiple secondary nodes: %s",
2025
                  utils.CommaJoin(instance.secondary_nodes),
2026
                  code=self.ETYPE_WARNING)
2027

    
2028
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2029
    if any(es_flags.values()):
2030
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2031
        # Disk template not compatible with exclusive_storage: no instance
2032
        # node should have the flag set
2033
        es_nodes = [n
2034
                    for (n, es) in es_flags.items()
2035
                    if es]
2036
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2037
                    "instance has template %s, which is not supported on nodes"
2038
                    " that have exclusive storage set: %s",
2039
                    instance.disk_template,
2040
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2041
      for (idx, disk) in enumerate(instance.disks):
2042
        self._ErrorIf(disk.spindles is None,
2043
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2044
                      "number of spindles not configured for disk %s while"
2045
                      " exclusive storage is enabled, try running"
2046
                      " gnt-cluster repair-disk-sizes", idx)
2047

    
2048
    if instance.disk_template in constants.DTS_INT_MIRROR:
2049
      instance_nodes = utils.NiceSort(instance.all_nodes)
2050
      instance_groups = {}
2051

    
2052
      for node_uuid in instance_nodes:
2053
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
2054
                                   []).append(node_uuid)
2055

    
2056
      pretty_list = [
2057
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2058
                           groupinfo[group].name)
2059
        # Sort so that we always list the primary node first.
2060
        for group, nodes in sorted(instance_groups.items(),
2061
                                   key=lambda (_, nodes): pnode_uuid in nodes,
2062
                                   reverse=True)]
2063

    
2064
      self._ErrorIf(len(instance_groups) > 1,
2065
                    constants.CV_EINSTANCESPLITGROUPS,
2066
                    instance.name, "instance has primary and secondary nodes in"
2067
                    " different groups: %s", utils.CommaJoin(pretty_list),
2068
                    code=self.ETYPE_WARNING)
2069

    
2070
    inst_nodes_offline = []
2071
    for snode in instance.secondary_nodes:
2072
      s_img = node_image[snode]
2073
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2074
                    self.cfg.GetNodeName(snode),
2075
                    "instance %s, connection to secondary node failed",
2076
                    instance.name)
2077

    
2078
      if s_img.offline:
2079
        inst_nodes_offline.append(snode)
2080

    
2081
    # warn that the instance lives on offline nodes
2082
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2083
                  instance.name, "instance has offline secondary node(s) %s",
2084
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2085
    # ... or ghost/non-vm_capable nodes
2086
    for node_uuid in instance.all_nodes:
2087
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2088
                    instance.name, "instance lives on ghost node %s",
2089
                    self.cfg.GetNodeName(node_uuid))
2090
      self._ErrorIf(not node_image[node_uuid].vm_capable,
2091
                    constants.CV_EINSTANCEBADNODE, instance.name,
2092
                    "instance lives on non-vm_capable node %s",
2093
                    self.cfg.GetNodeName(node_uuid))
2094

    
2095
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2096
    """Verify if there are any unknown volumes in the cluster.
2097

2098
    The .os, .swap and backup volumes are ignored. All other volumes are
2099
    reported as unknown.
2100

2101
    @type reserved: L{ganeti.utils.FieldSet}
2102
    @param reserved: a FieldSet of reserved volume names
2103

2104
    """
2105
    for node_uuid, n_img in node_image.items():
2106
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2107
          self.all_node_info[node_uuid].group != self.group_uuid):
2108
        # skip non-healthy nodes
2109
        continue
2110
      for volume in n_img.volumes:
2111
        test = ((node_uuid not in node_vol_should or
2112
                volume not in node_vol_should[node_uuid]) and
2113
                not reserved.Matches(volume))
2114
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2115
                      self.cfg.GetNodeName(node_uuid),
2116
                      "volume %s is unknown", volume)
2117

    
2118
  def _VerifyNPlusOneMemory(self, node_image, all_insts):
2119
    """Verify N+1 Memory Resilience.
2120

2121
    Check that if one single node dies we can still start all the
2122
    instances it was primary for.
2123

2124
    """
2125
    cluster_info = self.cfg.GetClusterInfo()
2126
    for node_uuid, n_img in node_image.items():
2127
      # This code checks that every node which is now listed as
2128
      # secondary has enough memory to host all instances it is
2129
      # supposed to should a single other node in the cluster fail.
2130
      # FIXME: not ready for failover to an arbitrary node
2131
      # FIXME: does not support file-backed instances
2132
      # WARNING: we currently take into account down instances as well
2133
      # as up ones, considering that even if they're down someone
2134
      # might want to start them even in the event of a node failure.
2135
      if n_img.offline or \
2136
         self.all_node_info[node_uuid].group != self.group_uuid:
2137
        # we're skipping nodes marked offline and nodes in other groups from
2138
        # the N+1 warning, since most likely we don't have good memory
2139
        # infromation from them; we already list instances living on such
2140
        # nodes, and that's enough warning
2141
        continue
2142
      #TODO(dynmem): also consider ballooning out other instances
2143
      for prinode, inst_uuids in n_img.sbp.items():
2144
        needed_mem = 0
2145
        for inst_uuid in inst_uuids:
2146
          bep = cluster_info.FillBE(all_insts[inst_uuid])
2147
          if bep[constants.BE_AUTO_BALANCE]:
2148
            needed_mem += bep[constants.BE_MINMEM]
2149
        test = n_img.mfree < needed_mem
2150
        self._ErrorIf(test, constants.CV_ENODEN1,
2151
                      self.cfg.GetNodeName(node_uuid),
2152
                      "not enough memory to accomodate instance failovers"
2153
                      " should node %s fail (%dMiB needed, %dMiB available)",
2154
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2155

    
2156
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2157
                   (files_all, files_opt, files_mc, files_vm)):
2158
    """Verifies file checksums collected from all nodes.
2159

2160
    @param nodes: List of L{objects.Node} objects
2161
    @param master_node_uuid: UUID of master node
2162
    @param all_nvinfo: RPC results
2163

2164
    """
2165
    # Define functions determining which nodes to consider for a file
2166
    files2nodefn = [
2167
      (files_all, None),
2168
      (files_mc, lambda node: (node.master_candidate or
2169
                               node.uuid == master_node_uuid)),
2170
      (files_vm, lambda node: node.vm_capable),
2171
      ]
2172

    
2173
    # Build mapping from filename to list of nodes which should have the file
2174
    nodefiles = {}
2175
    for (files, fn) in files2nodefn:
2176
      if fn is None:
2177
        filenodes = nodes
2178
      else:
2179
        filenodes = filter(fn, nodes)
2180
      nodefiles.update((filename,
2181
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
2182
                       for filename in files)
2183

    
2184
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2185

    
2186
    fileinfo = dict((filename, {}) for filename in nodefiles)
2187
    ignore_nodes = set()
2188

    
2189
    for node in nodes:
2190
      if node.offline:
2191
        ignore_nodes.add(node.uuid)
2192
        continue
2193

    
2194
      nresult = all_nvinfo[node.uuid]
2195

    
2196
      if nresult.fail_msg or not nresult.payload:
2197
        node_files = None
2198
      else:
2199
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2200
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2201
                          for (key, value) in fingerprints.items())
2202
        del fingerprints
2203

    
2204
      test = not (node_files and isinstance(node_files, dict))
2205
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2206
                    "Node did not return file checksum data")
2207
      if test:
2208
        ignore_nodes.add(node.uuid)
2209
        continue
2210

    
2211
      # Build per-checksum mapping from filename to nodes having it
2212
      for (filename, checksum) in node_files.items():
2213
        assert filename in nodefiles
2214
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2215

    
2216
    for (filename, checksums) in fileinfo.items():
2217
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2218

    
2219
      # Nodes having the file
2220
      with_file = frozenset(node_uuid
2221
                            for node_uuids in fileinfo[filename].values()
2222
                            for node_uuid in node_uuids) - ignore_nodes
2223

    
2224
      expected_nodes = nodefiles[filename] - ignore_nodes
2225

    
2226
      # Nodes missing file
2227
      missing_file = expected_nodes - with_file
2228

    
2229
      if filename in files_opt:
2230
        # All or no nodes
2231
        self._ErrorIf(missing_file and missing_file != expected_nodes,
2232
                      constants.CV_ECLUSTERFILECHECK, None,
2233
                      "File %s is optional, but it must exist on all or no"
2234
                      " nodes (not found on %s)",
2235
                      filename,
2236
                      utils.CommaJoin(
2237
                        utils.NiceSort(
2238
                          map(self.cfg.GetNodeName, missing_file))))
2239
      else:
2240
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2241
                      "File %s is missing from node(s) %s", filename,
2242
                      utils.CommaJoin(
2243
                        utils.NiceSort(
2244
                          map(self.cfg.GetNodeName, missing_file))))
2245

    
2246
        # Warn if a node has a file it shouldn't
2247
        unexpected = with_file - expected_nodes
2248
        self._ErrorIf(unexpected,
2249
                      constants.CV_ECLUSTERFILECHECK, None,
2250
                      "File %s should not exist on node(s) %s",
2251
                      filename, utils.CommaJoin(
2252
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2253

    
2254
      # See if there are multiple versions of the file
2255
      test = len(checksums) > 1
2256
      if test:
2257
        variants = ["variant %s on %s" %
2258
                    (idx + 1,
2259
                     utils.CommaJoin(utils.NiceSort(
2260
                       map(self.cfg.GetNodeName, node_uuids))))
2261
                    for (idx, (checksum, node_uuids)) in
2262
                      enumerate(sorted(checksums.items()))]
2263
      else:
2264
        variants = []
2265

    
2266
      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2267
                    "File %s found with %s different checksums (%s)",
2268
                    filename, len(checksums), "; ".join(variants))
2269

    
2270
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2271
                      drbd_map):
2272
    """Verifies and the node DRBD status.
2273

2274
    @type ninfo: L{objects.Node}
2275
    @param ninfo: the node to check
2276
    @param nresult: the remote results for the node
2277
    @param instanceinfo: the dict of instances
2278
    @param drbd_helper: the configured DRBD usermode helper
2279
    @param drbd_map: the DRBD map as returned by
2280
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2281

2282
    """
2283
    if drbd_helper:
2284
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2285
      test = (helper_result is None)
2286
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2287
                    "no drbd usermode helper returned")
2288
      if helper_result:
2289
        status, payload = helper_result
2290
        test = not status
2291
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2292
                      "drbd usermode helper check unsuccessful: %s", payload)
2293
        test = status and (payload != drbd_helper)
2294
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2295
                      "wrong drbd usermode helper: %s", payload)
2296

    
2297
    # compute the DRBD minors
2298
    node_drbd = {}
2299
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2300
      test = inst_uuid not in instanceinfo
2301
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2302
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
2303
        # ghost instance should not be running, but otherwise we
2304
        # don't give double warnings (both ghost instance and
2305
        # unallocated minor in use)
2306
      if test:
2307
        node_drbd[minor] = (inst_uuid, False)
2308
      else:
2309
        instance = instanceinfo[inst_uuid]
2310
        node_drbd[minor] = (inst_uuid, instance.disks_active)
2311

    
2312
    # and now check them
2313
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2314
    test = not isinstance(used_minors, (tuple, list))
2315
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2316
                  "cannot parse drbd status file: %s", str(used_minors))
2317
    if test:
2318
      # we cannot check drbd status
2319
      return
2320

    
2321
    for minor, (inst_uuid, must_exist) in node_drbd.items():
2322
      test = minor not in used_minors and must_exist
2323
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2324
                    "drbd minor %d of instance %s is not active", minor,
2325
                    self.cfg.GetInstanceName(inst_uuid))
2326
    for minor in used_minors:
2327
      test = minor not in node_drbd
2328
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2329
                    "unallocated drbd minor %d is in use", minor)
2330

    
2331
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2332
    """Builds the node OS structures.
2333

2334
    @type ninfo: L{objects.Node}
2335
    @param ninfo: the node to check
2336
    @param nresult: the remote results for the node
2337
    @param nimg: the node image object
2338

2339
    """
2340
    remote_os = nresult.get(constants.NV_OSLIST, None)
2341
    test = (not isinstance(remote_os, list) or
2342
            not compat.all(isinstance(v, list) and len(v) == 7
2343
                           for v in remote_os))
2344

    
2345
    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2346
                  "node hasn't returned valid OS data")
2347

    
2348
    nimg.os_fail = test
2349

    
2350
    if test:
2351
      return
2352

    
2353
    os_dict = {}
2354

    
2355
    for (name, os_path, status, diagnose,
2356
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2357

    
2358
      if name not in os_dict:
2359
        os_dict[name] = []
2360

    
2361
      # parameters is a list of lists instead of list of tuples due to
2362
      # JSON lacking a real tuple type, fix it:
2363
      parameters = [tuple(v) for v in parameters]
2364
      os_dict[name].append((os_path, status, diagnose,
2365
                            set(variants), set(parameters), set(api_ver)))
2366

    
2367
    nimg.oslist = os_dict
2368

    
2369
  def _VerifyNodeOS(self, ninfo, nimg, base):
2370
    """Verifies the node OS list.
2371

2372
    @type ninfo: L{objects.Node}
2373
    @param ninfo: the node to check
2374
    @param nimg: the node image object
2375
    @param base: the 'template' node we match against (e.g. from the master)
2376

2377
    """
2378
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2379

    
2380
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2381
    for os_name, os_data in nimg.oslist.items():
2382
      assert os_data, "Empty OS status for OS %s?!" % os_name
2383
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2384
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2385
                    "Invalid OS %s (located at %s): %s",
2386
                    os_name, f_path, f_diag)
2387
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2388
                    "OS '%s' has multiple entries"
2389
                    " (first one shadows the rest): %s",
2390
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
2391
      # comparisons with the 'base' image
2392
      test = os_name not in base.oslist
2393
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2394
                    "Extra OS %s not present on reference node (%s)",
2395
                    os_name, self.cfg.GetNodeName(base.uuid))
2396
      if test:
2397
        continue
2398
      assert base.oslist[os_name], "Base node has empty OS status?"
2399
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2400
      if not b_status:
2401
        # base OS is invalid, skipping
2402
        continue
2403
      for kind, a, b in [("API version", f_api, b_api),
2404
                         ("variants list", f_var, b_var),
2405
                         ("parameters", beautify_params(f_param),
2406
                          beautify_params(b_param))]:
2407
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2408
                      "OS %s for %s differs from reference node %s:"
2409
                      " [%s] vs. [%s]", kind, os_name,
2410
                      self.cfg.GetNodeName(base.uuid),
2411
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2412

    
2413
    # check any missing OSes
2414
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2415
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2416
                  "OSes present on reference node %s"
2417
                  " but missing on this node: %s",
2418
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2419

    
2420
  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2421
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2422

2423
    @type ninfo: L{objects.Node}
2424
    @param ninfo: the node to check
2425
    @param nresult: the remote results for the node
2426
    @type is_master: bool
2427
    @param is_master: Whether node is the master node
2428

2429
    """
2430
    cluster = self.cfg.GetClusterInfo()
2431
    if (is_master and
2432
        (cluster.IsFileStorageEnabled() or
2433
         cluster.IsSharedFileStorageEnabled())):
2434
      try:
2435
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2436
      except KeyError:
2437
        # This should never happen
2438
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2439
                      "Node did not return forbidden file storage paths")
2440
      else:
2441
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2442
                      "Found forbidden file storage paths: %s",
2443
                      utils.CommaJoin(fspaths))
2444
    else:
2445
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2446
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2447
                    "Node should not have returned forbidden file storage"
2448
                    " paths")
2449

    
2450
  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2451
                          verify_key, error_key):
2452
    """Verifies (file) storage paths.
2453

2454
    @type ninfo: L{objects.Node}
2455
    @param ninfo: the node to check
2456
    @param nresult: the remote results for the node
2457
    @type file_disk_template: string
2458
    @param file_disk_template: file-based disk template, whose directory
2459
        is supposed to be verified
2460
    @type verify_key: string
2461
    @param verify_key: key for the verification map of this file
2462
        verification step
2463
    @param error_key: error key to be added to the verification results
2464
        in case something goes wrong in this verification step
2465

2466
    """
2467
    assert (file_disk_template in
2468
            utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2469
    cluster = self.cfg.GetClusterInfo()
2470
    if cluster.IsDiskTemplateEnabled(file_disk_template):
2471
      self._ErrorIf(
2472
          verify_key in nresult,
2473
          error_key, ninfo.name,
2474
          "The configured %s storage path is unusable: %s" %
2475
          (file_disk_template, nresult.get(verify_key)))
2476

    
2477
  def _VerifyFileStoragePaths(self, ninfo, nresult):
2478
    """Verifies (file) storage paths.
2479

2480
    @see: C{_VerifyStoragePaths}
2481

2482
    """
2483
    self._VerifyStoragePaths(
2484
        ninfo, nresult, constants.DT_FILE,
2485
        constants.NV_FILE_STORAGE_PATH,
2486
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2487

    
2488
  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2489
    """Verifies (file) storage paths.
2490

2491
    @see: C{_VerifyStoragePaths}
2492

2493
    """
2494
    self._VerifyStoragePaths(
2495
        ninfo, nresult, constants.DT_SHARED_FILE,
2496
        constants.NV_SHARED_FILE_STORAGE_PATH,
2497
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2498

    
2499
  def _VerifyOob(self, ninfo, nresult):
2500
    """Verifies out of band functionality of a node.
2501

2502
    @type ninfo: L{objects.Node}
2503
    @param ninfo: the node to check
2504
    @param nresult: the remote results for the node
2505

2506
    """
2507
    # We just have to verify the paths on master and/or master candidates
2508
    # as the oob helper is invoked on the master
2509
    if ((ninfo.master_candidate or ninfo.master_capable) and
2510
        constants.NV_OOB_PATHS in nresult):
2511
      for path_result in nresult[constants.NV_OOB_PATHS]:
2512
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2513
                      ninfo.name, path_result)
2514

    
2515
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2516
    """Verifies and updates the node volume data.
2517

2518
    This function will update a L{NodeImage}'s internal structures
2519
    with data from the remote call.
2520

2521
    @type ninfo: L{objects.Node}
2522
    @param ninfo: the node to check
2523
    @param nresult: the remote results for the node
2524
    @param nimg: the node image object
2525
    @param vg_name: the configured VG name
2526

2527
    """
2528
    nimg.lvm_fail = True
2529
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2530
    if vg_name is None:
2531
      pass
2532
    elif isinstance(lvdata, basestring):
2533
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2534
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
2535
    elif not isinstance(lvdata, dict):
2536
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2537
                    "rpc call to node failed (lvlist)")
2538
    else:
2539
      nimg.volumes = lvdata
2540
      nimg.lvm_fail = False
2541

    
2542
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2543
    """Verifies and updates the node instance list.
2544

2545
    If the listing was successful, then updates this node's instance
2546
    list. Otherwise, it marks the RPC call as failed for the instance
2547
    list key.
2548

2549
    @type ninfo: L{objects.Node}
2550
    @param ninfo: the node to check
2551
    @param nresult: the remote results for the node
2552
    @param nimg: the node image object
2553

2554
    """
2555
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2556
    test = not isinstance(idata, list)
2557
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2558
                  "rpc call to node failed (instancelist): %s",
2559
                  utils.SafeEncode(str(idata)))
2560
    if test:
2561
      nimg.hyp_fail = True
2562
    else:
2563
      nimg.instances = [inst.uuid for (_, inst) in
2564
                        self.cfg.GetMultiInstanceInfoByName(idata)]
2565

    
2566
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2567
    """Verifies and computes a node information map
2568

2569
    @type ninfo: L{objects.Node}
2570
    @param ninfo: the node to check
2571
    @param nresult: the remote results for the node
2572
    @param nimg: the node image object
2573
    @param vg_name: the configured VG name
2574

2575
    """
2576
    # try to read free memory (from the hypervisor)
2577
    hv_info = nresult.get(constants.NV_HVINFO, None)
2578
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2579
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2580
                  "rpc call to node failed (hvinfo)")
2581
    if not test:
2582
      try:
2583
        nimg.mfree = int(hv_info["memory_free"])
2584
      except (ValueError, TypeError):
2585
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2586
                      "node returned invalid nodeinfo, check hypervisor")
2587

    
2588
    # FIXME: devise a free space model for file based instances as well
2589
    if vg_name is not None:
2590
      test = (constants.NV_VGLIST not in nresult or
2591
              vg_name not in nresult[constants.NV_VGLIST])
2592
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2593
                    "node didn't return data for the volume group '%s'"
2594
                    " - it is either missing or broken", vg_name)
2595
      if not test:
2596
        try:
2597
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2598
        except (ValueError, TypeError):
2599
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2600
                        "node returned invalid LVM info, check LVM status")
2601

    
2602
  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2603
    """Gets per-disk status information for all instances.
2604

2605
    @type node_uuids: list of strings
2606
    @param node_uuids: Node UUIDs
2607
    @type node_image: dict of (UUID, L{objects.Node})
2608
    @param node_image: Node objects
2609
    @type instanceinfo: dict of (UUID, L{objects.Instance})
2610
    @param instanceinfo: Instance objects
2611
    @rtype: {instance: {node: [(succes, payload)]}}
2612
    @return: a dictionary of per-instance dictionaries with nodes as
2613
        keys and disk information as values; the disk information is a
2614
        list of tuples (success, payload)
2615

2616
    """
2617
    node_disks = {}
2618
    node_disks_devonly = {}
2619
    diskless_instances = set()
2620
    diskless = constants.DT_DISKLESS
2621

    
2622
    for nuuid in node_uuids:
2623
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
2624
                                             node_image[nuuid].sinst))
2625
      diskless_instances.update(uuid for uuid in node_inst_uuids
2626
                                if instanceinfo[uuid].disk_template == diskless)
2627
      disks = [(inst_uuid, disk)
2628
               for inst_uuid in node_inst_uuids
2629
               for disk in instanceinfo[inst_uuid].disks]
2630

    
2631
      if not disks:
2632
        # No need to collect data
2633
        continue
2634

    
2635
      node_disks[nuuid] = disks
2636

    
2637
      # _AnnotateDiskParams makes already copies of the disks
2638
      devonly = []
2639
      for (inst_uuid, dev) in disks:
2640
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
2641
                                          self.cfg)
2642
        self.cfg.SetDiskID(anno_disk, nuuid)
2643
        devonly.append(anno_disk)
2644

    
2645
      node_disks_devonly[nuuid] = devonly
2646

    
2647
    assert len(node_disks) == len(node_disks_devonly)
2648

    
2649
    # Collect data from all nodes with disks
2650
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2651
                                                          node_disks_devonly)
2652

    
2653
    assert len(result) == len(node_disks)
2654

    
2655
    instdisk = {}
2656

    
2657
    for (nuuid, nres) in result.items():
2658
      node = self.cfg.GetNodeInfo(nuuid)
2659
      disks = node_disks[node.uuid]
2660

    
2661
      if nres.offline:
2662
        # No data from this node
2663
        data = len(disks) * [(False, "node offline")]
2664
      else:
2665
        msg = nres.fail_msg
2666
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
2667
                      "while getting disk information: %s", msg)
2668
        if msg:
2669
          # No data from this node
2670
          data = len(disks) * [(False, msg)]
2671
        else:
2672
          data = []
2673
          for idx, i in enumerate(nres.payload):
2674
            if isinstance(i, (tuple, list)) and len(i) == 2:
2675
              data.append(i)
2676
            else:
2677
              logging.warning("Invalid result from node %s, entry %d: %s",
2678
                              node.name, idx, i)
2679
              data.append((False, "Invalid result from the remote node"))
2680

    
2681
      for ((inst_uuid, _), status) in zip(disks, data):
2682
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
2683
          .append(status)
2684

    
2685
    # Add empty entries for diskless instances.
2686
    for inst_uuid in diskless_instances:
2687
      assert inst_uuid not in instdisk
2688
      instdisk[inst_uuid] = {}
2689

    
2690
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2691
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
2692
                      compat.all(isinstance(s, (tuple, list)) and
2693
                                 len(s) == 2 for s in statuses)
2694
                      for inst, nuuids in instdisk.items()
2695
                      for nuuid, statuses in nuuids.items())
2696
    if __debug__:
2697
      instdisk_keys = set(instdisk)
2698
      instanceinfo_keys = set(instanceinfo)
2699
      assert instdisk_keys == instanceinfo_keys, \
2700
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2701
         (instdisk_keys, instanceinfo_keys))
2702

    
2703
    return instdisk
2704

    
2705
  @staticmethod
2706
  def _SshNodeSelector(group_uuid, all_nodes):
2707
    """Create endless iterators for all potential SSH check hosts.
2708

2709
    """
2710
    nodes = [node for node in all_nodes
2711
             if (node.group != group_uuid and
2712
                 not node.offline)]
2713
    keyfunc = operator.attrgetter("group")
2714

    
2715
    return map(itertools.cycle,
2716
               [sorted(map(operator.attrgetter("name"), names))
2717
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2718
                                                  keyfunc)])
2719

    
2720
  @classmethod
2721
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2722
    """Choose which nodes should talk to which other nodes.
2723

2724
    We will make nodes contact all nodes in their group, and one node from
2725
    every other group.
2726

2727
    @warning: This algorithm has a known issue if one node group is much
2728
      smaller than others (e.g. just one node). In such a case all other
2729
      nodes will talk to the single node.
2730

2731
    """
2732
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2733
    sel = cls._SshNodeSelector(group_uuid, all_nodes)
2734

    
2735
    return (online_nodes,
2736
            dict((name, sorted([i.next() for i in sel]))
2737
                 for name in online_nodes))
2738

    
2739
  def BuildHooksEnv(self):
2740
    """Build hooks env.
2741

2742
    Cluster-Verify hooks just ran in the post phase and their failure makes
2743
    the output be logged in the verify output and the verification to fail.
2744

2745
    """
2746
    env = {
2747
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2748
      }
2749

    
2750
    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2751
               for node in self.my_node_info.values())
2752

    
2753
    return env
2754

    
2755
  def BuildHooksNodes(self):
2756
    """Build hooks nodes.
2757

2758
    """
2759
    return ([], list(self.my_node_info.keys()))
2760

    
2761
  def Exec(self, feedback_fn):
2762
    """Verify integrity of the node group, performing various test on nodes.
2763

2764
    """
2765
    # This method has too many local variables. pylint: disable=R0914
2766
    feedback_fn("* Verifying group '%s'" % self.group_info.name)
2767

    
2768
    if not self.my_node_uuids:
2769
      # empty node group
2770
      feedback_fn("* Empty node group, skipping verification")
2771
      return True
2772

    
2773
    self.bad = False
2774
    verbose = self.op.verbose
2775
    self._feedback_fn = feedback_fn
2776

    
2777
    vg_name = self.cfg.GetVGName()
2778
    drbd_helper = self.cfg.GetDRBDHelper()
2779
    cluster = self.cfg.GetClusterInfo()
2780
    hypervisors = cluster.enabled_hypervisors
2781
    node_data_list = self.my_node_info.values()
2782

    
2783
    i_non_redundant = [] # Non redundant instances
2784
    i_non_a_balanced = [] # Non auto-balanced instances
2785
    i_offline = 0 # Count of offline instances
2786
    n_offline = 0 # Count of offline nodes
2787
    n_drained = 0 # Count of nodes being drained
2788
    node_vol_should = {}
2789

    
2790
    # FIXME: verify OS list
2791

    
2792
    # File verification
2793
    filemap = ComputeAncillaryFiles(cluster, False)
2794

    
2795
    # do local checksums
2796
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
2797
    master_ip = self.cfg.GetMasterIP()
2798

    
2799
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))
2800

    
2801
    user_scripts = []
2802
    if self.cfg.GetUseExternalMipScript():
2803
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2804

    
2805
    node_verify_param = {
2806
      constants.NV_FILELIST:
2807
        map(vcluster.MakeVirtualPath,
2808
            utils.UniqueSequence(filename
2809
                                 for files in filemap
2810
                                 for filename in files)),
2811
      constants.NV_NODELIST:
2812
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2813
                                  self.all_node_info.values()),
2814
      constants.NV_HYPERVISOR: hypervisors,
2815
      constants.NV_HVPARAMS:
2816
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2817
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2818
                                 for node in node_data_list
2819
                                 if not node.offline],
2820
      constants.NV_INSTANCELIST: hypervisors,
2821
      constants.NV_VERSION: None,
2822
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2823
      constants.NV_NODESETUP: None,
2824
      constants.NV_TIME: None,
2825
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
2826
      constants.NV_OSLIST: None,
2827
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2828
      constants.NV_USERSCRIPTS: user_scripts,
2829
      }
2830

    
2831
    if vg_name is not None:
2832
      node_verify_param[constants.NV_VGLIST] = None
2833
      node_verify_param[constants.NV_LVLIST] = vg_name
2834
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2835

    
2836
    if drbd_helper:
2837
      node_verify_param[constants.NV_DRBDVERSION] = None
2838
      node_verify_param[constants.NV_DRBDLIST] = None
2839
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2840

    
2841
    if cluster.IsFileStorageEnabled() or \
2842
        cluster.IsSharedFileStorageEnabled():
2843
      # Load file storage paths only from master node
2844
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
2845
        self.cfg.GetMasterNodeName()
2846
      if cluster.IsFileStorageEnabled():
2847
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
2848
          cluster.file_storage_dir
2849

    
2850
    # bridge checks
2851
    # FIXME: this needs to be changed per node-group, not cluster-wide
2852
    bridges = set()
2853
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2854
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2855
      bridges.add(default_nicpp[constants.NIC_LINK])
2856
    for inst_uuid in self.my_inst_info.values():
2857
      for nic in inst_uuid.nics:
2858
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
2859
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2860
          bridges.add(full_nic[constants.NIC_LINK])
2861

    
2862
    if bridges:
2863
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
2864

    
2865
    # Build our expected cluster state
2866
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
2867
                                                 uuid=node.uuid,
2868
                                                 vm_capable=node.vm_capable))
2869
                      for node in node_data_list)
2870

    
2871
    # Gather OOB paths
2872
    oob_paths = []
2873
    for node in self.all_node_info.values():
2874
      path = SupportsOob(self.cfg, node)
2875
      if path and path not in oob_paths:
2876
        oob_paths.append(path)
2877

    
2878
    if oob_paths:
2879
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2880

    
2881
    for inst_uuid in self.my_inst_uuids:
2882
      instance = self.my_inst_info[inst_uuid]
2883
      if instance.admin_state == constants.ADMINST_OFFLINE:
2884
        i_offline += 1
2885

    
2886
      for nuuid in instance.all_nodes:
2887
        if nuuid not in node_image:
2888
          gnode = self.NodeImage(uuid=nuuid)
2889
          gnode.ghost = (nuuid not in self.all_node_info)
2890
          node_image[nuuid] = gnode
2891

    
2892
      instance.MapLVsByNode(node_vol_should)
2893

    
2894
      pnode = instance.primary_node
2895
      node_image[pnode].pinst.append(instance.uuid)
2896

    
2897
      for snode in instance.secondary_nodes:
2898
        nimg = node_image[snode]
2899
        nimg.sinst.append(instance.uuid)
2900
        if pnode not in nimg.sbp:
2901
          nimg.sbp[pnode] = []
2902
        nimg.sbp[pnode].append(instance.uuid)
2903

    
2904
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
2905
                                               self.my_node_info.keys())
2906
    # The value of exclusive_storage should be the same across the group, so if
2907
    # it's True for at least a node, we act as if it were set for all the nodes
2908
    self._exclusive_storage = compat.any(es_flags.values())
2909
    if self._exclusive_storage:
2910
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2911

    
2912
    # At this point, we have the in-memory data structures complete,
2913
    # except for the runtime information, which we'll gather next
2914

    
2915
    # Due to the way our RPC system works, exact response times cannot be
2916
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2917
    # time before and after executing the request, we can at least have a time
2918
    # window.
2919
    nvinfo_starttime = time.time()
2920
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
2921
                                           node_verify_param,
2922
                                           self.cfg.GetClusterName(),
2923
                                           self.cfg.GetClusterInfo().hvparams)
2924
    nvinfo_endtime = time.time()
2925

    
2926
    if self.extra_lv_nodes and vg_name is not None:
2927
      extra_lv_nvinfo = \
2928
          self.rpc.call_node_verify(self.extra_lv_nodes,
2929
                                    {constants.NV_LVLIST: vg_name},
2930
                                    self.cfg.GetClusterName(),
2931
                                    self.cfg.GetClusterInfo().hvparams)
2932
    else:
2933
      extra_lv_nvinfo = {}
2934

    
2935
    all_drbd_map = self.cfg.ComputeDRBDMap()
2936

    
2937
    feedback_fn("* Gathering disk information (%s nodes)" %
2938
                len(self.my_node_uuids))
2939
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
2940
                                     self.my_inst_info)
2941

    
2942
    feedback_fn("* Verifying configuration file consistency")
2943

    
2944
    # If not all nodes are being checked, we need to make sure the master node
2945
    # and a non-checked vm_capable node are in the list.
2946
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
2947
    if absent_node_uuids:
2948
      vf_nvinfo = all_nvinfo.copy()
2949
      vf_node_info = list(self.my_node_info.values())
2950
      additional_node_uuids = []
2951
      if master_node_uuid not in self.my_node_info:
2952
        additional_node_uuids.append(master_node_uuid)
2953
        vf_node_info.append(self.all_node_info[master_node_uuid])
2954
      # Add the first vm_capable node we find which is not included,
2955
      # excluding the master node (which we already have)
2956
      for node_uuid in absent_node_uuids:
2957
        nodeinfo = self.all_node_info[node_uuid]
2958
        if (nodeinfo.vm_capable and not nodeinfo.offline and
2959
            node_uuid != master_node_uuid):
2960
          additional_node_uuids.append(node_uuid)
2961
          vf_node_info.append(self.all_node_info[node_uuid])
2962
          break
2963
      key = constants.NV_FILELIST
2964
      vf_nvinfo.update(self.rpc.call_node_verify(
2965
         additional_node_uuids, {key: node_verify_param[key]},
2966
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
2967
    else:
2968
      vf_nvinfo = all_nvinfo
2969
      vf_node_info = self.my_node_info.values()
2970

    
2971
    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)
2972

    
2973
    feedback_fn("* Verifying node status")
2974

    
2975
    refos_img = None
2976

    
2977
    for node_i in node_data_list:
2978
      nimg = node_image[node_i.uuid]
2979

    
2980
      if node_i.offline:
2981
        if verbose:
2982
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
2983
        n_offline += 1
2984
        continue
2985

    
2986
      if node_i.uuid == master_node_uuid:
2987
        ntype = "master"
2988
      elif node_i.master_candidate:
2989
        ntype = "master candidate"
2990
      elif node_i.drained:
2991
        ntype = "drained"
2992
        n_drained += 1
2993
      else:
2994
        ntype = "regular"
2995
      if verbose:
2996
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))
2997

    
2998
      msg = all_nvinfo[node_i.uuid].fail_msg
2999
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
3000
                    "while contacting node: %s", msg)
3001
      if msg:
3002
        nimg.rpc_fail = True
3003
        continue
3004

    
3005
      nresult = all_nvinfo[node_i.uuid].payload
3006

    
3007
      nimg.call_ok = self._VerifyNode(node_i, nresult)
3008
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
3009
      self._VerifyNodeNetwork(node_i, nresult)
3010
      self._VerifyNodeUserScripts(node_i, nresult)
3011
      self._VerifyOob(node_i, nresult)
3012
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
3013
                                           node_i.uuid == master_node_uuid)
3014
      self._VerifyFileStoragePaths(node_i, nresult)
3015
      self._VerifySharedFileStoragePaths(node_i, nresult)
3016

    
3017
      if nimg.vm_capable:
3018
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
3019
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
3020
                             all_drbd_map)
3021

    
3022
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
3023
        self._UpdateNodeInstances(node_i, nresult, nimg)
3024
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
3025
        self._UpdateNodeOS(node_i, nresult, nimg)
3026

    
3027
        if not nimg.os_fail:
3028
          if refos_img is None:
3029
            refos_img = nimg
3030
          self._VerifyNodeOS(node_i, nimg, refos_img)
3031
        self._VerifyNodeBridges(node_i, nresult, bridges)
3032

    
3033
        # Check whether all running instances are primary for the node. (This
3034
        # can no longer be done from _VerifyInstance below, since some of the
3035
        # wrong instances could be from other node groups.)
3036
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)
3037

    
3038
        for inst_uuid in non_primary_inst_uuids:
3039
          test = inst_uuid in self.all_inst_info
3040
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
3041
                        self.cfg.GetInstanceName(inst_uuid),
3042
                        "instance should not run on node %s", node_i.name)
3043
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
3044
                        "node is running unknown instance %s", inst_uuid)
3045

    
3046
    self._VerifyGroupDRBDVersion(all_nvinfo)
3047
    self._VerifyGroupLVM(node_image, vg_name)
3048

    
3049
    for node_uuid, result in extra_lv_nvinfo.items():
3050
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
3051
                              node_image[node_uuid], vg_name)
3052

    
3053
    feedback_fn("* Verifying instance status")
3054
    for inst_uuid in self.my_inst_uuids:
3055
      instance = self.my_inst_info[inst_uuid]
3056
      if verbose:
3057
        feedback_fn("* Verifying instance %s" % instance.name)
3058
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])
3059

    
3060
      # If the instance is non-redundant we cannot survive losing its primary
3061
      # node, so we are not N+1 compliant.
3062
      if instance.disk_template not in constants.DTS_MIRRORED:
3063
        i_non_redundant.append(instance)
3064

    
3065
      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
3066
        i_non_a_balanced.append(instance)
3067

    
3068
    feedback_fn("* Verifying orphan volumes")
3069
    reserved = utils.FieldSet(*cluster.reserved_lvs)
3070

    
3071
    # We will get spurious "unknown volume" warnings if any node of this group
3072
    # is secondary for an instance whose primary is in another group. To avoid
3073
    # them, we find these instances and add their volumes to node_vol_should.
3074
    for instance in self.all_inst_info.values():
3075
      for secondary in instance.secondary_nodes:
3076
        if (secondary in self.my_node_info
3077
            and instance.name not in self.my_inst_info):
3078
          instance.MapLVsByNode(node_vol_should)
3079
          break
3080

    
3081
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
3082

    
3083
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
3084
      feedback_fn("* Verifying N+1 Memory redundancy")
3085
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
3086

    
3087
    feedback_fn("* Other Notes")
3088
    if i_non_redundant:
3089
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
3090
                  % len(i_non_redundant))
3091

    
3092
    if i_non_a_balanced:
3093
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
3094
                  % len(i_non_a_balanced))
3095

    
3096
    if i_offline:
3097
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
3098

    
3099
    if n_offline:
3100
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
3101

    
3102
    if n_drained:
3103
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
3104

    
3105
    return not self.bad
3106

    
3107
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
3108
    """Analyze the post-hooks' result
3109

3110
    This method analyses the hook result, handles it, and sends some
3111
    nicely-formatted feedback back to the user.
3112

3113
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
3114
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
3115
    @param hooks_results: the results of the multi-node hooks rpc call
3116
    @param feedback_fn: function used send feedback back to the caller
3117
    @param lu_result: previous Exec result
3118
    @return: the new Exec result, based on the previous result
3119
        and hook results
3120

3121
    """
3122
    # We only really run POST phase hooks, only for non-empty groups,
3123
    # and are only interested in their results
3124
    if not self.my_node_uuids:
3125
      # empty node group
3126
      pass
3127
    elif phase == constants.HOOKS_PHASE_POST:
3128
      # Used to change hooks' output to proper indentation
3129
      feedback_fn("* Hooks Results")
3130
      assert hooks_results, "invalid result from hooks"
3131

    
3132
      for node_name in hooks_results:
3133
        res = hooks_results[node_name]
3134
        msg = res.fail_msg
3135
        test = msg and not res.offline
3136
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3137
                      "Communication failure in hooks execution: %s", msg)
3138
        if res.offline or msg:
3139
          # No need to investigate payload if node is offline or gave
3140
          # an error.
3141
          continue
3142
        for script, hkr, output in res.payload:
3143
          test = hkr == constants.HKR_FAIL
3144
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3145
                        "Script %s failed, output:", script)
3146
          if test:
3147
            output = self._HOOKS_INDENT_RE.sub("      ", output)
3148
            feedback_fn("%s" % output)
3149
            lu_result = False
3150

    
3151
    return lu_result
3152

    
3153

    
3154
class LUClusterVerifyDisks(NoHooksLU):
3155
  """Verifies the cluster disks status.
3156

3157
  """
3158
  REQ_BGL = False
3159

    
3160
  def ExpandNames(self):
3161
    self.share_locks = ShareAll()
3162
    self.needed_locks = {
3163
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
3164
      }
3165

    
3166
  def Exec(self, feedback_fn):
3167
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
3168

    
3169
    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
3170
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
3171
                           for group in group_names])