1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with the cluster."""
23

    
24
import OpenSSL
25

    
26
import copy
27
import itertools
28
import logging
29
import operator
30
import os
31
import re
32
import time
33

    
34
from ganeti import compat
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import hypervisor
38
from ganeti import locking
39
from ganeti import masterd
40
from ganeti import netutils
41
from ganeti import objects
42
from ganeti import opcodes
43
from ganeti import pathutils
44
from ganeti import query
45
from ganeti import rpc
46
from ganeti import runtime
47
from ganeti import ssh
48
from ganeti import uidpool
49
from ganeti import utils
50
from ganeti import vcluster
51

    
52
from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53
  ResultWithJobs
54
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
60
  CheckIpolicyVsDiskTemplates
61

    
62
import ganeti.masterd.instance
63

    
64

    
65
class LUClusterActivateMasterIp(NoHooksLU):
66
  """Activate the master IP on the master node.
67

68
  """
69
  def Exec(self, feedback_fn):
70
    """Activate the master IP.
71

72
    """
73
    master_params = self.cfg.GetMasterNetworkParameters()
74
    ems = self.cfg.GetUseExternalMipScript()
75
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
76
                                                   master_params, ems)
77
    result.Raise("Could not activate the master IP")
78

    
79

    
80
class LUClusterDeactivateMasterIp(NoHooksLU):
81
  """Deactivate the master IP on the master node.
82

83
  """
84
  def Exec(self, feedback_fn):
85
    """Deactivate the master IP.
86

87
    """
88
    master_params = self.cfg.GetMasterNetworkParameters()
89
    ems = self.cfg.GetUseExternalMipScript()
90
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
91
                                                     master_params, ems)
92
    result.Raise("Could not deactivate the master IP")
93

    
94

    
95
class LUClusterConfigQuery(NoHooksLU):
96
  """Return configuration values.
97

98
  """
99
  REQ_BGL = False
100

    
101
  def CheckArguments(self):
102
    self.cq = ClusterQuery(None, self.op.output_fields, False)
103

    
104
  def ExpandNames(self):
105
    self.cq.ExpandNames(self)
106

    
107
  def DeclareLocks(self, level):
108
    self.cq.DeclareLocks(self, level)
109

    
110
  def Exec(self, feedback_fn):
111
    result = self.cq.OldStyleQuery(self)
112

    
113
    assert len(result) == 1
114

    
115
    return result[0]
116

    
117

    
118
class LUClusterDestroy(LogicalUnit):
119
  """Logical unit for destroying the cluster.
120

121
  """
122
  HPATH = "cluster-destroy"
123
  HTYPE = constants.HTYPE_CLUSTER
124

    
125
  def BuildHooksEnv(self):
126
    """Build hooks env.
127

128
    """
129
    return {
130
      "OP_TARGET": self.cfg.GetClusterName(),
131
      }
132

    
133
  def BuildHooksNodes(self):
134
    """Build hooks nodes.
135

136
    """
137
    return ([], [])
138

    
139
  def CheckPrereq(self):
140
    """Check prerequisites.
141

142
    This checks whether the cluster is empty.
143

144
    Any errors are signaled by raising errors.OpPrereqError.
145

146
    """
147
    master = self.cfg.GetMasterNode()
148

    
149
    nodelist = self.cfg.GetNodeList()
150
    if len(nodelist) != 1 or nodelist[0] != master:
151
      raise errors.OpPrereqError("There are still %d node(s) in"
152
                                 " this cluster." % (len(nodelist) - 1),
153
                                 errors.ECODE_INVAL)
154
    instancelist = self.cfg.GetInstanceList()
155
    if instancelist:
156
      raise errors.OpPrereqError("There are still %d instance(s) in"
157
                                 " this cluster." % len(instancelist),
158
                                 errors.ECODE_INVAL)
159

    
160
  def Exec(self, feedback_fn):
161
    """Destroys the cluster.
162

163
    """
164
    master_params = self.cfg.GetMasterNetworkParameters()
165

    
166
    # Run post hooks on master node before it's removed
167
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
168

    
169
    ems = self.cfg.GetUseExternalMipScript()
170
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
171
                                                     master_params, ems)
172
    result.Warn("Error disabling the master IP address", self.LogWarning)
173
    return master_params.uuid
174

    
175

    
176
class LUClusterPostInit(LogicalUnit):
177
  """Logical unit for running hooks after cluster initialization.
178

179
  """
180
  HPATH = "cluster-init"
181
  HTYPE = constants.HTYPE_CLUSTER
182

    
183
  def BuildHooksEnv(self):
184
    """Build hooks env.
185

186
    """
187
    return {
188
      "OP_TARGET": self.cfg.GetClusterName(),
189
      }
190

    
191
  def BuildHooksNodes(self):
192
    """Build hooks nodes.
193

194
    """
195
    return ([], [self.cfg.GetMasterNode()])
196

    
197
  def Exec(self, feedback_fn):
198
    """Nothing to do.
199

200
    """
201
    return True
202

    
203

    
204
class ClusterQuery(QueryBase):
205
  FIELDS = query.CLUSTER_FIELDS
206

    
207
  #: Do not sort (there is only one item)
208
  SORT_FIELD = None
209

    
210
  def ExpandNames(self, lu):
211
    lu.needed_locks = {}
212

    
213
    # The following variables interact with _QueryBase._GetNames
214
    self.wanted = locking.ALL_SET
215
    self.do_locking = self.use_locking
216

    
217
    if self.do_locking:
218
      raise errors.OpPrereqError("Can not use locking for cluster queries",
219
                                 errors.ECODE_INVAL)
220

    
221
  def DeclareLocks(self, lu, level):
222
    pass
223

    
224
  def _GetQueryData(self, lu):
225
    """Computes the list of nodes and their attributes.
226

227
    """
228
    # Locking is not used
229
    assert not (compat.any(lu.glm.is_owned(level)
230
                           for level in locking.LEVELS
231
                           if level != locking.LEVEL_CLUSTER) or
232
                self.do_locking or self.use_locking)
233

    
234
    if query.CQ_CONFIG in self.requested_data:
235
      cluster = lu.cfg.GetClusterInfo()
236
      nodes = lu.cfg.GetAllNodesInfo()
237
    else:
238
      cluster = NotImplemented
239
      nodes = NotImplemented
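    # NotImplemented appears to serve as a placeholder for data that was not
    # requested; query.ClusterQueryData presumably tolerates it for fields
    # the caller did not ask for.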
240

    
241
    if query.CQ_QUEUE_DRAINED in self.requested_data:
242
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
243
    else:
244
      drain_flag = NotImplemented
245

    
246
    if query.CQ_WATCHER_PAUSE in self.requested_data:
247
      master_node_uuid = lu.cfg.GetMasterNode()
248

    
249
      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
250
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
251
                   lu.cfg.GetMasterNodeName())
252

    
253
      watcher_pause = result.payload
254
    else:
255
      watcher_pause = NotImplemented
256

    
257
    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
258

    
259

    
260
class LUClusterQuery(NoHooksLU):
261
  """Query cluster configuration.
262

263
  """
264
  REQ_BGL = False
265

    
266
  def ExpandNames(self):
267
    self.needed_locks = {}
268

    
269
  def Exec(self, feedback_fn):
270
    """Return cluster config.
271

272
    """
273
    cluster = self.cfg.GetClusterInfo()
274
    os_hvp = {}
275

    
276
    # Filter just for enabled hypervisors
277
    for os_name, hv_dict in cluster.os_hvp.items():
278
      os_hvp[os_name] = {}
279
      for hv_name, hv_params in hv_dict.items():
280
        if hv_name in cluster.enabled_hypervisors:
281
          os_hvp[os_name][hv_name] = hv_params
282

    
283
    # Convert ip_family to ip_version
284
    primary_ip_version = constants.IP4_VERSION
285
    if cluster.primary_ip_family == netutils.IP6Address.family:
286
      primary_ip_version = constants.IP6_VERSION
287

    
288
    result = {
289
      "software_version": constants.RELEASE_VERSION,
290
      "protocol_version": constants.PROTOCOL_VERSION,
291
      "config_version": constants.CONFIG_VERSION,
292
      "os_api_version": max(constants.OS_API_VERSIONS),
293
      "export_version": constants.EXPORT_VERSION,
294
      "vcs_version": constants.VCS_VERSION,
295
      "architecture": runtime.GetArchInfo(),
296
      "name": cluster.cluster_name,
297
      "master": self.cfg.GetMasterNodeName(),
298
      "default_hypervisor": cluster.primary_hypervisor,
299
      "enabled_hypervisors": cluster.enabled_hypervisors,
300
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
301
                        for hypervisor_name in cluster.enabled_hypervisors]),
302
      "os_hvp": os_hvp,
303
      "beparams": cluster.beparams,
304
      "osparams": cluster.osparams,
305
      "ipolicy": cluster.ipolicy,
306
      "nicparams": cluster.nicparams,
307
      "ndparams": cluster.ndparams,
308
      "diskparams": cluster.diskparams,
309
      "candidate_pool_size": cluster.candidate_pool_size,
310
      "master_netdev": cluster.master_netdev,
311
      "master_netmask": cluster.master_netmask,
312
      "use_external_mip_script": cluster.use_external_mip_script,
313
      "volume_group_name": cluster.volume_group_name,
314
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
315
      "file_storage_dir": cluster.file_storage_dir,
316
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
317
      "maintain_node_health": cluster.maintain_node_health,
318
      "ctime": cluster.ctime,
319
      "mtime": cluster.mtime,
320
      "uuid": cluster.uuid,
321
      "tags": list(cluster.GetTags()),
322
      "uid_pool": cluster.uid_pool,
323
      "default_iallocator": cluster.default_iallocator,
324
      "reserved_lvs": cluster.reserved_lvs,
325
      "primary_ip_version": primary_ip_version,
326
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
327
      "hidden_os": cluster.hidden_os,
328
      "blacklisted_os": cluster.blacklisted_os,
329
      "enabled_disk_templates": cluster.enabled_disk_templates,
330
      }
331

    
332
    return result
333

    
334

    
335
class LUClusterRedistConf(NoHooksLU):
336
  """Force the redistribution of cluster configuration.
337

338
  This is a very simple LU.
339

340
  """
341
  REQ_BGL = False
342

    
343
  def ExpandNames(self):
344
    self.needed_locks = {
345
      locking.LEVEL_NODE: locking.ALL_SET,
346
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
347
    }
348
    self.share_locks = ShareAll()
349

    
350
  def Exec(self, feedback_fn):
351
    """Redistribute the configuration.
352

353
    """
354
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
355
    RedistributeAncillaryFiles(self)
356

    
357

    
358
class LUClusterRename(LogicalUnit):
359
  """Rename the cluster.
360

361
  """
362
  HPATH = "cluster-rename"
363
  HTYPE = constants.HTYPE_CLUSTER
364

    
365
  def BuildHooksEnv(self):
366
    """Build hooks env.
367

368
    """
369
    return {
370
      "OP_TARGET": self.cfg.GetClusterName(),
371
      "NEW_NAME": self.op.name,
372
      }
373

    
374
  def BuildHooksNodes(self):
375
    """Build hooks nodes.
376

377
    """
378
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
379

    
380
  def CheckPrereq(self):
381
    """Verify that the passed name is a valid one.
382

383
    """
384
    hostname = netutils.GetHostname(name=self.op.name,
385
                                    family=self.cfg.GetPrimaryIPFamily())
386

    
387
    new_name = hostname.name
388
    self.ip = new_ip = hostname.ip
389
    old_name = self.cfg.GetClusterName()
390
    old_ip = self.cfg.GetMasterIP()
391
    if new_name == old_name and new_ip == old_ip:
392
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
393
                                 " cluster has changed",
394
                                 errors.ECODE_INVAL)
395
    if new_ip != old_ip:
396
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
397
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
398
                                   " reachable on the network" %
399
                                   new_ip, errors.ECODE_NOTUNIQUE)
400

    
401
    self.op.name = new_name
402

    
403
  def Exec(self, feedback_fn):
404
    """Rename the cluster.
405

406
    """
407
    clustername = self.op.name
408
    new_ip = self.ip
409

    
410
    # shutdown the master IP
411
    master_params = self.cfg.GetMasterNetworkParameters()
412
    ems = self.cfg.GetUseExternalMipScript()
413
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
414
                                                     master_params, ems)
415
    result.Raise("Could not disable the master role")
416

    
417
    try:
418
      cluster = self.cfg.GetClusterInfo()
419
      cluster.cluster_name = clustername
420
      cluster.master_ip = new_ip
421
      self.cfg.Update(cluster, feedback_fn)
422

    
423
      # update the known hosts file
424
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
425
      node_list = self.cfg.GetOnlineNodeList()
426
      try:
427
        node_list.remove(master_params.uuid)
428
      except ValueError:
429
        pass
430
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
431
    finally:
432
      master_params.ip = new_ip
433
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
434
                                                     master_params, ems)
435
      result.Warn("Could not re-enable the master role on the master,"
436
                  " please restart manually", self.LogWarning)
437

    
438
    return clustername
439

    
440

    
441
class LUClusterRepairDiskSizes(NoHooksLU):
442
  """Verifies the cluster disks sizes.
443

444
  """
445
  REQ_BGL = False
446

    
447
  def ExpandNames(self):
448
    if self.op.instances:
449
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
450
      # Not getting the node allocation lock, as locks are only acquired for a
      # specific set of instances (and their nodes)
452
      self.needed_locks = {
453
        locking.LEVEL_NODE_RES: [],
454
        locking.LEVEL_INSTANCE: self.wanted_names,
455
        }
456
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
457
    else:
458
      self.wanted_names = None
459
      self.needed_locks = {
460
        locking.LEVEL_NODE_RES: locking.ALL_SET,
461
        locking.LEVEL_INSTANCE: locking.ALL_SET,
462

    
463
        # This opcode acquires the node locks for all instances
464
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
465
        }
466

    
467
    self.share_locks = {
468
      locking.LEVEL_NODE_RES: 1,
469
      locking.LEVEL_INSTANCE: 0,
470
      locking.LEVEL_NODE_ALLOC: 1,
471
      }
472

    
473
  def DeclareLocks(self, level):
474
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
475
      self._LockInstancesNodes(primary_only=True, level=level)
476

    
477
  def CheckPrereq(self):
478
    """Check prerequisites.
479

480
    This only checks the optional instance list against the existing names.
481

482
    """
483
    if self.wanted_names is None:
484
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
485

    
486
    self.wanted_instances = \
487
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
488

    
489
  def _EnsureChildSizes(self, disk):
490
    """Ensure children of the disk have the needed disk size.
491

492
    This is valid mainly for DRBD8 and fixes an issue where the
    children have a smaller disk size than the parent.
494

495
    @param disk: an L{ganeti.objects.Disk} object
496

497
    """
498
    if disk.dev_type == constants.LD_DRBD8:
499
      assert disk.children, "Empty children for DRBD8?"
500
      fchild = disk.children[0]
501
      mismatch = fchild.size < disk.size
502
      if mismatch:
503
        self.LogInfo("Child disk has size %d, parent %d, fixing",
504
                     fchild.size, disk.size)
505
        fchild.size = disk.size
506

    
507
      # and we recurse on this child only, not on the metadev
508
      return self._EnsureChildSizes(fchild) or mismatch
509
    else:
510
      return False
511

    
512
  def Exec(self, feedback_fn):
513
    """Verify the size of cluster disks.
514

515
    """
516
    # TODO: check child disks too
517
    # TODO: check differences in size between primary/secondary nodes
518
    per_node_disks = {}
519
    for instance in self.wanted_instances:
520
      pnode = instance.primary_node
521
      if pnode not in per_node_disks:
522
        per_node_disks[pnode] = []
523
      for idx, disk in enumerate(instance.disks):
524
        per_node_disks[pnode].append((instance, idx, disk))
525

    
526
    assert not (frozenset(per_node_disks.keys()) -
527
                self.owned_locks(locking.LEVEL_NODE_RES)), \
528
      "Not owning correct locks"
529
    assert not self.owned_locks(locking.LEVEL_NODE)
530

    
531
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
532
                                               per_node_disks.keys())
533

    
534
    changed = []
535
    for node_uuid, dskl in per_node_disks.items():
536
      newl = [v[2].Copy() for v in dskl]
537
      for dsk in newl:
538
        self.cfg.SetDiskID(dsk, node_uuid)
539
      node_name = self.cfg.GetNodeName(node_uuid)
540
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
541
      if result.fail_msg:
542
        self.LogWarning("Failure in blockdev_getdimensions call to node"
543
                        " %s, ignoring", node_name)
544
        continue
545
      if len(result.payload) != len(dskl):
546
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
547
                        " result.payload=%s", node_name, len(dskl),
548
                        result.payload)
549
        self.LogWarning("Invalid result from node %s, ignoring node results",
550
                        node_name)
551
        continue
552
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
553
        if dimensions is None:
554
          self.LogWarning("Disk %d of instance %s did not return size"
555
                          " information, ignoring", idx, instance.name)
556
          continue
557
        if not isinstance(dimensions, (tuple, list)):
558
          self.LogWarning("Disk %d of instance %s did not return valid"
559
                          " dimension information, ignoring", idx,
560
                          instance.name)
561
          continue
562
        (size, spindles) = dimensions
563
        if not isinstance(size, (int, long)):
564
          self.LogWarning("Disk %d of instance %s did not return valid"
565
                          " size information, ignoring", idx, instance.name)
566
          continue
567
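        # blockdev_getdimensions reports sizes in bytes; shifting by 20 bits
        # converts the value to MiB, the unit used for disk sizes in the
        # configuration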
        size = size >> 20
568
        if size != disk.size:
569
          self.LogInfo("Disk %d of instance %s has mismatched size,"
570
                       " correcting: recorded %d, actual %d", idx,
571
                       instance.name, disk.size, size)
572
          disk.size = size
573
          self.cfg.Update(instance, feedback_fn)
574
          changed.append((instance.name, idx, "size", size))
575
        if es_flags[node_uuid]:
576
          if spindles is None:
577
            self.LogWarning("Disk %d of instance %s did not return valid"
578
                            " spindles information, ignoring", idx,
579
                            instance.name)
580
          elif disk.spindles is None or disk.spindles != spindles:
581
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
582
                         " correcting: recorded %s, actual %s",
583
                         idx, instance.name, disk.spindles, spindles)
584
            disk.spindles = spindles
585
            self.cfg.Update(instance, feedback_fn)
586
            changed.append((instance.name, idx, "spindles", disk.spindles))
587
        if self._EnsureChildSizes(disk):
588
          self.cfg.Update(instance, feedback_fn)
589
          changed.append((instance.name, idx, "size", disk.size))
590
    return changed
591

    
592

    
593
def _ValidateNetmask(cfg, netmask):
594
  """Checks if a netmask is valid.
595

596
  @type cfg: L{config.ConfigWriter}
597
  @param cfg: The cluster configuration
598
  @type netmask: int
599
  @param netmask: the netmask to be verified
600
  @raise errors.OpPrereqError: if the validation fails
601

602
  """
603
  ip_family = cfg.GetPrimaryIPFamily()
604
  try:
605
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
606
  except errors.ProgrammerError:
607
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
608
                               ip_family, errors.ECODE_INVAL)
609
  if not ipcls.ValidateNetmask(netmask):
610
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
611
                               (netmask), errors.ECODE_INVAL)
612

    
613

    
614
def CheckFileBasedStoragePathVsEnabledDiskTemplates(
615
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
616
    file_disk_template):
617
  """Checks whether the given file-based storage directory is acceptable.
618

619
  Note: This function is public, because it is also used in bootstrap.py.
620

621
  @type logging_warn_fn: function
622
  @param logging_warn_fn: function which accepts a string and logs it
623
  @type file_storage_dir: string
624
  @param file_storage_dir: the directory to be used for file-based instances
625
  @type enabled_disk_templates: list of string
626
  @param enabled_disk_templates: the list of enabled disk templates
627
  @type file_disk_template: string
628
  @param file_disk_template: the file-based disk template for which the
629
      path should be checked
630

631
  """
632
  assert (file_disk_template in
633
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
634
  file_storage_enabled = file_disk_template in enabled_disk_templates
635
  if file_storage_dir is not None:
636
    if file_storage_dir == "":
637
      if file_storage_enabled:
638
        raise errors.OpPrereqError(
639
            "Unsetting the '%s' storage directory while having '%s' storage"
640
            " enabled is not permitted." %
641
            (file_disk_template, file_disk_template))
642
    else:
643
      if not file_storage_enabled:
644
        logging_warn_fn(
645
            "Specified a %s storage directory, although %s storage is not"
646
            " enabled." % (file_disk_template, file_disk_template))
647
  else:
648
    raise errors.ProgrammerError("Received %s storage dir with value"
649
                                 " 'None'." % file_disk_template)
650

    
651

    
652
def CheckFileStoragePathVsEnabledDiskTemplates(
653
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
654
  """Checks whether the given file storage directory is acceptable.
655

656
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
657

658
  """
659
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
660
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
661
      constants.DT_FILE)
662

    
663

    
664
def CheckSharedFileStoragePathVsEnabledDiskTemplates(
665
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
666
  """Checks whether the given shared file storage directory is acceptable.
667

668
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
669

670
  """
671
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
672
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
673
      constants.DT_SHARED_FILE)
674

    
675

    
676
class LUClusterSetParams(LogicalUnit):
677
  """Change the parameters of the cluster.
678

679
  """
680
  HPATH = "cluster-modify"
681
  HTYPE = constants.HTYPE_CLUSTER
682
  REQ_BGL = False
683

    
684
  def CheckArguments(self):
685
    """Check parameters
686

687
    """
688
    if self.op.uid_pool:
689
      uidpool.CheckUidPool(self.op.uid_pool)
690

    
691
    if self.op.add_uids:
692
      uidpool.CheckUidPool(self.op.add_uids)
693

    
694
    if self.op.remove_uids:
695
      uidpool.CheckUidPool(self.op.remove_uids)
696

    
697
    if self.op.master_netmask is not None:
698
      _ValidateNetmask(self.cfg, self.op.master_netmask)
699

    
700
    if self.op.diskparams:
701
      for dt_params in self.op.diskparams.values():
702
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
703
      try:
704
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
705
      except errors.OpPrereqError, err:
706
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
707
                                   errors.ECODE_INVAL)
708

    
709
  def ExpandNames(self):
710
    # FIXME: in the future maybe other cluster params won't require checking on
711
    # all nodes to be modified.
712
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
713
    # resource locks the right thing, shouldn't it be the BGL instead?
714
    self.needed_locks = {
715
      locking.LEVEL_NODE: locking.ALL_SET,
716
      locking.LEVEL_INSTANCE: locking.ALL_SET,
717
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
718
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
719
    }
720
    self.share_locks = ShareAll()
721

    
722
  def BuildHooksEnv(self):
723
    """Build hooks env.
724

725
    """
726
    return {
727
      "OP_TARGET": self.cfg.GetClusterName(),
728
      "NEW_VG_NAME": self.op.vg_name,
729
      }
730

    
731
  def BuildHooksNodes(self):
732
    """Build hooks nodes.
733

734
    """
735
    mn = self.cfg.GetMasterNode()
736
    return ([mn], [mn])
737

    
738
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
739
                   new_enabled_disk_templates):
740
    """Check the consistency of the vg name on all nodes and in case it gets
741
       unset whether there are instances still using it.
742

743
    """
744
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
745
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
746
                                            new_enabled_disk_templates)
747
    current_vg_name = self.cfg.GetVGName()
748

    
749
    if self.op.vg_name == '':
750
      if lvm_is_enabled:
751
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
752
                                   " disk templates are or get enabled.")
753

    
754
    if self.op.vg_name is None:
755
      if current_vg_name is None and lvm_is_enabled:
756
        raise errors.OpPrereqError("Please specify a volume group when"
757
                                   " enabling lvm-based disk-templates.")
758

    
759
    if self.op.vg_name is not None and not self.op.vg_name:
760
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
761
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
762
                                   " instances exist", errors.ECODE_INVAL)
763

    
764
    if (self.op.vg_name is not None and lvm_is_enabled) or \
765
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
766
      self._CheckVgNameOnNodes(node_uuids)
767

    
768
  def _CheckVgNameOnNodes(self, node_uuids):
769
    """Check the status of the volume group on each node.
770

771
    """
772
    vglist = self.rpc.call_vg_list(node_uuids)
773
    for node_uuid in node_uuids:
774
      msg = vglist[node_uuid].fail_msg
775
      if msg:
776
        # ignoring down node
777
        self.LogWarning("Error while gathering data on node %s"
778
                        " (ignoring node): %s",
779
                        self.cfg.GetNodeName(node_uuid), msg)
780
        continue
781
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
782
                                            self.op.vg_name,
783
                                            constants.MIN_VG_SIZE)
784
      if vgstatus:
785
        raise errors.OpPrereqError("Error on node '%s': %s" %
786
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
787
                                   errors.ECODE_ENVIRON)
788

    
789
  @staticmethod
790
  def _GetEnabledDiskTemplatesInner(op_enabled_disk_templates,
791
                                    old_enabled_disk_templates):
792
    """Determines the enabled disk templates and the subset of disk templates
793
       that are newly enabled by this operation.
794

795
    """
796
    enabled_disk_templates = None
797
    new_enabled_disk_templates = []
798
    if op_enabled_disk_templates:
799
      enabled_disk_templates = op_enabled_disk_templates
800
      new_enabled_disk_templates = \
801
        list(set(enabled_disk_templates)
802
             - set(old_enabled_disk_templates))
803
    else:
804
      enabled_disk_templates = old_enabled_disk_templates
805
    return (enabled_disk_templates, new_enabled_disk_templates)
806

    
807
  def _GetEnabledDiskTemplates(self, cluster):
808
    """Determines the enabled disk templates and the subset of disk templates
809
       that are newly enabled by this operation.
810

811
    """
812
    return self._GetEnabledDiskTemplatesInner(self.op.enabled_disk_templates,
813
                                              cluster.enabled_disk_templates)
814

    
815
  def _CheckIpolicy(self, cluster, enabled_disk_templates):
816
    """Checks the ipolicy.
817

818
    @type cluster: C{objects.Cluster}
819
    @param cluster: the cluster's configuration
820
    @type enabled_disk_templates: list of string
821
    @param enabled_disk_templates: list of (possibly newly) enabled disk
822
      templates
823

824
    """
825
    # FIXME: write unit tests for this
826
    if self.op.ipolicy:
827
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
828
                                           group_policy=False)
829

    
830
      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
831
                                  enabled_disk_templates)
832

    
833
      all_instances = self.cfg.GetAllInstancesInfo().values()
834
      violations = set()
835
      for group in self.cfg.GetAllNodeGroupsInfo().values():
836
        instances = frozenset([inst for inst in all_instances
837
                               if compat.any(nuuid in group.members
838
                                             for nuuid in inst.all_nodes)])
839
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
840
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
841
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
842
                                           self.cfg)
843
        if new:
844
          violations.update(new)
845

    
846
      if violations:
847
        self.LogWarning("After the ipolicy change the following instances"
848
                        " violate them: %s",
849
                        utils.CommaJoin(utils.NiceSort(violations)))
850
    else:
851
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
852
                                  enabled_disk_templates)
853

    
854
  def _CheckDrbdHelper(self, node_uuids):
855
    """Check the DRBD usermode helper.
856

857
    @type node_uuids: list of strings
858
    @param node_uuids: a list of nodes' UUIDs
859

860
    """
861
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
862
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
863
        raise errors.OpPrereqError("Cannot disable drbd helper while"
864
                                   " drbd-based instances exist",
865
                                   errors.ECODE_INVAL)
866

    
867
    if self.op.drbd_helper:
868
      # checks given drbd helper on all nodes
869
      helpers = self.rpc.call_drbd_helper(node_uuids)
870
      for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
871
        if ninfo.offline:
872
          self.LogInfo("Not checking drbd helper on offline node %s",
873
                       ninfo.name)
874
          continue
875
        msg = helpers[ninfo.uuid].fail_msg
876
        if msg:
877
          raise errors.OpPrereqError("Error checking drbd helper on node"
878
                                     " '%s': %s" % (ninfo.name, msg),
879
                                     errors.ECODE_ENVIRON)
880
        node_helper = helpers[ninfo.uuid].payload
881
        if node_helper != self.op.drbd_helper:
882
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
883
                                     (ninfo.name, node_helper),
884
                                     errors.ECODE_ENVIRON)
885

    
886
  def CheckPrereq(self):
887
    """Check prerequisites.
888

889
    This checks that the given parameters don't conflict and that the given
    volume group is valid.
891

892
    """
893
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
894
    self.cluster = cluster = self.cfg.GetClusterInfo()
895

    
896
    vm_capable_node_uuids = [node.uuid
897
                             for node in self.cfg.GetAllNodesInfo().values()
898
                             if node.uuid in node_uuids and node.vm_capable]
899

    
900
    (enabled_disk_templates, new_enabled_disk_templates) = \
901
      self._GetEnabledDiskTemplates(cluster)
902

    
903
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
904
                      new_enabled_disk_templates)
905

    
906
    if self.op.file_storage_dir is not None:
907
      CheckFileStoragePathVsEnabledDiskTemplates(
908
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
909

    
910
    if self.op.shared_file_storage_dir is not None:
911
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
912
          self.LogWarning, self.op.shared_file_storage_dir,
913
          enabled_disk_templates)
914

    
915
    self._CheckDrbdHelper(node_uuids)
916

    
917
    # validate params changes
918
    if self.op.beparams:
919
      objects.UpgradeBeParams(self.op.beparams)
920
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
921
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
922

    
923
    if self.op.ndparams:
924
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
925
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
926

    
927
      # TODO: we need a more general way to handle resetting
928
      # cluster-level parameters to default values
929
      if self.new_ndparams["oob_program"] == "":
930
        self.new_ndparams["oob_program"] = \
931
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
932

    
933
    if self.op.hv_state:
934
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
935
                                           self.cluster.hv_state_static)
936
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
937
                               for hv, values in new_hv_state.items())
938

    
939
    if self.op.disk_state:
940
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
941
                                               self.cluster.disk_state_static)
942
      self.new_disk_state = \
943
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
944
                            for name, values in svalues.items()))
945
             for storage, svalues in new_disk_state.items())
946

    
947
    self._CheckIpolicy(cluster, enabled_disk_templates)
948

    
949
    if self.op.nicparams:
950
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
951
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
952
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
953
      nic_errors = []
954

    
955
      # check all instances for consistency
956
      for instance in self.cfg.GetAllInstancesInfo().values():
957
        for nic_idx, nic in enumerate(instance.nics):
958
          params_copy = copy.deepcopy(nic.nicparams)
959
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
960

    
961
          # check parameter syntax
962
          try:
963
            objects.NIC.CheckParameterSyntax(params_filled)
964
          except errors.ConfigurationError, err:
965
            nic_errors.append("Instance %s, nic/%d: %s" %
966
                              (instance.name, nic_idx, err))
967

    
968
          # if we're moving instances to routed, check that they have an ip
969
          target_mode = params_filled[constants.NIC_MODE]
970
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
971
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
972
                              " address" % (instance.name, nic_idx))
973
      if nic_errors:
974
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
975
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
976

    
977
    # hypervisor list/parameters
978
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
979
    if self.op.hvparams:
980
      for hv_name, hv_dict in self.op.hvparams.items():
981
        if hv_name not in self.new_hvparams:
982
          self.new_hvparams[hv_name] = hv_dict
983
        else:
984
          self.new_hvparams[hv_name].update(hv_dict)
985

    
986
    # disk template parameters
987
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
988
    if self.op.diskparams:
989
      for dt_name, dt_params in self.op.diskparams.items():
990
        if dt_name not in self.new_diskparams:
991
          self.new_diskparams[dt_name] = dt_params
992
        else:
993
          self.new_diskparams[dt_name].update(dt_params)
994

    
995
    # os hypervisor parameters
996
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
997
    if self.op.os_hvp:
998
      for os_name, hvs in self.op.os_hvp.items():
999
        if os_name not in self.new_os_hvp:
1000
          self.new_os_hvp[os_name] = hvs
1001
        else:
1002
          for hv_name, hv_dict in hvs.items():
1003
            if hv_dict is None:
1004
              # Delete if it exists
1005
              self.new_os_hvp[os_name].pop(hv_name, None)
1006
            elif hv_name not in self.new_os_hvp[os_name]:
1007
              self.new_os_hvp[os_name][hv_name] = hv_dict
1008
            else:
1009
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
1010

    
1011
    # os parameters
1012
    self.new_osp = objects.FillDict(cluster.osparams, {})
1013
    if self.op.osparams:
1014
      for os_name, osp in self.op.osparams.items():
1015
        if os_name not in self.new_osp:
1016
          self.new_osp[os_name] = {}
1017

    
1018
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
1019
                                                 use_none=True)
1020

    
1021
        if not self.new_osp[os_name]:
1022
          # we removed all parameters
1023
          del self.new_osp[os_name]
1024
        else:
1025
          # check the parameter validity (remote check)
1026
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
1027
                        os_name, self.new_osp[os_name])
1028

    
1029
    # changes to the hypervisor list
1030
    if self.op.enabled_hypervisors is not None:
1031
      self.hv_list = self.op.enabled_hypervisors
1032
      for hv in self.hv_list:
1033
        # if the hypervisor doesn't already exist in the cluster
1034
        # hvparams, we initialize it to empty, and then (in both
1035
        # cases) we make sure to fill the defaults, as we might not
1036
        # have a complete defaults list if the hypervisor wasn't
1037
        # enabled before
1038
        if hv not in new_hvp:
1039
          new_hvp[hv] = {}
1040
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1041
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1042
    else:
1043
      self.hv_list = cluster.enabled_hypervisors
1044

    
1045
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1046
      # either the enabled list has changed, or the parameters have, validate
1047
      for hv_name, hv_params in self.new_hvparams.items():
1048
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1049
            (self.op.enabled_hypervisors and
1050
             hv_name in self.op.enabled_hypervisors)):
1051
          # either this is a new hypervisor, or its parameters have changed
1052
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1053
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1054
          hv_class.CheckParameterSyntax(hv_params)
1055
          CheckHVParams(self, node_uuids, hv_name, hv_params)
1056

    
1057
    self._CheckDiskTemplateConsistency()
1058

    
1059
    if self.op.os_hvp:
1060
      # no need to check any newly-enabled hypervisors, since the
1061
      # defaults have already been checked in the above code-block
1062
      for os_name, os_hvp in self.new_os_hvp.items():
1063
        for hv_name, hv_params in os_hvp.items():
1064
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1065
          # we need to fill in the new os_hvp on top of the actual hv_p
1066
          cluster_defaults = self.new_hvparams.get(hv_name, {})
1067
          new_osp = objects.FillDict(cluster_defaults, hv_params)
1068
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1069
          hv_class.CheckParameterSyntax(new_osp)
1070
          CheckHVParams(self, node_uuids, hv_name, new_osp)
1071

    
1072
    if self.op.default_iallocator:
1073
      alloc_script = utils.FindFile(self.op.default_iallocator,
1074
                                    constants.IALLOCATOR_SEARCH_PATH,
1075
                                    os.path.isfile)
1076
      if alloc_script is None:
1077
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1078
                                   " specified" % self.op.default_iallocator,
1079
                                   errors.ECODE_INVAL)
1080

    
1081
  def _CheckDiskTemplateConsistency(self):
1082
    """Check whether the disk templates that are going to be disabled
1083
       are still in use by some instances.
1084

1085
    """
1086
    if self.op.enabled_disk_templates:
1087
      cluster = self.cfg.GetClusterInfo()
1088
      instances = self.cfg.GetAllInstancesInfo()
1089

    
1090
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1091
        - set(self.op.enabled_disk_templates)
1092
      for instance in instances.itervalues():
1093
        if instance.disk_template in disk_templates_to_remove:
1094
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
1095
                                     " because instance '%s' is using it." %
1096
                                     (instance.disk_template, instance.name))
1097

    
1098
  def _SetVgName(self, feedback_fn):
1099
    """Determines and sets the new volume group name.
1100

1101
    """
1102
    if self.op.vg_name is not None:
1103
      new_volume = self.op.vg_name
1104
      if not new_volume:
1105
        new_volume = None
1106
      if new_volume != self.cfg.GetVGName():
1107
        self.cfg.SetVGName(new_volume)
1108
      else:
1109
        feedback_fn("Cluster LVM configuration already in desired"
1110
                    " state, not changing")
1111

    
1112
  def _SetFileStorageDir(self, feedback_fn):
1113
    """Set the file storage directory.
1114

1115
    """
1116
    if self.op.file_storage_dir is not None:
1117
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
1118
        feedback_fn("Global file storage dir already set to value '%s'"
1119
                    % self.cluster.file_storage_dir)
1120
      else:
1121
        self.cluster.file_storage_dir = self.op.file_storage_dir
1122

    
1123
  def _SetDrbdHelper(self, feedback_fn):
1124
    """Set the DRBD usermode helper.
1125

1126
    """
1127
    if self.op.drbd_helper is not None:
1128
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1129
        feedback_fn("Note that you specified a drbd user helper, but did not"
1130
                    " enable the drbd disk template.")
1131
      new_helper = self.op.drbd_helper
1132
      if not new_helper:
1133
        new_helper = None
1134
      if new_helper != self.cfg.GetDRBDHelper():
1135
        self.cfg.SetDRBDHelper(new_helper)
1136
      else:
1137
        feedback_fn("Cluster DRBD helper already in desired state,"
1138
                    " not changing")
1139

    
1140
  def Exec(self, feedback_fn):
1141
    """Change the parameters of the cluster.
1142

1143
    """
1144
    if self.op.enabled_disk_templates:
1145
      self.cluster.enabled_disk_templates = \
1146
        list(set(self.op.enabled_disk_templates))
1147

    
1148
    self._SetVgName(feedback_fn)
1149
    self._SetFileStorageDir(feedback_fn)
1150
    self._SetDrbdHelper(feedback_fn)
1151

    
1152
    if self.op.hvparams:
1153
      self.cluster.hvparams = self.new_hvparams
1154
    if self.op.os_hvp:
1155
      self.cluster.os_hvp = self.new_os_hvp
1156
    if self.op.enabled_hypervisors is not None:
1157
      self.cluster.hvparams = self.new_hvparams
1158
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1159
    if self.op.beparams:
1160
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1161
    if self.op.nicparams:
1162
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1163
    if self.op.ipolicy:
1164
      self.cluster.ipolicy = self.new_ipolicy
1165
    if self.op.osparams:
1166
      self.cluster.osparams = self.new_osp
1167
    if self.op.ndparams:
1168
      self.cluster.ndparams = self.new_ndparams
1169
    if self.op.diskparams:
1170
      self.cluster.diskparams = self.new_diskparams
1171
    if self.op.hv_state:
1172
      self.cluster.hv_state_static = self.new_hv_state
1173
    if self.op.disk_state:
1174
      self.cluster.disk_state_static = self.new_disk_state
1175

    
1176
    if self.op.candidate_pool_size is not None:
1177
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1178
      # we need to update the pool size here, otherwise the save will fail
1179
      AdjustCandidatePool(self, [])
1180

    
1181
    if self.op.maintain_node_health is not None:
1182
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1183
        feedback_fn("Note: CONFD was disabled at build time, node health"
1184
                    " maintenance is not useful (still enabling it)")
1185
      self.cluster.maintain_node_health = self.op.maintain_node_health
1186

    
1187
    if self.op.modify_etc_hosts is not None:
1188
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1189

    
1190
    if self.op.prealloc_wipe_disks is not None:
1191
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1192

    
1193
    if self.op.add_uids is not None:
1194
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1195

    
1196
    if self.op.remove_uids is not None:
1197
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1198

    
1199
    if self.op.uid_pool is not None:
1200
      self.cluster.uid_pool = self.op.uid_pool
1201

    
1202
    if self.op.default_iallocator is not None:
1203
      self.cluster.default_iallocator = self.op.default_iallocator
1204

    
1205
    if self.op.reserved_lvs is not None:
1206
      self.cluster.reserved_lvs = self.op.reserved_lvs
1207

    
1208
    if self.op.use_external_mip_script is not None:
1209
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
1210

    
1211
    def helper_os(aname, mods, desc):
1212
      desc += " OS list"
1213
      lst = getattr(self.cluster, aname)
1214
      for key, val in mods:
1215
        if key == constants.DDM_ADD:
1216
          if val in lst:
1217
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1218
          else:
1219
            lst.append(val)
1220
        elif key == constants.DDM_REMOVE:
1221
          if val in lst:
1222
            lst.remove(val)
1223
          else:
1224
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1225
        else:
1226
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
1227

    
1228
    if self.op.hidden_os:
1229
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1230

    
1231
    if self.op.blacklisted_os:
1232
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1233

    
1234
    if self.op.master_netdev:
1235
      master_params = self.cfg.GetMasterNetworkParameters()
1236
      ems = self.cfg.GetUseExternalMipScript()
1237
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1238
                  self.cluster.master_netdev)
1239
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1240
                                                       master_params, ems)
1241
      if not self.op.force:
1242
        result.Raise("Could not disable the master ip")
1243
      else:
1244
        if result.fail_msg:
1245
          msg = ("Could not disable the master ip (continuing anyway): %s" %
1246
                 result.fail_msg)
1247
          feedback_fn(msg)
1248
      feedback_fn("Changing master_netdev from %s to %s" %
1249
                  (master_params.netdev, self.op.master_netdev))
1250
      self.cluster.master_netdev = self.op.master_netdev
1251

    
1252
    if self.op.master_netmask:
1253
      master_params = self.cfg.GetMasterNetworkParameters()
1254
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1255
      result = self.rpc.call_node_change_master_netmask(
1256
                 master_params.uuid, master_params.netmask,
1257
                 self.op.master_netmask, master_params.ip,
1258
                 master_params.netdev)
1259
      result.Warn("Could not change the master IP netmask", feedback_fn)
1260
      self.cluster.master_netmask = self.op.master_netmask
1261

    
1262
    self.cfg.Update(self.cluster, feedback_fn)
1263

    
1264
    if self.op.master_netdev:
1265
      master_params = self.cfg.GetMasterNetworkParameters()
1266
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
1267
                  self.op.master_netdev)
1268
      ems = self.cfg.GetUseExternalMipScript()
1269
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1270
                                                     master_params, ems)
1271
      result.Warn("Could not re-enable the master ip on the master,"
1272
                  " please restart manually", self.LogWarning)
1273

    
1274

    
1275
class LUClusterVerify(NoHooksLU):
1276
  """Submits all jobs necessary to verify the cluster.
1277

1278
  """
1279
  REQ_BGL = False
1280

    
1281
  def ExpandNames(self):
1282
    self.needed_locks = {}
1283

    
1284
  def Exec(self, feedback_fn):
1285
    jobs = []
1286

    
1287
    if self.op.group_name:
1288
      groups = [self.op.group_name]
1289
      depends_fn = lambda: None
1290
    else:
1291
      groups = self.cfg.GetNodeGroupList()
1292

    
1293
      # Verify global configuration
1294
      jobs.append([
1295
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1296
        ])
1297

    
1298
      # Always depend on global verification
1299
      depends_fn = lambda: [(-len(jobs), [])]
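      # Job dependencies may be given as negative, relative indices;
      # -len(jobs) always points back at the configuration verification job
      # queued above, so every per-group job waits for it to finish.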
1300

    
1301
    jobs.extend(
1302
      [opcodes.OpClusterVerifyGroup(group_name=group,
1303
                                    ignore_errors=self.op.ignore_errors,
1304
                                    depends=depends_fn())]
1305
      for group in groups)
1306

    
1307
    # Fix up all parameters
1308
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1309
      op.debug_simulate_errors = self.op.debug_simulate_errors
1310
      op.verbose = self.op.verbose
1311
      op.error_codes = self.op.error_codes
1312
      try:
1313
        op.skip_checks = self.op.skip_checks
1314
      except AttributeError:
1315
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1316

    
1317
    return ResultWithJobs(jobs)
1318

    
1319

    
1320
class _VerifyErrors(object):
1321
  """Mix-in for cluster/group verify LUs.
1322

1323
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1324
  self.op and self._feedback_fn to be available.)
1325

1326
  """
1327

    
1328
  ETYPE_FIELD = "code"
1329
  ETYPE_ERROR = "ERROR"
1330
  ETYPE_WARNING = "WARNING"
1331

    
1332
  def _Error(self, ecode, item, msg, *args, **kwargs):
1333
    """Format an error message.
1334

1335
    Based on the opcode's error_codes parameter, either format a
1336
    parseable error code, or a simpler error string.
1337

1338
    This must be called only from Exec and functions called from Exec.
1339

1340
    """
1341
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1342
    itype, etxt, _ = ecode
1343
    # If the error code is in the list of ignored errors, demote the error to a
1344
    # warning
1345
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1346
      ltype = self.ETYPE_WARNING
1347
    # first complete the msg
1348
    if args:
1349
      msg = msg % args
1350
    # then format the whole message
1351
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1352
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1353
    else:
1354
      if item:
1355
        item = " " + item
1356
      else:
1357
        item = ""
1358
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1359
    # and finally report it via the feedback_fn
1360
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1361
    # only ERROR-level messages mark the operation as failed; warnings do not
1362
    if ltype == self.ETYPE_ERROR:
1363
      self.bad = True
1364

    
1365
  def _ErrorIf(self, cond, *args, **kwargs):
1366
    """Log an error message if the passed condition is True.
1367

1368
    """
1369
    if (bool(cond)
1370
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1371
      self._Error(*args, **kwargs)
1372

    
1373

    
1374
def _VerifyCertificate(filename):
1375
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1376

1377
  @type filename: string
1378
  @param filename: Path to PEM file
1379

1380
  """
1381
  try:
1382
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1383
                                           utils.ReadFile(filename))
1384
  except Exception, err: # pylint: disable=W0703
1385
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1386
            "Failed to load X509 certificate %s: %s" % (filename, err))
1387

    
1388
  (errcode, msg) = \
1389
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1390
                                constants.SSL_CERT_EXPIRATION_ERROR)
1391

    
1392
  if msg:
1393
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1394
  else:
1395
    fnamemsg = None
1396

    
1397
  if errcode is None:
1398
    return (None, fnamemsg)
1399
  elif errcode == utils.CERT_WARNING:
1400
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1401
  elif errcode == utils.CERT_ERROR:
1402
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1403

    
1404
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1405

    
1406

    
1407
def _GetAllHypervisorParameters(cluster, instances):
1408
  """Compute the set of all hypervisor parameters.
1409

1410
  @type cluster: L{objects.Cluster}
1411
  @param cluster: the cluster object
1412
  @type instances: list of L{objects.Instance}
1413
  @param instances: additional instances from which to obtain parameters
1414
  @rtype: list of (origin, hypervisor, parameters)
1415
  @return: a list with all parameters found, indicating the hypervisor they
1416
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1417

1418
  """
1419
  hvp_data = []
1420

    
1421
  for hv_name in cluster.enabled_hypervisors:
1422
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1423

    
1424
  for os_name, os_hvp in cluster.os_hvp.items():
1425
    for hv_name, hv_params in os_hvp.items():
1426
      if hv_params:
1427
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1428
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1429

    
1430
  # TODO: collapse identical parameter values in a single one
1431
  for instance in instances:
1432
    if instance.hvparams:
1433
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1434
                       cluster.FillHV(instance)))
1435

    
1436
  return hvp_data
1437

    
1438

    
1439
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1440
  """Verifies the cluster config.
1441

1442
  """
1443
  REQ_BGL = False
1444

    
1445
  def _VerifyHVP(self, hvp_data):
1446
    """Verifies locally the syntax of the hypervisor parameters.
1447

1448
    """
1449
    for item, hv_name, hv_params in hvp_data:
1450
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1451
             (hv_name, item))
1452
      try:
1453
        hv_class = hypervisor.GetHypervisorClass(hv_name)
1454
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1455
        hv_class.CheckParameterSyntax(hv_params)
1456
      except errors.GenericError, err:
1457
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1458

    
1459
  def ExpandNames(self):
1460
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1461
    self.share_locks = ShareAll()
1462

    
1463
  def CheckPrereq(self):
1464
    """Check prerequisites.
1465

1466
    """
1467
    # Retrieve all information
1468
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1469
    self.all_node_info = self.cfg.GetAllNodesInfo()
1470
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1471

    
1472
  def Exec(self, feedback_fn):
1473
    """Verify integrity of cluster, performing various test on nodes.
1474

1475
    """
1476
    self.bad = False
1477
    self._feedback_fn = feedback_fn
1478

    
1479
    feedback_fn("* Verifying cluster config")
1480

    
1481
    for msg in self.cfg.VerifyConfig():
1482
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1483

    
1484
    feedback_fn("* Verifying cluster certificate files")
1485

    
1486
    for cert_filename in pathutils.ALL_CERT_FILES:
1487
      (errcode, msg) = _VerifyCertificate(cert_filename)
1488
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1489

    
1490
    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
1491
                                    pathutils.NODED_CERT_FILE),
1492
                  constants.CV_ECLUSTERCERT,
1493
                  None,
1494
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
1495
                    constants.LUXID_USER + " user")
1496

    
1497
    feedback_fn("* Verifying hypervisor parameters")
1498

    
1499
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1500
                                                self.all_inst_info.values()))
1501

    
1502
    feedback_fn("* Verifying all nodes belong to an existing group")
1503

    
1504
    # We do this verification here because, should this bogus circumstance
1505
    # occur, it would never be caught by VerifyGroup, which only acts on
1506
    # nodes/instances reachable from existing node groups.
1507

    
1508
    dangling_nodes = set(node for node in self.all_node_info.values()
1509
                         if node.group not in self.all_group_info)
1510

    
1511
    dangling_instances = {}
1512
    no_node_instances = []
1513

    
1514
    for inst in self.all_inst_info.values():
1515
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
1516
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
1517
      elif inst.primary_node not in self.all_node_info:
1518
        no_node_instances.append(inst)
1519

    
1520
    pretty_dangling = [
1521
        "%s (%s)" %
1522
        (node.name,
1523
         utils.CommaJoin(inst.name for
1524
                         inst in dangling_instances.get(node.uuid, [])))
1525
        for node in dangling_nodes]
1526

    
1527
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1528
                  None,
1529
                  "the following nodes (and their instances) belong to a non"
1530
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1531

    
1532
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1533
                  None,
1534
                  "the following instances have a non-existing primary-node:"
1535
                  " %s", utils.CommaJoin(inst.name for
1536
                                         inst in no_node_instances))
1537

    
1538
    return not self.bad
1539

    
1540

    
1541
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1542
  """Verifies the status of a node group.
1543

1544
  """
1545
  HPATH = "cluster-verify"
1546
  HTYPE = constants.HTYPE_CLUSTER
1547
  REQ_BGL = False
1548

    
1549
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1550

    
1551
  class NodeImage(object):
1552
    """A class representing the logical and physical status of a node.
1553

1554
    @type uuid: string
1555
    @ivar uuid: the node UUID to which this object refers
1556
    @ivar volumes: a structure as returned from
1557
        L{ganeti.backend.GetVolumeList} (runtime)
1558
    @ivar instances: a list of running instances (runtime)
1559
    @ivar pinst: list of configured primary instances (config)
1560
    @ivar sinst: list of configured secondary instances (config)
1561
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1562
        instances for which this node is secondary (config)
1563
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1564
    @ivar dfree: free disk, as reported by the node (runtime)
1565
    @ivar offline: the offline status (config)
1566
    @type rpc_fail: boolean
1567
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1568
        not whether the individual keys were correct) (runtime)
1569
    @type lvm_fail: boolean
1570
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1571
    @type hyp_fail: boolean
1572
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1573
    @type ghost: boolean
1574
    @ivar ghost: whether this is a known node or not (config)
1575
    @type os_fail: boolean
1576
    @ivar os_fail: whether the RPC call didn't return valid OS data
1577
    @type oslist: list
1578
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1579
    @type vm_capable: boolean
1580
    @ivar vm_capable: whether the node can host instances
1581
    @type pv_min: float
1582
    @ivar pv_min: size in MiB of the smallest PVs
1583
    @type pv_max: float
1584
    @ivar pv_max: size in MiB of the biggest PVs
1585

1586
    """
1587
    def __init__(self, offline=False, uuid=None, vm_capable=True):
1588
      self.uuid = uuid
1589
      self.volumes = {}
1590
      self.instances = []
1591
      self.pinst = []
1592
      self.sinst = []
1593
      self.sbp = {}
1594
      self.mfree = 0
1595
      self.dfree = 0
1596
      self.offline = offline
1597
      self.vm_capable = vm_capable
1598
      self.rpc_fail = False
1599
      self.lvm_fail = False
1600
      self.hyp_fail = False
1601
      self.ghost = False
1602
      self.os_fail = False
1603
      self.oslist = {}
1604
      self.pv_min = None
1605
      self.pv_max = None
1606

    
1607
  def ExpandNames(self):
1608
    # This raises errors.OpPrereqError on its own:
1609
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1610

    
1611
    # Get instances in node group; this is unsafe and needs verification later
1612
    inst_uuids = \
1613
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1614

    
1615
    self.needed_locks = {
1616
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1617
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1618
      locking.LEVEL_NODE: [],
1619

    
1620
      # This opcode is run by watcher every five minutes and acquires all nodes
1621
      # for a group. It doesn't run for a long time, so it's better to acquire
1622
      # the node allocation lock as well.
1623
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1624
      }
1625

    
1626
    self.share_locks = ShareAll()
1627

    
1628
  def DeclareLocks(self, level):
1629
    if level == locking.LEVEL_NODE:
1630
      # Get members of node group; this is unsafe and needs verification later
1631
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1632

    
1633
      # In Exec(), we warn about mirrored instances that have primary and
1634
      # secondary living in separate node groups. To fully verify that
1635
      # volumes for these instances are healthy, we will need to do an
1636
      # extra call to their secondaries. We ensure here those nodes will
1637
      # be locked.
1638
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1639
        # Important: access only the instances whose lock is owned
1640
        instance = self.cfg.GetInstanceInfoByName(inst_name)
1641
        if instance.disk_template in constants.DTS_INT_MIRROR:
1642
          nodes.update(instance.secondary_nodes)
1643

    
1644
      self.needed_locks[locking.LEVEL_NODE] = nodes
1645

    
1646
  def CheckPrereq(self):
1647
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1648
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1649

    
1650
    group_node_uuids = set(self.group_info.members)
1651
    group_inst_uuids = \
1652
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1653

    
1654
    unlocked_node_uuids = \
1655
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1656

    
1657
    unlocked_inst_uuids = \
1658
        group_inst_uuids.difference(
1659
          [self.cfg.GetInstanceInfoByName(name).uuid
1660
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1661

    
1662
    if unlocked_node_uuids:
1663
      raise errors.OpPrereqError(
1664
        "Missing lock for nodes: %s" %
1665
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1666
        errors.ECODE_STATE)
1667

    
1668
    if unlocked_inst_uuids:
1669
      raise errors.OpPrereqError(
1670
        "Missing lock for instances: %s" %
1671
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1672
        errors.ECODE_STATE)
1673

    
1674
    self.all_node_info = self.cfg.GetAllNodesInfo()
1675
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1676

    
1677
    self.my_node_uuids = group_node_uuids
1678
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1679
                             for node_uuid in group_node_uuids)
1680

    
1681
    self.my_inst_uuids = group_inst_uuids
1682
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1683
                             for inst_uuid in group_inst_uuids)
1684

    
1685
    # We detect here the nodes that will need the extra RPC calls for verifying
1686
    # split LV volumes; they should be locked.
1687
    extra_lv_nodes = set()
1688

    
1689
    for inst in self.my_inst_info.values():
1690
      if inst.disk_template in constants.DTS_INT_MIRROR:
1691
        for nuuid in inst.all_nodes:
1692
          if self.all_node_info[nuuid].group != self.group_uuid:
1693
            extra_lv_nodes.add(nuuid)
1694

    
1695
    unlocked_lv_nodes = \
1696
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1697

    
1698
    if unlocked_lv_nodes:
1699
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1700
                                 utils.CommaJoin(unlocked_lv_nodes),
1701
                                 errors.ECODE_STATE)
1702
    self.extra_lv_nodes = list(extra_lv_nodes)
1703

    
1704
  def _VerifyNode(self, ninfo, nresult):
1705
    """Perform some basic validation on data returned from a node.
1706

1707
      - check the result data structure is well formed and has all the
1708
        mandatory fields
1709
      - check ganeti version
1710

1711
    @type ninfo: L{objects.Node}
1712
    @param ninfo: the node to check
1713
    @param nresult: the results from the node
1714
    @rtype: boolean
1715
    @return: whether overall this call was successful (and we can expect
1716
         reasonable values in the response)
1717

1718
    """
1719
    # main result, nresult should be a non-empty dict
1720
    test = not nresult or not isinstance(nresult, dict)
1721
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1722
                  "unable to verify node: no data returned")
1723
    if test:
1724
      return False
1725

    
1726
    # compares ganeti version
1727
    local_version = constants.PROTOCOL_VERSION
1728
    remote_version = nresult.get("version", None)
1729
    test = not (remote_version and
1730
                isinstance(remote_version, (list, tuple)) and
1731
                len(remote_version) == 2)
1732
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1733
                  "connection to node returned invalid data")
1734
    if test:
1735
      return False
1736

    
1737
    test = local_version != remote_version[0]
1738
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1739
                  "incompatible protocol versions: master %s,"
1740
                  " node %s", local_version, remote_version[0])
1741
    if test:
1742
      return False
1743

    
1744
    # node seems compatible, we can actually try to look into its results
1745

    
1746
    # full package version
1747
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1748
                  constants.CV_ENODEVERSION, ninfo.name,
1749
                  "software version mismatch: master %s, node %s",
1750
                  constants.RELEASE_VERSION, remote_version[1],
1751
                  code=self.ETYPE_WARNING)
1752

    
1753
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1754
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1755
      for hv_name, hv_result in hyp_result.iteritems():
1756
        test = hv_result is not None
1757
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1758
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1759

    
1760
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1761
    if ninfo.vm_capable and isinstance(hvp_result, list):
1762
      for item, hv_name, hv_result in hvp_result:
1763
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1764
                      "hypervisor %s parameter verify failure (source %s): %s",
1765
                      hv_name, item, hv_result)
1766

    
1767
    test = nresult.get(constants.NV_NODESETUP,
1768
                       ["Missing NODESETUP results"])
1769
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1770
                  "node setup error: %s", "; ".join(test))
1771

    
1772
    return True
1773

    
1774
  def _VerifyNodeTime(self, ninfo, nresult,
1775
                      nvinfo_starttime, nvinfo_endtime):
1776
    """Check the node time.
1777

1778
    @type ninfo: L{objects.Node}
1779
    @param ninfo: the node to check
1780
    @param nresult: the remote results for the node
1781
    @param nvinfo_starttime: the start time of the RPC call
1782
    @param nvinfo_endtime: the end time of the RPC call
1783

1784
    """
1785
    ntime = nresult.get(constants.NV_TIME, None)
1786
    try:
1787
      ntime_merged = utils.MergeTime(ntime)
1788
    except (ValueError, TypeError):
1789
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1790
                    "Node returned invalid time")
1791
      return
1792

    
1793
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1794
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1795
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1796
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1797
    else:
1798
      ntime_diff = None
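    # The node is only flagged when its merged time falls outside the window
    # [rpc_start - NODE_MAX_CLOCK_SKEW, rpc_end + NODE_MAX_CLOCK_SKEW];
    # ntime_diff then gives a lower bound on the divergence for the message.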
1799

    
1800
    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1801
                  "Node time diverges by at least %s from master node time",
1802
                  ntime_diff)
1803

    
1804
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1805
    """Check the node LVM results and update info for cross-node checks.
1806

1807
    @type ninfo: L{objects.Node}
1808
    @param ninfo: the node to check
1809
    @param nresult: the remote results for the node
1810
    @param vg_name: the configured VG name
1811
    @type nimg: L{NodeImage}
1812
    @param nimg: node image
1813

1814
    """
1815
    if vg_name is None:
1816
      return
1817

    
1818
    # checks vg existence and size > 20G
1819
    vglist = nresult.get(constants.NV_VGLIST, None)
1820
    test = not vglist
1821
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1822
                  "unable to check volume groups")
1823
    if not test:
1824
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1825
                                            constants.MIN_VG_SIZE)
1826
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1827

    
1828
    # Check PVs
1829
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1830
    for em in errmsgs:
1831
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
1832
    if pvminmax is not None:
1833
      (nimg.pv_min, nimg.pv_max) = pvminmax
1834

    
1835
  def _VerifyGroupDRBDVersion(self, node_verify_infos):
1836
    """Check cross-node DRBD version consistency.
1837

1838
    @type node_verify_infos: dict
1839
    @param node_verify_infos: infos about nodes as returned from the
1840
      node_verify call.
1841

1842
    """
1843
    node_versions = {}
1844
    for node_uuid, ndata in node_verify_infos.items():
1845
      nresult = ndata.payload
1846
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1847
      node_versions[node_uuid] = version
1848

    
1849
    if len(set(node_versions.values())) > 1:
1850
      for node_uuid, version in sorted(node_versions.items()):
1851
        msg = "DRBD version mismatch: %s" % version
1852
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1853
                    code=self.ETYPE_WARNING)
1854

    
1855
  def _VerifyGroupLVM(self, node_image, vg_name):
1856
    """Check cross-node consistency in LVM.
1857

1858
    @type node_image: dict
1859
    @param node_image: info about nodes, mapping node names to
1860
      L{NodeImage} objects
1861
    @param vg_name: the configured VG name
1862

1863
    """
1864
    if vg_name is None:
1865
      return
1866

    
1867
    # Only exclusive storage needs this kind of checks
1868
    if not self._exclusive_storage:
1869
      return
1870

    
1871
    # exclusive_storage wants all PVs to have the same size (approximately),
1872
    # if the smallest and the biggest ones are okay, everything is fine.
1873
    # pv_min is None iff pv_max is None
1874
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1875
    if not vals:
1876
      return
1877
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1878
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1879
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1880
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1881
                  "PV sizes differ too much in the group; smallest (%s MB) is"
1882
                  " on %s, biggest (%s MB) is on %s",
1883
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
1884
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))
1885

    
1886
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1887
    """Check the node bridges.
1888

1889
    @type ninfo: L{objects.Node}
1890
    @param ninfo: the node to check
1891
    @param nresult: the remote results for the node
1892
    @param bridges: the expected list of bridges
1893

1894
    """
1895
    if not bridges:
1896
      return
1897

    
1898
    missing = nresult.get(constants.NV_BRIDGES, None)
1899
    test = not isinstance(missing, list)
1900
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1901
                  "did not return valid bridge information")
1902
    if not test:
1903
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1904
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1905

    
1906
  def _VerifyNodeUserScripts(self, ninfo, nresult):
1907
    """Check the results of user scripts presence and executability on the node
1908

1909
    @type ninfo: L{objects.Node}
1910
    @param ninfo: the node to check
1911
    @param nresult: the remote results for the node
1912

1913
    """
1914
    test = constants.NV_USERSCRIPTS not in nresult
1915
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1916
                  "did not return user scripts information")
1917

    
1918
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1919
    if not test:
1920
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1921
                    "user scripts not present or not executable: %s" %
1922
                    utils.CommaJoin(sorted(broken_scripts)))
1923

    
1924
  def _VerifyNodeNetwork(self, ninfo, nresult):
1925
    """Check the node network connectivity results.
1926

1927
    @type ninfo: L{objects.Node}
1928
    @param ninfo: the node to check
1929
    @param nresult: the remote results for the node
1930

1931
    """
1932
    test = constants.NV_NODELIST not in nresult
1933
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
1934
                  "node hasn't returned node ssh connectivity data")
1935
    if not test:
1936
      if nresult[constants.NV_NODELIST]:
1937
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1938
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
1939
                        "ssh communication with node '%s': %s", a_node, a_msg)
1940

    
1941
    test = constants.NV_NODENETTEST not in nresult
1942
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1943
                  "node hasn't returned node tcp connectivity data")
1944
    if not test:
1945
      if nresult[constants.NV_NODENETTEST]:
1946
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1947
        for anode in nlist:
1948
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
1949
                        "tcp communication with node '%s': %s",
1950
                        anode, nresult[constants.NV_NODENETTEST][anode])
1951

    
1952
    test = constants.NV_MASTERIP not in nresult
1953
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1954
                  "node hasn't returned node master IP reachability data")
1955
    if not test:
1956
      if not nresult[constants.NV_MASTERIP]:
1957
        if ninfo.uuid == self.master_node:
1958
          msg = "the master node cannot reach the master IP (not configured?)"
1959
        else:
1960
          msg = "cannot reach the master IP"
1961
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
1962

    
1963
  def _VerifyInstance(self, instance, node_image, diskstatus):
1964
    """Verify an instance.
1965

1966
    This function checks to see if the required block devices are
1967
    available on the instance's node, and that the nodes are in the correct
1968
    state.
1969

1970
    """
1971
    pnode_uuid = instance.primary_node
1972
    pnode_img = node_image[pnode_uuid]
1973
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
1974

    
1975
    node_vol_should = {}
1976
    instance.MapLVsByNode(node_vol_should)
1977

    
1978
    cluster = self.cfg.GetClusterInfo()
1979
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1980
                                                            self.group_info)
1981
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
1982
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
1983
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)
1984

    
1985
    for node_uuid in node_vol_should:
1986
      n_img = node_image[node_uuid]
1987
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1988
        # ignore missing volumes on offline or broken nodes
1989
        continue
1990
      for volume in node_vol_should[node_uuid]:
1991
        test = volume not in n_img.volumes
1992
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
1993
                      "volume %s missing on node %s", volume,
1994
                      self.cfg.GetNodeName(node_uuid))
1995

    
1996
    if instance.admin_state == constants.ADMINST_UP:
1997
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
1998
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
1999
                    "instance not running on its primary node %s",
2000
                    self.cfg.GetNodeName(pnode_uuid))
2001
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
2002
                    instance.name, "instance is marked as running and lives on"
2003
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))
2004

    
2005
    diskdata = [(nname, success, status, idx)
2006
                for (nname, disks) in diskstatus.items()
2007
                for idx, (success, status) in enumerate(disks)]
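    # diskdata is the flattened per-node/per-disk status: one tuple
    # (node UUID, success, status, disk index) for every disk replica.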
2008

    
2009
    for nname, success, bdev_status, idx in diskdata:
2010
      # the 'ghost node' construction in Exec() ensures that we have a
2011
      # node here
2012
      snode = node_image[nname]
2013
      bad_snode = snode.ghost or snode.offline
2014
      self._ErrorIf(instance.disks_active and
2015
                    not success and not bad_snode,
2016
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
2017
                    "couldn't retrieve status for disk/%s on %s: %s",
2018
                    idx, self.cfg.GetNodeName(nname), bdev_status)
2019

    
2020
      if instance.disks_active and success and \
2021
         (bdev_status.is_degraded or
2022
          bdev_status.ldisk_status != constants.LDS_OKAY):
2023
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2024
        if bdev_status.is_degraded:
2025
          msg += " is degraded"
2026
        if bdev_status.ldisk_status != constants.LDS_OKAY:
2027
          msg += "; state is '%s'" % \
2028
                 constants.LDS_NAMES[bdev_status.ldisk_status]
2029

    
2030
        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2031

    
2032
    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2033
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2034
                  "instance %s, connection to primary node failed",
2035
                  instance.name)
2036

    
2037
    self._ErrorIf(len(instance.secondary_nodes) > 1,
2038
                  constants.CV_EINSTANCELAYOUT, instance.name,
2039
                  "instance has multiple secondary nodes: %s",
2040
                  utils.CommaJoin(instance.secondary_nodes),
2041
                  code=self.ETYPE_WARNING)
2042

    
2043
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2044
    if any(es_flags.values()):
2045
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2046
        # Disk template not compatible with exclusive_storage: no instance
2047
        # node should have the flag set
2048
        es_nodes = [n
2049
                    for (n, es) in es_flags.items()
2050
                    if es]
2051
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2052
                    "instance has template %s, which is not supported on nodes"
2053
                    " that have exclusive storage set: %s",
2054
                    instance.disk_template,
2055
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2056
      for (idx, disk) in enumerate(instance.disks):
2057
        self._ErrorIf(disk.spindles is None,
2058
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2059
                      "number of spindles not configured for disk %s while"
2060
                      " exclusive storage is enabled, try running"
2061
                      " gnt-cluster repair-disk-sizes", idx)
2062

    
2063
    if instance.disk_template in constants.DTS_INT_MIRROR:
2064
      instance_nodes = utils.NiceSort(instance.all_nodes)
2065
      instance_groups = {}
2066

    
2067
      for node_uuid in instance_nodes:
2068
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
2069
                                   []).append(node_uuid)
2070

    
2071
      pretty_list = [
2072
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2073
                           groupinfo[group].name)
2074
        # Sort so that we always list the primary node first.
2075
        for group, nodes in sorted(instance_groups.items(),
2076
                                   key=lambda (_, nodes): pnode_uuid in nodes,
2077
                                   reverse=True)]
2078

    
2079
      self._ErrorIf(len(instance_groups) > 1,
2080
                    constants.CV_EINSTANCESPLITGROUPS,
2081
                    instance.name, "instance has primary and secondary nodes in"
2082
                    " different groups: %s", utils.CommaJoin(pretty_list),
2083
                    code=self.ETYPE_WARNING)
2084

    
2085
    inst_nodes_offline = []
2086
    for snode in instance.secondary_nodes:
2087
      s_img = node_image[snode]
2088
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2089
                    self.cfg.GetNodeName(snode),
2090
                    "instance %s, connection to secondary node failed",
2091
                    instance.name)
2092

    
2093
      if s_img.offline:
2094
        inst_nodes_offline.append(snode)
2095

    
2096
    # warn that the instance lives on offline nodes
2097
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2098
                  instance.name, "instance has offline secondary node(s) %s",
2099
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2100
    # ... or ghost/non-vm_capable nodes
2101
    for node_uuid in instance.all_nodes:
2102
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2103
                    instance.name, "instance lives on ghost node %s",
2104
                    self.cfg.GetNodeName(node_uuid))
2105
      self._ErrorIf(not node_image[node_uuid].vm_capable,
2106
                    constants.CV_EINSTANCEBADNODE, instance.name,
2107
                    "instance lives on non-vm_capable node %s",
2108
                    self.cfg.GetNodeName(node_uuid))
2109

    
2110
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2111
    """Verify if there are any unknown volumes in the cluster.
2112

2113
    The .os, .swap and backup volumes are ignored. All other volumes are
2114
    reported as unknown.
2115

2116
    @type reserved: L{ganeti.utils.FieldSet}
2117
    @param reserved: a FieldSet of reserved volume names
2118

2119
    """
2120
    for node_uuid, n_img in node_image.items():
2121
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2122
          self.all_node_info[node_uuid].group != self.group_uuid):
2123
        # skip non-healthy nodes
2124
        continue
2125
      for volume in n_img.volumes:
2126
        test = ((node_uuid not in node_vol_should or
2127
                volume not in node_vol_should[node_uuid]) and
2128
                not reserved.Matches(volume))
2129
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2130
                      self.cfg.GetNodeName(node_uuid),
2131
                      "volume %s is unknown", volume)
2132

    
2133
  def _VerifyNPlusOneMemory(self, node_image, all_insts):
2134
    """Verify N+1 Memory Resilience.
2135

2136
    Check that if one single node dies we can still start all the
2137
    instances it was primary for.
2138

2139
    """
2140
    cluster_info = self.cfg.GetClusterInfo()
2141
    for node_uuid, n_img in node_image.items():
2142
      # This code checks that every node which is now listed as
2143
      # secondary has enough memory to host all instances it is
2144
      # supposed to should a single other node in the cluster fail.
2145
      # FIXME: not ready for failover to an arbitrary node
2146
      # FIXME: does not support file-backed instances
2147
      # WARNING: we currently take into account down instances as well
2148
      # as up ones, considering that even if they're down someone
2149
      # might want to start them even in the event of a node failure.
2150
      if n_img.offline or \
2151
         self.all_node_info[node_uuid].group != self.group_uuid:
2152
        # we're skipping nodes marked offline and nodes in other groups from
2153
        # the N+1 warning, since most likely we don't have good memory
2154
        # information from them; we already list instances living on such
2155
        # nodes, and that's enough warning
2156
        continue
2157
      #TODO(dynmem): also consider ballooning out other instances
2158
      for prinode, inst_uuids in n_img.sbp.items():
2159
        needed_mem = 0
2160
        for inst_uuid in inst_uuids:
2161
          bep = cluster_info.FillBE(all_insts[inst_uuid])
2162
          if bep[constants.BE_AUTO_BALANCE]:
2163
            needed_mem += bep[constants.BE_MINMEM]
2164
        test = n_img.mfree < needed_mem
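        # Worked example with made-up numbers: if this node is secondary for
        # two auto-balanced instances of prinode with minmem 2048 and 4096
        # MiB, needed_mem is 6144 and the check fires once mfree < 6144.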
2165
        self._ErrorIf(test, constants.CV_ENODEN1,
2166
                      self.cfg.GetNodeName(node_uuid),
2167
                      "not enough memory to accomodate instance failovers"
2168
                      " should node %s fail (%dMiB needed, %dMiB available)",
2169
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2170

    
2171
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2172
                   (files_all, files_opt, files_mc, files_vm)):
2173
    """Verifies file checksums collected from all nodes.
2174

2175
    @param nodes: List of L{objects.Node} objects
2176
    @param master_node_uuid: UUID of master node
2177
    @param all_nvinfo: RPC results
2178

2179
    """
2180
    # Define functions determining which nodes to consider for a file
2181
    files2nodefn = [
2182
      (files_all, None),
2183
      (files_mc, lambda node: (node.master_candidate or
2184
                               node.uuid == master_node_uuid)),
2185
      (files_vm, lambda node: node.vm_capable),
2186
      ]
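    # In other words: files_all entries are expected on every node, files_mc
    # only on master candidates (plus the master itself) and files_vm only on
    # vm_capable nodes; files_opt is consulted further down for the
    # all-or-nothing check on optional files.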
2187

    
2188
    # Build mapping from filename to list of nodes which should have the file
2189
    nodefiles = {}
2190
    for (files, fn) in files2nodefn:
2191
      if fn is None:
2192
        filenodes = nodes
2193
      else:
2194
        filenodes = filter(fn, nodes)
2195
      nodefiles.update((filename,
2196
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
2197
                       for filename in files)
2198

    
2199
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2200

    
2201
    fileinfo = dict((filename, {}) for filename in nodefiles)
2202
    ignore_nodes = set()
2203

    
2204
    for node in nodes:
2205
      if node.offline:
2206
        ignore_nodes.add(node.uuid)
2207
        continue
2208

    
2209
      nresult = all_nvinfo[node.uuid]
2210

    
2211
      if nresult.fail_msg or not nresult.payload:
2212
        node_files = None
2213
      else:
2214
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2215
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2216
                          for (key, value) in fingerprints.items())
2217
        del fingerprints
2218

    
2219
      test = not (node_files and isinstance(node_files, dict))
2220
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2221
                    "Node did not return file checksum data")
2222
      if test:
2223
        ignore_nodes.add(node.uuid)
2224
        continue
2225

    
2226
      # Build per-checksum mapping from filename to nodes having it
2227
      for (filename, checksum) in node_files.items():
2228
        assert filename in nodefiles
2229
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2230

    
2231
    for (filename, checksums) in fileinfo.items():
2232
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2233

    
2234
      # Nodes having the file
2235
      with_file = frozenset(node_uuid
2236
                            for node_uuids in fileinfo[filename].values()
2237
                            for node_uuid in node_uuids) - ignore_nodes
2238

    
2239
      expected_nodes = nodefiles[filename] - ignore_nodes
2240

    
2241
      # Nodes missing file
2242
      missing_file = expected_nodes - with_file
2243

    
2244
      if filename in files_opt:
2245
        # All or no nodes
2246
        self._ErrorIf(missing_file and missing_file != expected_nodes,
2247
                      constants.CV_ECLUSTERFILECHECK, None,
2248
                      "File %s is optional, but it must exist on all or no"
2249
                      " nodes (not found on %s)",
2250
                      filename,
2251
                      utils.CommaJoin(
2252
                        utils.NiceSort(
2253
                          map(self.cfg.GetNodeName, missing_file))))
2254
      else:
2255
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2256
                      "File %s is missing from node(s) %s", filename,
2257
                      utils.CommaJoin(
2258
                        utils.NiceSort(
2259
                          map(self.cfg.GetNodeName, missing_file))))
2260

    
2261
        # Warn if a node has a file it shouldn't
2262
        unexpected = with_file - expected_nodes
2263
        self._ErrorIf(unexpected,
2264
                      constants.CV_ECLUSTERFILECHECK, None,
2265
                      "File %s should not exist on node(s) %s",
2266
                      filename, utils.CommaJoin(
2267
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2268

    
2269
      # See if there are multiple versions of the file
2270
      test = len(checksums) > 1
2271
      if test:
2272
        variants = ["variant %s on %s" %
2273
                    (idx + 1,
2274
                     utils.CommaJoin(utils.NiceSort(
2275
                       map(self.cfg.GetNodeName, node_uuids))))
2276
                    for (idx, (checksum, node_uuids)) in
2277
                      enumerate(sorted(checksums.items()))]
2278
      else:
2279
        variants = []
2280

    
2281
      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2282
                    "File %s found with %s different checksums (%s)",
2283
                    filename, len(checksums), "; ".join(variants))
2284

    
2285
  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
2286
    """Verify the drbd helper.
2287

2288
    """
2289
    if drbd_helper:
2290
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2291
      test = (helper_result is None)
2292
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2293
                    "no drbd usermode helper returned")
2294
      if helper_result:
2295
        status, payload = helper_result
2296
        test = not status
2297
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2298
                      "drbd usermode helper check unsuccessful: %s", payload)
2299
        test = status and (payload != drbd_helper)
2300
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2301
                      "wrong drbd usermode helper: %s", payload)
2302

    
2303
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2304
                      drbd_map):
2305
    """Verifies and the node DRBD status.
2306

2307
    @type ninfo: L{objects.Node}
2308
    @param ninfo: the node to check
2309
    @param nresult: the remote results for the node
2310
    @param instanceinfo: the dict of instances
2311
    @param drbd_helper: the configured DRBD usermode helper
2312
    @param drbd_map: the DRBD map as returned by
2313
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2314

2315
    """
2316
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)
2317

    
2318
    # compute the DRBD minors
2319
    node_drbd = {}
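    # node_drbd maps every minor the configuration allocates on this node to
    # (instance_uuid, must_be_active); it is checked below against the minors
    # the node actually reports as in use.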
2320
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2321
      test = inst_uuid not in instanceinfo
2322
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2323
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
2324
        # ghost instance should not be running, but otherwise we
2325
        # don't give double warnings (both ghost instance and
2326
        # unallocated minor in use)
2327
      if test:
2328
        node_drbd[minor] = (inst_uuid, False)
2329
      else:
2330
        instance = instanceinfo[inst_uuid]
2331
        node_drbd[minor] = (inst_uuid, instance.disks_active)
2332

    
2333
    # and now check them
2334
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2335
    test = not isinstance(used_minors, (tuple, list))
2336
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2337
                  "cannot parse drbd status file: %s", str(used_minors))
2338
    if test:
2339
      # we cannot check drbd status
2340
      return
2341

    
2342
    for minor, (inst_uuid, must_exist) in node_drbd.items():
2343
      test = minor not in used_minors and must_exist
2344
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2345
                    "drbd minor %d of instance %s is not active", minor,
2346
                    self.cfg.GetInstanceName(inst_uuid))
2347
    for minor in used_minors:
2348
      test = minor not in node_drbd
2349
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2350
                    "unallocated drbd minor %d is in use", minor)
2351

    
2352
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2353
    """Builds the node OS structures.
2354

2355
    @type ninfo: L{objects.Node}
2356
    @param ninfo: the node to check
2357
    @param nresult: the remote results for the node
2358
    @param nimg: the node image object
2359

2360
    """
2361
    remote_os = nresult.get(constants.NV_OSLIST, None)
2362
    test = (not isinstance(remote_os, list) or
2363
            not compat.all(isinstance(v, list) and len(v) == 7
2364
                           for v in remote_os))
2365

    
2366
    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2367
                  "node hasn't returned valid OS data")
2368

    
2369
    nimg.os_fail = test
2370

    
2371
    if test:
2372
      return
2373

    
2374
    os_dict = {}
2375

    
2376
    for (name, os_path, status, diagnose,
2377
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2378

    
2379
      if name not in os_dict:
2380
        os_dict[name] = []
2381

    
2382
      # parameters is a list of lists instead of list of tuples due to
2383
      # JSON lacking a real tuple type, fix it:
2384
      parameters = [tuple(v) for v in parameters]
2385
      os_dict[name].append((os_path, status, diagnose,
2386
                            set(variants), set(parameters), set(api_ver)))
2387

    
2388
    nimg.oslist = os_dict
2389

    
2390
  def _VerifyNodeOS(self, ninfo, nimg, base):
2391
    """Verifies the node OS list.
2392

2393
    @type ninfo: L{objects.Node}
2394
    @param ninfo: the node to check
2395
    @param nimg: the node image object
2396
    @param base: the 'template' node we match against (e.g. from the master)
2397

2398
    """
2399
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2400

    
2401
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2402
    for os_name, os_data in nimg.oslist.items():
2403
      assert os_data, "Empty OS status for OS %s?!" % os_name
2404
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2405
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2406
                    "Invalid OS %s (located at %s): %s",
2407
                    os_name, f_path, f_diag)
2408
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2409
                    "OS '%s' has multiple entries"
2410
                    " (first one shadows the rest): %s",
2411
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
2412
      # comparisons with the 'base' image
2413
      test = os_name not in base.oslist
2414
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2415
                    "Extra OS %s not present on reference node (%s)",
2416
                    os_name, self.cfg.GetNodeName(base.uuid))
2417
      if test:
2418
        continue
2419
      assert base.oslist[os_name], "Base node has empty OS status?"
2420
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2421
      if not b_status:
2422
        # base OS is invalid, skipping
2423
        continue
2424
      for kind, a, b in [("API version", f_api, b_api),
2425
                         ("variants list", f_var, b_var),
2426
                         ("parameters", beautify_params(f_param),
2427
                          beautify_params(b_param))]:
2428
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2429
                      "OS %s for %s differs from reference node %s:"
2430
                      " [%s] vs. [%s]", kind, os_name,
2431
                      self.cfg.GetNodeName(base.uuid),
2432
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2433

    
2434
    # check any missing OSes
2435
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2436
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2437
                  "OSes present on reference node %s"
2438
                  " but missing on this node: %s",
2439
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2440

    
2441
  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2442
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2443

2444
    @type ninfo: L{objects.Node}
2445
    @param ninfo: the node to check
2446
    @param nresult: the remote results for the node
2447
    @type is_master: bool
2448
    @param is_master: Whether node is the master node
2449

2450
    """
2451
    cluster = self.cfg.GetClusterInfo()
2452
    if (is_master and
2453
        (cluster.IsFileStorageEnabled() or
2454
         cluster.IsSharedFileStorageEnabled())):
2455
      try:
2456
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2457
      except KeyError:
2458
        # This should never happen
2459
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2460
                      "Node did not return forbidden file storage paths")
2461
      else:
2462
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2463
                      "Found forbidden file storage paths: %s",
2464
                      utils.CommaJoin(fspaths))
2465
    else:
2466
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2467
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2468
                    "Node should not have returned forbidden file storage"
2469
                    " paths")
2470

    
2471
  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2472
                          verify_key, error_key):
2473
    """Verifies (file) storage paths.
2474

2475
    @type ninfo: L{objects.Node}
2476
    @param ninfo: the node to check
2477
    @param nresult: the remote results for the node
2478
    @type file_disk_template: string
2479
    @param file_disk_template: file-based disk template, whose directory
2480
        is supposed to be verified
2481
    @type verify_key: string
2482
    @param verify_key: key for the verification map of this file
2483
        verification step
2484
    @param error_key: error key to be added to the verification results
2485
        in case something goes wrong in this verification step
2486

2487
    """
2488
    assert (file_disk_template in
2489
            utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2490
    cluster = self.cfg.GetClusterInfo()
2491
    if cluster.IsDiskTemplateEnabled(file_disk_template):
2492
      self._ErrorIf(
2493
          verify_key in nresult,
2494
          error_key, ninfo.name,
2495
          "The configured %s storage path is unusable: %s" %
2496
          (file_disk_template, nresult.get(verify_key)))
2497

    
2498
  def _VerifyFileStoragePaths(self, ninfo, nresult):
2499
    """Verifies (file) storage paths.
2500

2501
    @see: C{_VerifyStoragePaths}
2502

2503
    """
2504
    self._VerifyStoragePaths(
2505
        ninfo, nresult, constants.DT_FILE,
2506
        constants.NV_FILE_STORAGE_PATH,
2507
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2508

    
2509
  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2510
    """Verifies (file) storage paths.
2511

2512
    @see: C{_VerifyStoragePaths}
2513

2514
    """
2515
    self._VerifyStoragePaths(
2516
        ninfo, nresult, constants.DT_SHARED_FILE,
2517
        constants.NV_SHARED_FILE_STORAGE_PATH,
2518
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2519

    
2520
  def _VerifyOob(self, ninfo, nresult):
2521
    """Verifies out of band functionality of a node.
2522

2523
    @type ninfo: L{objects.Node}
2524
    @param ninfo: the node to check
2525
    @param nresult: the remote results for the node
2526

2527
    """
2528
    # We just have to verify the paths on master and/or master candidates
2529
    # as the oob helper is invoked on the master
2530
    if ((ninfo.master_candidate or ninfo.master_capable) and
2531
        constants.NV_OOB_PATHS in nresult):
2532
      for path_result in nresult[constants.NV_OOB_PATHS]:
2533
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2534
                      ninfo.name, path_result)
2535

    
2536
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2537
    """Verifies and updates the node volume data.
2538

2539
    This function will update a L{NodeImage}'s internal structures
2540
    with data from the remote call.
2541

2542
    @type ninfo: L{objects.Node}
2543
    @param ninfo: the node to check
2544
    @param nresult: the remote results for the node
2545
    @param nimg: the node image object
2546
    @param vg_name: the configured VG name
2547

2548
    """
2549
    nimg.lvm_fail = True
2550
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2551
    if vg_name is None:
2552
      pass
2553
    elif isinstance(lvdata, basestring):
2554
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2555
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
2556
    elif not isinstance(lvdata, dict):
2557
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2558
                    "rpc call to node failed (lvlist)")
2559
    else:
2560
      nimg.volumes = lvdata
2561
      nimg.lvm_fail = False
2562

    
2563
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2564
    """Verifies and updates the node instance list.
2565

2566
    If the listing was successful, then updates this node's instance
2567
    list. Otherwise, it marks the RPC call as failed for the instance
2568
    list key.
2569

2570
    @type ninfo: L{objects.Node}
2571
    @param ninfo: the node to check
2572
    @param nresult: the remote results for the node
2573
    @param nimg: the node image object
2574

2575
    """
2576
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2577
    test = not isinstance(idata, list)
2578
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2579
                  "rpc call to node failed (instancelist): %s",
2580
                  utils.SafeEncode(str(idata)))
2581
    if test:
2582
      nimg.hyp_fail = True
2583
    else:
2584
      nimg.instances = [inst.uuid for (_, inst) in
2585
                        self.cfg.GetMultiInstanceInfoByName(idata)]
2586

    
2587
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2588
    """Verifies and computes a node information map
2589

2590
    @type ninfo: L{objects.Node}
2591
    @param ninfo: the node to check
2592
    @param nresult: the remote results for the node
2593
    @param nimg: the node image object
2594
    @param vg_name: the configured VG name
2595

2596
    """
2597
    # try to read free memory (from the hypervisor)
2598
    hv_info = nresult.get(constants.NV_HVINFO, None)
2599
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2600
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2601
                  "rpc call to node failed (hvinfo)")
2602
    if not test:
2603
      try:
2604
        nimg.mfree = int(hv_info["memory_free"])
2605
      except (ValueError, TypeError):
2606
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2607
                      "node returned invalid nodeinfo, check hypervisor")
2608

    
2609
    # FIXME: devise a free space model for file based instances as well
2610
    if vg_name is not None:
2611
      test = (constants.NV_VGLIST not in nresult or
2612
              vg_name not in nresult[constants.NV_VGLIST])
2613
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2614
                    "node didn't return data for the volume group '%s'"
2615
                    " - it is either missing or broken", vg_name)
2616
      if not test:
2617
        try:
2618
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2619
        except (ValueError, TypeError):
2620
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2621
                        "node returned invalid LVM info, check LVM status")
2622

    
2623
  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2624
    """Gets per-disk status information for all instances.
2625

2626
    @type node_uuids: list of strings
2627
    @param node_uuids: Node UUIDs
2628
    @type node_image: dict of (UUID, L{objects.Node})
2629
    @param node_image: Node objects
2630
    @type instanceinfo: dict of (UUID, L{objects.Instance})
2631
    @param instanceinfo: Instance objects
2632
    @rtype: {instance: {node: [(success, payload)]}}
2633
    @return: a dictionary of per-instance dictionaries with nodes as
2634
        keys and disk information as values; the disk information is a
2635
        list of tuples (success, payload)
2636

2637
    """
2638
    node_disks = {}
2639
    node_disks_devonly = {}
2640
    diskless_instances = set()
2641
    diskless = constants.DT_DISKLESS
2642

    
2643
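    # Build, per node, the list of (instance UUID, disk) pairs living there;
    # diskless instances are remembered separately so they still get (empty)
    # entries in the result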
    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      devonly = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        self.cfg.SetDiskID(anno_disk, nuuid)
        devonly.append(anno_disk)

      node_disks_devonly[nuuid] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

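    # instdisk: instance UUID -> node UUID -> [(success, payload), ...],
    # one status tuple per disk of that instance on that node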
    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

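    # Return one endless iterator per foreign node group, each cycling over
    # that group's sorted node names, so callers can draw one peer from every
    # other group for each local node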
    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

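    # Illustrative result shape (hypothetical names): with one other group
    # containing just "node3", this returns (["node1", "node2"],
    # {"node1": ["node3"], "node2": ["node3"]})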
    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; their failure causes
    the output to be logged in the verify output and the verification to fail.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

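    # Resulting env, with hypothetical values: {"CLUSTER_TAGS": "prod",
    # "NODE_TAGS_node1.example.com": "rack1 ssd"}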
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

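    # node_verify_param maps NV_* check names to their parameters; it is sent
    # to each node in a single node_verify RPC further down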
    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
      if drbd_helper:
        node_verify_param[constants.NV_DRBDVERSION] = None
        node_verify_param[constants.NV_DRBDLIST] = None
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for inst_uuid in self.my_inst_info.values():
      for nic in inst_uuid.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

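    # Record every instance on its primary (pinst) and secondary (sinst) node
    # images; nodes outside this group get placeholder images, flagged as
    # ghosts when they are not known to the configuration at all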
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so
    # if it's True for at least one node, we act as if it were set for all
    # the nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

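    # refos_img is the reference node image against which the OS lists of the
    # other nodes in the group are compared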
    refos_img = None

    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

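    # group-wide checks, based on the data collected from all nodes above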
    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.name not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
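        # each payload entry is a (script, hook result code, output) tuple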
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
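    # Only shared locks are needed; the actual verification is delegated to
    # one job per node group, submitted from Exec below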
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])