#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
  CheckIpolicyVsDiskTemplates

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "vcs_version": constants.VCS_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disk sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object
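    @rtype: boolean
    @return: True if any child's size had to be adjusted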

    """
    if disk.dev_type == constants.DT_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      if not dskl:
        # no disks on the node
        continue

      newl = [([v[2].Copy()], v[0]) for v in dskl]
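      # each entry pairs a copy of the disk object with its owning instance;
      # this is the argument list handed to the getdimensions RPC below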
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
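        # (the dimensions RPC reports the size in bytes, while disk.size is
        # recorded in MiB, hence the conversion above)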
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
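  # "netmask" is a prefix length here (e.g. 32 for an IPv4 /32), not a
  # dotted-quad string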
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)


def CheckFileBasedStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
    file_disk_template):
  """Checks whether the given file-based storage directory is acceptable.

  Note: This function is public, because it is also used in bootstrap.py.

  @type logging_warn_fn: function
  @param logging_warn_fn: function which accepts a string and logs it
  @type file_storage_dir: string
  @param file_storage_dir: the directory to be used for file-based instances
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of enabled disk templates
  @type file_disk_template: string
  @param file_disk_template: the file-based disk template for which the
      path should be checked

  """
  assert (file_disk_template in
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_dir is not None:
    if file_storage_dir == "":
      if file_storage_enabled:
        raise errors.OpPrereqError(
            "Unsetting the '%s' storage directory while having '%s' storage"
            " enabled is not permitted." %
            (file_disk_template, file_disk_template))
    else:
      if not file_storage_enabled:
        logging_warn_fn(
            "Specified a %s storage directory, although %s storage is not"
            " enabled." % (file_disk_template, file_disk_template))
  else:
    raise errors.ProgrammerError("Received %s storage dir with value"
                                 " 'None'." % file_disk_template)


def CheckFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_FILE)


def CheckSharedFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given shared file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_SHARED_FILE)


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" %
                                   err, errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def _CheckVgName(self, node_uuids, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the vg name on all nodes and, in case it gets
       unset, whether there are instances still using it.

    """
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
                                            new_enabled_disk_templates)
    current_vg_name = self.cfg.GetVGName()

    if self.op.vg_name == '':
      if lvm_is_enabled:
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                   " disk templates are or get enabled.")

    if self.op.vg_name is None:
      if current_vg_name is None and lvm_is_enabled:
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and lvm_is_enabled) or \
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
      self._CheckVgNameOnNodes(node_uuids)

  def _CheckVgNameOnNodes(self, node_uuids):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_uuids)
    for node_uuid in node_uuids:
      msg = vglist[node_uuid].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
                                   errors.ECODE_ENVIRON)

  @staticmethod
  def _GetEnabledDiskTemplatesInner(op_enabled_disk_templates,
                                    old_enabled_disk_templates):
    """Determines the enabled disk templates and the subset of disk templates
       that are newly enabled by this operation.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    if op_enabled_disk_templates:
      enabled_disk_templates = op_enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(old_enabled_disk_templates))
    else:
      enabled_disk_templates = old_enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates)

  def _GetEnabledDiskTemplates(self, cluster):
    """Determines the enabled disk templates and the subset of disk templates
       that are newly enabled by this operation.

    """
    return self._GetEnabledDiskTemplatesInner(self.op.enabled_disk_templates,
                                              cluster.enabled_disk_templates)

  def _CheckIpolicy(self, cluster, enabled_disk_templates):
    """Checks the ipolicy.

    @type cluster: C{objects.Cluster}
    @param cluster: the cluster's configuration
    @type enabled_disk_templates: list of string
    @param enabled_disk_templates: list of (possibly newly) enabled disk
      templates

    """
    # FIXME: write unit tests for this
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
                                  enabled_disk_templates)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
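        # restrict the check to instances that have at least one node in
        # this node group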
        instances = frozenset([inst for inst in all_instances
                               if compat.any(nuuid in group.members
                                             for nuuid in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
                                           self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))
    else:
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
                                  enabled_disk_templates)

  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
    """Checks whether the set DRBD helper actually exists on the nodes.

    @type drbd_helper: string
    @param drbd_helper: path of the drbd usermode helper binary
    @type node_uuids: list of strings
    @param node_uuids: list of node UUIDs to check for the helper

    """
    # checks given drbd helper on all nodes
    helpers = self.rpc.call_drbd_helper(node_uuids)
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
      if ninfo.offline:
        self.LogInfo("Not checking drbd helper on offline node %s",
                     ninfo.name)
        continue
      msg = helpers[ninfo.uuid].fail_msg
      if msg:
        raise errors.OpPrereqError("Error checking drbd helper on node"
                                   " '%s': %s" % (ninfo.name, msg),
                                   errors.ECODE_ENVIRON)
      node_helper = helpers[ninfo.uuid].payload
      if node_helper != drbd_helper:
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                   (ninfo.name, node_helper),
                                   errors.ECODE_ENVIRON)

  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
    """Check the DRBD usermode helper.

    @type node_uuids: list of strings
    @param node_uuids: a list of nodes' UUIDs
    @type drbd_enabled: boolean
    @param drbd_enabled: whether DRBD will be enabled after this operation
      (no matter if it was disabled before or not)
    @type drbd_gets_enabled: boolean
    @param drbd_gets_enabled: true if DRBD was disabled before this
      operation, but will be enabled afterwards

    """
    if self.op.drbd_helper == '':
      if drbd_enabled:
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " DRBD is enabled.")
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    else:
      if self.op.drbd_helper is not None and drbd_enabled:
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
      else:
        if drbd_gets_enabled:
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
          if current_drbd_helper is not None:
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
          else:
            raise errors.OpPrereqError("Cannot enable DRBD without a"
                                       " DRBD usermode helper set.")

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    self.cluster = cluster = self.cfg.GetClusterInfo()

    vm_capable_node_uuids = [node.uuid
                             for node in self.cfg.GetAllNodesInfo().values()
                             if node.uuid in node_uuids and node.vm_capable]

    (enabled_disk_templates, new_enabled_disk_templates) = \
      self._GetEnabledDiskTemplates(cluster)

    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
                      new_enabled_disk_templates)

    if self.op.file_storage_dir is not None:
      CheckFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)

    if self.op.shared_file_storage_dir is not None:
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.shared_file_storage_dir,
          enabled_disk_templates)

    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)

    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    self._CheckIpolicy(cluster, enabled_disk_templates)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
1012
    if self.op.hvparams:
1013
      for hv_name, hv_dict in self.op.hvparams.items():
1014
        if hv_name not in self.new_hvparams:
1015
          self.new_hvparams[hv_name] = hv_dict
1016
        else:
1017
          self.new_hvparams[hv_name].update(hv_dict)
1018

    
1019
    # disk template parameters
1020
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
1021
    if self.op.diskparams:
1022
      for dt_name, dt_params in self.op.diskparams.items():
1023
        if dt_name not in self.new_diskparams:
1024
          self.new_diskparams[dt_name] = dt_params
1025
        else:
1026
          self.new_diskparams[dt_name].update(dt_params)
1027

    
1028
    # os hypervisor parameters
1029
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
1030
    if self.op.os_hvp:
1031
      for os_name, hvs in self.op.os_hvp.items():
1032
        if os_name not in self.new_os_hvp:
1033
          self.new_os_hvp[os_name] = hvs
1034
        else:
1035
          for hv_name, hv_dict in hvs.items():
1036
            if hv_dict is None:
1037
              # Delete if it exists
1038
              self.new_os_hvp[os_name].pop(hv_name, None)
1039
            elif hv_name not in self.new_os_hvp[os_name]:
1040
              self.new_os_hvp[os_name][hv_name] = hv_dict
1041
            else:
1042
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
1043

    
1044
    # os parameters
1045
    self.new_osp = objects.FillDict(cluster.osparams, {})
1046
    if self.op.osparams:
1047
      for os_name, osp in self.op.osparams.items():
1048
        if os_name not in self.new_osp:
1049
          self.new_osp[os_name] = {}
1050

    
1051
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
1052
                                                 use_none=True)
1053

    
1054
        if not self.new_osp[os_name]:
1055
          # we removed all parameters
1056
          del self.new_osp[os_name]
1057
        else:
1058
          # check the parameter validity (remote check)
1059
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
1060
                        os_name, self.new_osp[os_name])
1061

    
1062
    # changes to the hypervisor list
1063
    if self.op.enabled_hypervisors is not None:
1064
      self.hv_list = self.op.enabled_hypervisors
1065
      for hv in self.hv_list:
1066
        # if the hypervisor doesn't already exist in the cluster
1067
        # hvparams, we initialize it to empty, and then (in both
1068
        # cases) we make sure to fill the defaults, as we might not
1069
        # have a complete defaults list if the hypervisor wasn't
1070
        # enabled before
1071
        if hv not in new_hvp:
1072
          new_hvp[hv] = {}
1073
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1074
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1075
    else:
1076
      self.hv_list = cluster.enabled_hypervisors
1077

    
1078
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1079
      # either the enabled list has changed, or the parameters have, validate
1080
      for hv_name, hv_params in self.new_hvparams.items():
1081
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1082
            (self.op.enabled_hypervisors and
1083
             hv_name in self.op.enabled_hypervisors)):
1084
          # either this is a new hypervisor, or its parameters have changed
1085
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1086
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1087
          hv_class.CheckParameterSyntax(hv_params)
1088
          CheckHVParams(self, node_uuids, hv_name, hv_params)
1089

    
1090
    self._CheckDiskTemplateConsistency()
1091

    
1092
    if self.op.os_hvp:
1093
      # no need to check any newly-enabled hypervisors, since the
1094
      # defaults have already been checked in the above code-block
1095
      for os_name, os_hvp in self.new_os_hvp.items():
1096
        for hv_name, hv_params in os_hvp.items():
1097
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1098
          # we need to fill in the new os_hvp on top of the actual hv_p
1099
          cluster_defaults = self.new_hvparams.get(hv_name, {})
1100
          new_osp = objects.FillDict(cluster_defaults, hv_params)
1101
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1102
          hv_class.CheckParameterSyntax(new_osp)
1103
          CheckHVParams(self, node_uuids, hv_name, new_osp)
1104

    
1105
    if self.op.default_iallocator:
1106
      alloc_script = utils.FindFile(self.op.default_iallocator,
1107
                                    constants.IALLOCATOR_SEARCH_PATH,
1108
                                    os.path.isfile)
1109
      if alloc_script is None:
1110
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1111
                                   " specified" % self.op.default_iallocator,
1112
                                   errors.ECODE_INVAL)
1113

    
1114
  def _CheckDiskTemplateConsistency(self):
1115
    """Check whether the disk templates that are going to be disabled
1116
       are still in use by some instances.
1117

1118
    """
1119
    if self.op.enabled_disk_templates:
1120
      cluster = self.cfg.GetClusterInfo()
1121
      instances = self.cfg.GetAllInstancesInfo()
1122

    
1123
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1124
        - set(self.op.enabled_disk_templates)
1125
      for instance in instances.itervalues():
1126
        if instance.disk_template in disk_templates_to_remove:
1127
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
1128
                                     " because instance '%s' is using it." %
1129
                                     (instance.disk_template, instance.name))
1130

    
1131
  def _SetVgName(self, feedback_fn):
1132
    """Determines and sets the new volume group name.
1133

1134
    """
1135
    if self.op.vg_name is not None:
1136
      new_volume = self.op.vg_name
1137
      if not new_volume:
1138
        new_volume = None
1139
      if new_volume != self.cfg.GetVGName():
1140
        self.cfg.SetVGName(new_volume)
1141
      else:
1142
        feedback_fn("Cluster LVM configuration already in desired"
1143
                    " state, not changing")
1144

    
1145
  def _SetFileStorageDir(self, feedback_fn):
1146
    """Set the file storage directory.
1147

1148
    """
1149
    if self.op.file_storage_dir is not None:
1150
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
1151
        feedback_fn("Global file storage dir already set to value '%s'"
1152
                    % self.cluster.file_storage_dir)
1153
      else:
1154
        self.cluster.file_storage_dir = self.op.file_storage_dir
1155

    
1156
  def _SetDrbdHelper(self, feedback_fn):
1157
    """Set the DRBD usermode helper.
1158

1159
    """
1160
    if self.op.drbd_helper is not None:
1161
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1162
        feedback_fn("Note that you specified a drbd user helper, but did not"
1163
                    " enable the drbd disk template.")
1164
      new_helper = self.op.drbd_helper
1165
      if not new_helper:
1166
        new_helper = None
1167
      if new_helper != self.cfg.GetDRBDHelper():
1168
        self.cfg.SetDRBDHelper(new_helper)
1169
      else:
1170
        feedback_fn("Cluster DRBD helper already in desired state,"
1171
                    " not changing")

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(set(self.op.enabled_disk_templates))

    self._SetVgName(feedback_fn)
    self._SetFileStorageDir(feedback_fn)
    self._SetDrbdHelper(feedback_fn)

    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
                 master_params.uuid, master_params.netmask,
                 self.op.master_netmask, master_params.ip,
                 master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]
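      # (negative job IDs in "depends" are interpreted relative to the jobs
      # submitted together with this one, so the entry above points back at
      # the config-verification job appended first)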

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
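      # not every opcode in the list has "skip_checks" (OpClusterVerifyConfig
      # does not), hence the try/except below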
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1385
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1386
    else:
1387
      if item:
1388
        item = " " + item
1389
      else:
1390
        item = ""
1391
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1392
    # and finally report it via the feedback_fn
1393
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1394
    # do not mark the operation as failed for WARN cases only
1395
    if ltype == self.ETYPE_ERROR:
1396
      self.bad = True
1397

    
1398
  def _ErrorIf(self, cond, *args, **kwargs):
1399
    """Log an error message if the passed condition is True.
1400

1401
    """
1402
    if (bool(cond)
1403
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1404
      self._Error(*args, **kwargs)
1405

    
1406

    
1407
def _VerifyCertificate(filename):
1408
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1409

1410
  @type filename: string
1411
  @param filename: Path to PEM file
1412

1413
  """
1414
  try:
1415
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1416
                                           utils.ReadFile(filename))
1417
  except Exception, err: # pylint: disable=W0703
1418
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1419
            "Failed to load X509 certificate %s: %s" % (filename, err))
1420

    
1421
  (errcode, msg) = \
1422
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1423
                                constants.SSL_CERT_EXPIRATION_ERROR)
1424

    
1425
  if msg:
1426
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1427
  else:
1428
    fnamemsg = None
1429

    
1430
  if errcode is None:
1431
    return (None, fnamemsg)
1432
  elif errcode == utils.CERT_WARNING:
1433
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1434
  elif errcode == utils.CERT_ERROR:
1435
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1436

    
1437
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1438

    
1439

    
1440
def _GetAllHypervisorParameters(cluster, instances):
1441
  """Compute the set of all hypervisor parameters.
1442

1443
  @type cluster: L{objects.Cluster}
1444
  @param cluster: the cluster object
1445
  @type instances: list of L{objects.Instance}
1446
  @param instances: additional instances from which to obtain parameters
1447
  @rtype: list of (origin, hypervisor, parameters)
1448
  @return: a list with all parameters found, indicating the hypervisor they
1449
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1450

1451
  """
1452
  hvp_data = []
1453

    
1454
  for hv_name in cluster.enabled_hypervisors:
1455
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1456

    
1457
  for os_name, os_hvp in cluster.os_hvp.items():
1458
    for hv_name, hv_params in os_hvp.items():
1459
      if hv_params:
1460
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1461
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1462

    
1463
  # TODO: collapse identical parameter values in a single one
1464
  for instance in instances:
1465
    if instance.hvparams:
1466
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1467
                       cluster.FillHV(instance)))
1468

    
1469
  return hvp_data
1470

    
1471

    
1472
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1473
  """Verifies the cluster config.
1474

1475
  """
1476
  REQ_BGL = False
1477

    
1478
  def _VerifyHVP(self, hvp_data):
1479
    """Verifies locally the syntax of the hypervisor parameters.
1480

1481
    """
1482
    for item, hv_name, hv_params in hvp_data:
1483
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1484
             (hv_name, item))
1485
      try:
1486
        hv_class = hypervisor.GetHypervisorClass(hv_name)
1487
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1488
        hv_class.CheckParameterSyntax(hv_params)
1489
      except errors.GenericError, err:
1490
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1491

    
1492
  def ExpandNames(self):
1493
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1494
    self.share_locks = ShareAll()
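    # Config verification only reads state, so all locks are shared.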
1495

    
1496
  def CheckPrereq(self):
1497
    """Check prerequisites.
1498

1499
    """
1500
    # Retrieve all information
1501
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1502
    self.all_node_info = self.cfg.GetAllNodesInfo()
1503
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1504

    
1505
  def Exec(self, feedback_fn):
1506
    """Verify integrity of cluster, performing various test on nodes.
1507

1508
    """
1509
    self.bad = False
1510
    self._feedback_fn = feedback_fn
1511

    
1512
    feedback_fn("* Verifying cluster config")
1513

    
1514
    for msg in self.cfg.VerifyConfig():
1515
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1516

    
1517
    feedback_fn("* Verifying cluster certificate files")
1518

    
1519
    for cert_filename in pathutils.ALL_CERT_FILES:
1520
      (errcode, msg) = _VerifyCertificate(cert_filename)
1521
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1522

    
1523
    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
1524
                                    pathutils.NODED_CERT_FILE),
1525
                  constants.CV_ECLUSTERCERT,
1526
                  None,
1527
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
1528
                    constants.LUXID_USER + " user")
1529

    
1530
    feedback_fn("* Verifying hypervisor parameters")
1531

    
1532
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1533
                                                self.all_inst_info.values()))
1534

    
1535
    feedback_fn("* Verifying all nodes belong to an existing group")
1536

    
1537
    # We do this verification here because, should this bogus circumstance
1538
    # occur, it would never be caught by VerifyGroup, which only acts on
1539
    # nodes/instances reachable from existing node groups.
1540

    
1541
    dangling_nodes = set(node for node in self.all_node_info.values()
1542
                         if node.group not in self.all_group_info)
1543

    
1544
    dangling_instances = {}
1545
    no_node_instances = []
1546

    
1547
    for inst in self.all_inst_info.values():
1548
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
1549
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
1550
      elif inst.primary_node not in self.all_node_info:
1551
        no_node_instances.append(inst)
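    # dangling_instances: primary node exists but is in no known group;
    # no_node_instances: primary node is missing from the configuration.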
1552

    
1553
    pretty_dangling = [
1554
        "%s (%s)" %
1555
        (node.name,
1556
         utils.CommaJoin(inst.name for
1557
                         inst in dangling_instances.get(node.uuid, [])))
1558
        for node in dangling_nodes]
1559

    
1560
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1561
                  None,
1562
                  "the following nodes (and their instances) belong to a non"
1563
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1564

    
1565
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1566
                  None,
1567
                  "the following instances have a non-existing primary-node:"
1568
                  " %s", utils.CommaJoin(inst.name for
1569
                                         inst in no_node_instances))
1570

    
1571
    return not self.bad
1572

    
1573

    
1574
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1575
  """Verifies the status of a node group.
1576

1577
  """
1578
  HPATH = "cluster-verify"
1579
  HTYPE = constants.HTYPE_CLUSTER
1580
  REQ_BGL = False
1581

    
1582
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1583

    
1584
  class NodeImage(object):
1585
    """A class representing the logical and physical status of a node.
1586

1587
    @type uuid: string
1588
    @ivar uuid: the node UUID to which this object refers
1589
    @ivar volumes: a structure as returned from
1590
        L{ganeti.backend.GetVolumeList} (runtime)
1591
    @ivar instances: a list of running instances (runtime)
1592
    @ivar pinst: list of configured primary instances (config)
1593
    @ivar sinst: list of configured secondary instances (config)
1594
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1595
        instances for which this node is secondary (config)
1596
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1597
    @ivar dfree: free disk, as reported by the node (runtime)
1598
    @ivar offline: the offline status (config)
1599
    @type rpc_fail: boolean
1600
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1601
        not whether the individual keys were correct) (runtime)
1602
    @type lvm_fail: boolean
1603
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1604
    @type hyp_fail: boolean
1605
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1606
    @type ghost: boolean
1607
    @ivar ghost: whether this is a known node or not (config)
1608
    @type os_fail: boolean
1609
    @ivar os_fail: whether the RPC call didn't return valid OS data
1610
    @type oslist: list
1611
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1612
    @type vm_capable: boolean
1613
    @ivar vm_capable: whether the node can host instances
1614
    @type pv_min: float
1615
    @ivar pv_min: size in MiB of the smallest PVs
1616
    @type pv_max: float
1617
    @ivar pv_max: size in MiB of the biggest PVs
1618

1619
    """
1620
    def __init__(self, offline=False, uuid=None, vm_capable=True):
1621
      self.uuid = uuid
1622
      self.volumes = {}
1623
      self.instances = []
1624
      self.pinst = []
1625
      self.sinst = []
1626
      self.sbp = {}
1627
      self.mfree = 0
1628
      self.dfree = 0
1629
      self.offline = offline
1630
      self.vm_capable = vm_capable
1631
      self.rpc_fail = False
1632
      self.lvm_fail = False
1633
      self.hyp_fail = False
1634
      self.ghost = False
1635
      self.os_fail = False
1636
      self.oslist = {}
1637
      self.pv_min = None
1638
      self.pv_max = None
1639

    
1640
  def ExpandNames(self):
1641
    # This raises errors.OpPrereqError on its own:
1642
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1643

    
1644
    # Get instances in node group; this is unsafe and needs verification later
1645
    inst_uuids = \
1646
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1647

    
1648
    self.needed_locks = {
1649
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1650
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1651
      locking.LEVEL_NODE: [],
1652

    
1653
      # This opcode is run by watcher every five minutes and acquires all nodes
1654
      # for a group. It doesn't run for a long time, so it's better to acquire
1655
      # the node allocation lock as well.
1656
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1657
      }
1658

    
1659
    self.share_locks = ShareAll()
1660

    
1661
  def DeclareLocks(self, level):
1662
    if level == locking.LEVEL_NODE:
1663
      # Get members of node group; this is unsafe and needs verification later
1664
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1665

    
1666
      # In Exec(), we warn about mirrored instances that have primary and
1667
      # secondary living in separate node groups. To fully verify that
1668
      # volumes for these instances are healthy, we will need to do an
1669
      # extra call to their secondaries. We ensure here those nodes will
1670
      # be locked.
1671
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1672
        # Important: access only the instances whose lock is owned
1673
        instance = self.cfg.GetInstanceInfoByName(inst_name)
1674
        if instance.disk_template in constants.DTS_INT_MIRROR:
1675
          nodes.update(instance.secondary_nodes)
1676

    
1677
      self.needed_locks[locking.LEVEL_NODE] = nodes
1678

    
1679
  def CheckPrereq(self):
1680
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1681
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1682

    
1683
    group_node_uuids = set(self.group_info.members)
1684
    group_inst_uuids = \
1685
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1686

    
1687
    unlocked_node_uuids = \
1688
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1689

    
1690
    unlocked_inst_uuids = \
1691
        group_inst_uuids.difference(
1692
          [self.cfg.GetInstanceInfoByName(name).uuid
1693
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1694

    
1695
    if unlocked_node_uuids:
1696
      raise errors.OpPrereqError(
1697
        "Missing lock for nodes: %s" %
1698
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1699
        errors.ECODE_STATE)
1700

    
1701
    if unlocked_inst_uuids:
1702
      raise errors.OpPrereqError(
1703
        "Missing lock for instances: %s" %
1704
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1705
        errors.ECODE_STATE)
1706

    
1707
    self.all_node_info = self.cfg.GetAllNodesInfo()
1708
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1709

    
1710
    self.my_node_uuids = group_node_uuids
1711
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1712
                             for node_uuid in group_node_uuids)
1713

    
1714
    self.my_inst_uuids = group_inst_uuids
1715
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1716
                             for inst_uuid in group_inst_uuids)
1717

    
1718
    # We detect here the nodes that will need the extra RPC calls for verifying
1719
    # split LV volumes; they should be locked.
1720
    extra_lv_nodes = set()
1721

    
1722
    for inst in self.my_inst_info.values():
1723
      if inst.disk_template in constants.DTS_INT_MIRROR:
1724
        for nuuid in inst.all_nodes:
1725
          if self.all_node_info[nuuid].group != self.group_uuid:
1726
            extra_lv_nodes.add(nuuid)
1727

    
1728
    unlocked_lv_nodes = \
1729
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1730

    
1731
    if unlocked_lv_nodes:
1732
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1733
                                 utils.CommaJoin(unlocked_lv_nodes),
1734
                                 errors.ECODE_STATE)
1735
    self.extra_lv_nodes = list(extra_lv_nodes)
1736

    
1737
  def _VerifyNode(self, ninfo, nresult):
1738
    """Perform some basic validation on data returned from a node.
1739

1740
      - check the result data structure is well formed and has all the
1741
        mandatory fields
1742
      - check ganeti version
1743

1744
    @type ninfo: L{objects.Node}
1745
    @param ninfo: the node to check
1746
    @param nresult: the results from the node
1747
    @rtype: boolean
1748
    @return: whether overall this call was successful (and we can expect
1749
         reasonable values in the response)
1750

1751
    """
1752
    # main result, nresult should be a non-empty dict
1753
    test = not nresult or not isinstance(nresult, dict)
1754
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1755
                  "unable to verify node: no data returned")
1756
    if test:
1757
      return False
1758

    
1759
    # compares ganeti version
1760
    local_version = constants.PROTOCOL_VERSION
1761
    remote_version = nresult.get("version", None)
1762
    test = not (remote_version and
1763
                isinstance(remote_version, (list, tuple)) and
1764
                len(remote_version) == 2)
1765
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1766
                  "connection to node returned invalid data")
1767
    if test:
1768
      return False
1769

    
1770
    test = local_version != remote_version[0]
1771
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1772
                  "incompatible protocol versions: master %s,"
1773
                  " node %s", local_version, remote_version[0])
1774
    if test:
1775
      return False
1776

    
1777
    # node seems compatible, we can actually try to look into its results
1778

    
1779
    # full package version
1780
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1781
                  constants.CV_ENODEVERSION, ninfo.name,
1782
                  "software version mismatch: master %s, node %s",
1783
                  constants.RELEASE_VERSION, remote_version[1],
1784
                  code=self.ETYPE_WARNING)
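    # Note the asymmetry above: a protocol version mismatch aborts this
    # node's verification, while a differing release is only a warning.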
1785

    
1786
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1787
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1788
      for hv_name, hv_result in hyp_result.iteritems():
1789
        test = hv_result is not None
1790
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1791
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1792

    
1793
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1794
    if ninfo.vm_capable and isinstance(hvp_result, list):
1795
      for item, hv_name, hv_result in hvp_result:
1796
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1797
                      "hypervisor %s parameter verify failure (source %s): %s",
1798
                      hv_name, item, hv_result)
1799

    
1800
    test = nresult.get(constants.NV_NODESETUP,
1801
                       ["Missing NODESETUP results"])
1802
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1803
                  "node setup error: %s", "; ".join(test))
1804

    
1805
    return True
1806

    
1807
  def _VerifyNodeTime(self, ninfo, nresult,
1808
                      nvinfo_starttime, nvinfo_endtime):
1809
    """Check the node time.
1810

1811
    @type ninfo: L{objects.Node}
1812
    @param ninfo: the node to check
1813
    @param nresult: the remote results for the node
1814
    @param nvinfo_starttime: the start time of the RPC call
1815
    @param nvinfo_endtime: the end time of the RPC call
1816

1817
    """
1818
    ntime = nresult.get(constants.NV_TIME, None)
1819
    try:
1820
      ntime_merged = utils.MergeTime(ntime)
1821
    except (ValueError, TypeError):
1822
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1823
                    "Node returned invalid time")
1824
      return
1825

    
1826
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1827
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1828
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1829
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1830
    else:
1831
      ntime_diff = None
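    # The node clock must fall inside [rpc_start - skew, rpc_end + skew];
    # anything outside that window is reported together with the offset.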
1832

    
1833
    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1834
                  "Node time diverges by at least %s from master node time",
1835
                  ntime_diff)
1836

    
1837
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1838
    """Check the node LVM results and update info for cross-node checks.
1839

1840
    @type ninfo: L{objects.Node}
1841
    @param ninfo: the node to check
1842
    @param nresult: the remote results for the node
1843
    @param vg_name: the configured VG name
1844
    @type nimg: L{NodeImage}
1845
    @param nimg: node image
1846

1847
    """
1848
    if vg_name is None:
1849
      return
1850

    
1851
    # checks vg existence and size > 20G
1852
    vglist = nresult.get(constants.NV_VGLIST, None)
1853
    test = not vglist
1854
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1855
                  "unable to check volume groups")
1856
    if not test:
1857
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1858
                                            constants.MIN_VG_SIZE)
1859
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1860

    
1861
    # Check PVs
1862
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1863
    for em in errmsgs:
1864
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
1865
    if pvminmax is not None:
1866
      (nimg.pv_min, nimg.pv_max) = pvminmax
1867

    
1868
  def _VerifyGroupDRBDVersion(self, node_verify_infos):
1869
    """Check cross-node DRBD version consistency.
1870

1871
    @type node_verify_infos: dict
1872
    @param node_verify_infos: infos about nodes as returned from the
1873
      node_verify call.
1874

1875
    """
1876
    node_versions = {}
1877
    for node_uuid, ndata in node_verify_infos.items():
1878
      nresult = ndata.payload
1879
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1880
      node_versions[node_uuid] = version
1881

    
1882
    if len(set(node_versions.values())) > 1:
1883
      for node_uuid, version in sorted(node_versions.items()):
1884
        msg = "DRBD version mismatch: %s" % version
1885
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1886
                    code=self.ETYPE_WARNING)
1887

    
1888
  def _VerifyGroupLVM(self, node_image, vg_name):
1889
    """Check cross-node consistency in LVM.
1890

1891
    @type node_image: dict
1892
    @param node_image: info about nodes, mapping from node names to
1893
      L{NodeImage} objects
1894
    @param vg_name: the configured VG name
1895

1896
    """
1897
    if vg_name is None:
1898
      return
1899

    
1900
    # Only exclusive storage needs this kind of checks
1901
    if not self._exclusive_storage:
1902
      return
1903

    
1904
    # exclusive_storage wants all PVs to have the same size (approximately),
1905
    # if the smallest and the biggest ones are okay, everything is fine.
1906
    # pv_min is None iff pv_max is None
1907
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1908
    if not vals:
1909
      return
1910
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1911
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1912
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1913
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1914
                  "PV sizes differ too much in the group; smallest (%s MB) is"
1915
                  " on %s, biggest (%s MB) is on %s",
1916
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
1917
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))
1918

    
1919
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1920
    """Check the node bridges.
1921

1922
    @type ninfo: L{objects.Node}
1923
    @param ninfo: the node to check
1924
    @param nresult: the remote results for the node
1925
    @param bridges: the expected list of bridges
1926

1927
    """
1928
    if not bridges:
1929
      return
1930

    
1931
    missing = nresult.get(constants.NV_BRIDGES, None)
1932
    test = not isinstance(missing, list)
1933
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1934
                  "did not return valid bridge information")
1935
    if not test:
1936
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1937
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1938

    
1939
  def _VerifyNodeUserScripts(self, ninfo, nresult):
1940
    """Check the results of user scripts presence and executability on the node
1941

1942
    @type ninfo: L{objects.Node}
1943
    @param ninfo: the node to check
1944
    @param nresult: the remote results for the node
1945

1946
    """
1947
    test = constants.NV_USERSCRIPTS not in nresult
1948
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1949
                  "did not return user scripts information")
1950

    
1951
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1952
    if not test:
1953
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1954
                    "user scripts not present or not executable: %s" %
1955
                    utils.CommaJoin(sorted(broken_scripts)))
1956

    
1957
  def _VerifyNodeNetwork(self, ninfo, nresult):
1958
    """Check the node network connectivity results.
1959

1960
    @type ninfo: L{objects.Node}
1961
    @param ninfo: the node to check
1962
    @param nresult: the remote results for the node
1963

1964
    """
1965
    test = constants.NV_NODELIST not in nresult
1966
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
1967
                  "node hasn't returned node ssh connectivity data")
1968
    if not test:
1969
      if nresult[constants.NV_NODELIST]:
1970
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1971
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
1972
                        "ssh communication with node '%s': %s", a_node, a_msg)
1973

    
1974
    test = constants.NV_NODENETTEST not in nresult
1975
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1976
                  "node hasn't returned node tcp connectivity data")
1977
    if not test:
1978
      if nresult[constants.NV_NODENETTEST]:
1979
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1980
        for anode in nlist:
1981
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
1982
                        "tcp communication with node '%s': %s",
1983
                        anode, nresult[constants.NV_NODENETTEST][anode])
1984

    
1985
    test = constants.NV_MASTERIP not in nresult
1986
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1987
                  "node hasn't returned node master IP reachability data")
1988
    if not test:
1989
      if not nresult[constants.NV_MASTERIP]:
1990
        if ninfo.uuid == self.master_node:
1991
          msg = "the master node cannot reach the master IP (not configured?)"
1992
        else:
1993
          msg = "cannot reach the master IP"
1994
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
1995

    
1996
  def _VerifyInstance(self, instance, node_image, diskstatus):
1997
    """Verify an instance.
1998

1999
    This function checks to see if the required block devices are
2000
    available on the instance's node, and that the nodes are in the correct
2001
    state.
2002

2003
    """
2004
    pnode_uuid = instance.primary_node
2005
    pnode_img = node_image[pnode_uuid]
2006
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2007

    
2008
    node_vol_should = {}
2009
    instance.MapLVsByNode(node_vol_should)
2010

    
2011
    cluster = self.cfg.GetClusterInfo()
2012
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2013
                                                            self.group_info)
2014
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
2015
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
2016
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)
2017

    
2018
    for node_uuid in node_vol_should:
2019
      n_img = node_image[node_uuid]
2020
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2021
        # ignore missing volumes on offline or broken nodes
2022
        continue
2023
      for volume in node_vol_should[node_uuid]:
2024
        test = volume not in n_img.volumes
2025
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
2026
                      "volume %s missing on node %s", volume,
2027
                      self.cfg.GetNodeName(node_uuid))
2028

    
2029
    if instance.admin_state == constants.ADMINST_UP:
2030
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
2031
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
2032
                    "instance not running on its primary node %s",
2033
                     self.cfg.GetNodeName(pnode_uuid))
2034
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
2035
                    instance.name, "instance is marked as running and lives on"
2036
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))
2037

    
2038
    diskdata = [(nname, success, status, idx)
2039
                for (nname, disks) in diskstatus.items()
2040
                for idx, (success, status) in enumerate(disks)]
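    # diskdata flattens the per-node results into tuples of (node_uuid,
    # success, status, disk_index) so each disk is checked individually below.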
2041

    
2042
    for nname, success, bdev_status, idx in diskdata:
2043
      # the 'ghost node' construction in Exec() ensures that we have a
2044
      # node here
2045
      snode = node_image[nname]
2046
      bad_snode = snode.ghost or snode.offline
2047
      self._ErrorIf(instance.disks_active and
2048
                    not success and not bad_snode,
2049
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
2050
                    "couldn't retrieve status for disk/%s on %s: %s",
2051
                    idx, self.cfg.GetNodeName(nname), bdev_status)
2052

    
2053
      if instance.disks_active and success and \
2054
         (bdev_status.is_degraded or
2055
          bdev_status.ldisk_status != constants.LDS_OKAY):
2056
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2057
        if bdev_status.is_degraded:
2058
          msg += " is degraded"
2059
        if bdev_status.ldisk_status != constants.LDS_OKAY:
2060
          msg += "; state is '%s'" % \
2061
                 constants.LDS_NAMES[bdev_status.ldisk_status]
2062

    
2063
        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2064

    
2065
    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2066
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2067
                  "instance %s, connection to primary node failed",
2068
                  instance.name)
2069

    
2070
    self._ErrorIf(len(instance.secondary_nodes) > 1,
2071
                  constants.CV_EINSTANCELAYOUT, instance.name,
2072
                  "instance has multiple secondary nodes: %s",
2073
                  utils.CommaJoin(instance.secondary_nodes),
2074
                  code=self.ETYPE_WARNING)
2075

    
2076
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2077
    if any(es_flags.values()):
2078
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2079
        # Disk template not compatible with exclusive_storage: no instance
2080
        # node should have the flag set
2081
        es_nodes = [n
2082
                    for (n, es) in es_flags.items()
2083
                    if es]
2084
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2085
                    "instance has template %s, which is not supported on nodes"
2086
                    " that have exclusive storage set: %s",
2087
                    instance.disk_template,
2088
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2089
      for (idx, disk) in enumerate(instance.disks):
2090
        self._ErrorIf(disk.spindles is None,
2091
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2092
                      "number of spindles not configured for disk %s while"
2093
                      " exclusive storage is enabled, try running"
2094
                      " gnt-cluster repair-disk-sizes", idx)
2095

    
2096
    if instance.disk_template in constants.DTS_INT_MIRROR:
2097
      instance_nodes = utils.NiceSort(instance.all_nodes)
2098
      instance_groups = {}
2099

    
2100
      for node_uuid in instance_nodes:
2101
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
2102
                                   []).append(node_uuid)
2103

    
2104
      pretty_list = [
2105
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2106
                           groupinfo[group].name)
2107
        # Sort so that we always list the primary node first.
2108
        for group, nodes in sorted(instance_groups.items(),
2109
                                   key=lambda (_, nodes): pnode_uuid in nodes,
2110
                                   reverse=True)]
2111

    
2112
      self._ErrorIf(len(instance_groups) > 1,
2113
                    constants.CV_EINSTANCESPLITGROUPS,
2114
                    instance.name, "instance has primary and secondary nodes in"
2115
                    " different groups: %s", utils.CommaJoin(pretty_list),
2116
                    code=self.ETYPE_WARNING)
2117

    
2118
    inst_nodes_offline = []
2119
    for snode in instance.secondary_nodes:
2120
      s_img = node_image[snode]
2121
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2122
                    self.cfg.GetNodeName(snode),
2123
                    "instance %s, connection to secondary node failed",
2124
                    instance.name)
2125

    
2126
      if s_img.offline:
2127
        inst_nodes_offline.append(snode)
2128

    
2129
    # warn that the instance lives on offline nodes
2130
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2131
                  instance.name, "instance has offline secondary node(s) %s",
2132
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2133
    # ... or ghost/non-vm_capable nodes
2134
    for node_uuid in instance.all_nodes:
2135
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2136
                    instance.name, "instance lives on ghost node %s",
2137
                    self.cfg.GetNodeName(node_uuid))
2138
      self._ErrorIf(not node_image[node_uuid].vm_capable,
2139
                    constants.CV_EINSTANCEBADNODE, instance.name,
2140
                    "instance lives on non-vm_capable node %s",
2141
                    self.cfg.GetNodeName(node_uuid))
2142

    
2143
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2144
    """Verify if there are any unknown volumes in the cluster.
2145

2146
    The .os, .swap and backup volumes are ignored. All other volumes are
2147
    reported as unknown.
2148

2149
    @type reserved: L{ganeti.utils.FieldSet}
2150
    @param reserved: a FieldSet of reserved volume names
2151

2152
    """
2153
    for node_uuid, n_img in node_image.items():
2154
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2155
          self.all_node_info[node_uuid].group != self.group_uuid):
2156
        # skip non-healthy nodes
2157
        continue
2158
      for volume in n_img.volumes:
2159
        test = ((node_uuid not in node_vol_should or
2160
                volume not in node_vol_should[node_uuid]) and
2161
                not reserved.Matches(volume))
2162
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2163
                      self.cfg.GetNodeName(node_uuid),
2164
                      "volume %s is unknown", volume)
2165

    
2166
  def _VerifyNPlusOneMemory(self, node_image, all_insts):
2167
    """Verify N+1 Memory Resilience.
2168

2169
    Check that if one single node dies we can still start all the
2170
    instances it was primary for.
2171

2172
    """
2173
    cluster_info = self.cfg.GetClusterInfo()
2174
    for node_uuid, n_img in node_image.items():
2175
      # This code checks that every node which is now listed as
2176
      # secondary has enough memory to host all instances it is
2177
      # supposed to, should a single other node in the cluster fail.
2178
      # FIXME: not ready for failover to an arbitrary node
2179
      # FIXME: does not support file-backed instances
2180
      # WARNING: we currently take into account down instances as well
2181
      # as up ones, considering that even if they're down someone
2182
      # might want to start them even in the event of a node failure.
2183
      if n_img.offline or \
2184
         self.all_node_info[node_uuid].group != self.group_uuid:
2185
        # we're skipping nodes marked offline and nodes in other groups from
2186
        # the N+1 warning, since most likely we don't have good memory
2187
        # information from them; we already list instances living on such
2188
        # nodes, and that's enough warning
2189
        continue
2190
      #TODO(dynmem): also consider ballooning out other instances
2191
      for prinode, inst_uuids in n_img.sbp.items():
2192
        needed_mem = 0
2193
        for inst_uuid in inst_uuids:
2194
          bep = cluster_info.FillBE(all_insts[inst_uuid])
2195
          if bep[constants.BE_AUTO_BALANCE]:
2196
            needed_mem += bep[constants.BE_MINMEM]
2197
        test = n_img.mfree < needed_mem
2198
        self._ErrorIf(test, constants.CV_ENODEN1,
2199
                      self.cfg.GetNodeName(node_uuid),
2200
                      "not enough memory to accomodate instance failovers"
2201
                      " should node %s fail (%dMiB needed, %dMiB available)",
2202
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2203

    
2204
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2205
                   (files_all, files_opt, files_mc, files_vm)):
2206
    """Verifies file checksums collected from all nodes.
2207

2208
    @param nodes: List of L{objects.Node} objects
2209
    @param master_node_uuid: UUID of master node
2210
    @param all_nvinfo: RPC results
2211

2212
    """
2213
    # Define functions determining which nodes to consider for a file
2214
    files2nodefn = [
2215
      (files_all, None),
2216
      (files_mc, lambda node: (node.master_candidate or
2217
                               node.uuid == master_node_uuid)),
2218
      (files_vm, lambda node: node.vm_capable),
2219
      ]
2220

    
2221
    # Build mapping from filename to list of nodes which should have the file
2222
    nodefiles = {}
2223
    for (files, fn) in files2nodefn:
2224
      if fn is None:
2225
        filenodes = nodes
2226
      else:
2227
        filenodes = filter(fn, nodes)
2228
      nodefiles.update((filename,
2229
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
2230
                       for filename in files)
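    # nodefiles now maps each filename to the frozenset of node UUIDs that
    # are expected to hold a copy of that file.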
2231

    
2232
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2233

    
2234
    fileinfo = dict((filename, {}) for filename in nodefiles)
2235
    ignore_nodes = set()
2236

    
2237
    for node in nodes:
2238
      if node.offline:
2239
        ignore_nodes.add(node.uuid)
2240
        continue
2241

    
2242
      nresult = all_nvinfo[node.uuid]
2243

    
2244
      if nresult.fail_msg or not nresult.payload:
2245
        node_files = None
2246
      else:
2247
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2248
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2249
                          for (key, value) in fingerprints.items())
2250
        del fingerprints
2251

    
2252
      test = not (node_files and isinstance(node_files, dict))
2253
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2254
                    "Node did not return file checksum data")
2255
      if test:
2256
        ignore_nodes.add(node.uuid)
2257
        continue
2258

    
2259
      # Build per-checksum mapping from filename to nodes having it
2260
      for (filename, checksum) in node_files.items():
2261
        assert filename in nodefiles
2262
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2263

    
2264
    for (filename, checksums) in fileinfo.items():
2265
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2266

    
2267
      # Nodes having the file
2268
      with_file = frozenset(node_uuid
2269
                            for node_uuids in fileinfo[filename].values()
2270
                            for node_uuid in node_uuids) - ignore_nodes
2271

    
2272
      expected_nodes = nodefiles[filename] - ignore_nodes
2273

    
2274
      # Nodes missing file
2275
      missing_file = expected_nodes - with_file
2276

    
2277
      if filename in files_opt:
2278
        # All or no nodes
2279
        self._ErrorIf(missing_file and missing_file != expected_nodes,
2280
                      constants.CV_ECLUSTERFILECHECK, None,
2281
                      "File %s is optional, but it must exist on all or no"
2282
                      " nodes (not found on %s)",
2283
                      filename,
2284
                      utils.CommaJoin(
2285
                        utils.NiceSort(
2286
                          map(self.cfg.GetNodeName, missing_file))))
2287
      else:
2288
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2289
                      "File %s is missing from node(s) %s", filename,
2290
                      utils.CommaJoin(
2291
                        utils.NiceSort(
2292
                          map(self.cfg.GetNodeName, missing_file))))
2293

    
2294
        # Warn if a node has a file it shouldn't
2295
        unexpected = with_file - expected_nodes
2296
        self._ErrorIf(unexpected,
2297
                      constants.CV_ECLUSTERFILECHECK, None,
2298
                      "File %s should not exist on node(s) %s",
2299
                      filename, utils.CommaJoin(
2300
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2301

    
2302
      # See if there are multiple versions of the file
2303
      test = len(checksums) > 1
2304
      if test:
2305
        variants = ["variant %s on %s" %
2306
                    (idx + 1,
2307
                     utils.CommaJoin(utils.NiceSort(
2308
                       map(self.cfg.GetNodeName, node_uuids))))
2309
                    for (idx, (checksum, node_uuids)) in
2310
                      enumerate(sorted(checksums.items()))]
2311
      else:
2312
        variants = []
2313

    
2314
      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2315
                    "File %s found with %s different checksums (%s)",
2316
                    filename, len(checksums), "; ".join(variants))
2317

    
2318
  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
2319
    """Verify the drbd helper.
2320

2321
    """
2322
    if drbd_helper:
2323
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2324
      test = (helper_result is None)
2325
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2326
                    "no drbd usermode helper returned")
2327
      if helper_result:
2328
        status, payload = helper_result
2329
        test = not status
2330
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2331
                      "drbd usermode helper check unsuccessful: %s", payload)
2332
        test = status and (payload != drbd_helper)
2333
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2334
                      "wrong drbd usermode helper: %s", payload)
2335

    
2336
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2337
                      drbd_map):
2338
    """Verifies and the node DRBD status.
2339

2340
    @type ninfo: L{objects.Node}
2341
    @param ninfo: the node to check
2342
    @param nresult: the remote results for the node
2343
    @param instanceinfo: the dict of instances
2344
    @param drbd_helper: the configured DRBD usermode helper
2345
    @param drbd_map: the DRBD map as returned by
2346
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2347

2348
    """
2349
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)
2350

    
2351
    # compute the DRBD minors
2352
    node_drbd = {}
2353
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2354
      test = inst_uuid not in instanceinfo
2355
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2356
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
2357
        # ghost instance should not be running, but otherwise we
2358
        # don't give double warnings (both ghost instance and
2359
        # unallocated minor in use)
2360
      if test:
2361
        node_drbd[minor] = (inst_uuid, False)
2362
      else:
2363
        instance = instanceinfo[inst_uuid]
2364
        node_drbd[minor] = (inst_uuid, instance.disks_active)
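    # node_drbd maps every minor the configuration assigns to this node to
    # (instance_uuid, whether its disks should currently be active).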
2365

    
2366
    # and now check them
2367
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2368
    test = not isinstance(used_minors, (tuple, list))
2369
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2370
                  "cannot parse drbd status file: %s", str(used_minors))
2371
    if test:
2372
      # we cannot check drbd status
2373
      return
2374

    
2375
    for minor, (inst_uuid, must_exist) in node_drbd.items():
2376
      test = minor not in used_minors and must_exist
2377
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2378
                    "drbd minor %d of instance %s is not active", minor,
2379
                    self.cfg.GetInstanceName(inst_uuid))
2380
    for minor in used_minors:
2381
      test = minor not in node_drbd
2382
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2383
                    "unallocated drbd minor %d is in use", minor)
2384

    
2385
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2386
    """Builds the node OS structures.
2387

2388
    @type ninfo: L{objects.Node}
2389
    @param ninfo: the node to check
2390
    @param nresult: the remote results for the node
2391
    @param nimg: the node image object
2392

2393
    """
2394
    remote_os = nresult.get(constants.NV_OSLIST, None)
2395
    test = (not isinstance(remote_os, list) or
2396
            not compat.all(isinstance(v, list) and len(v) == 7
2397
                           for v in remote_os))
2398

    
2399
    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2400
                  "node hasn't returned valid OS data")
2401

    
2402
    nimg.os_fail = test
2403

    
2404
    if test:
2405
      return
2406

    
2407
    os_dict = {}
2408

    
2409
    for (name, os_path, status, diagnose,
2410
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2411

    
2412
      if name not in os_dict:
2413
        os_dict[name] = []
2414

    
2415
      # parameters is a list of lists instead of list of tuples due to
2416
      # JSON lacking a real tuple type, fix it:
2417
      parameters = [tuple(v) for v in parameters]
2418
      os_dict[name].append((os_path, status, diagnose,
2419
                            set(variants), set(parameters), set(api_ver)))
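    # os_dict: OS name -> list of (path, status, diagnose, variants,
    # parameters, api_versions); more than one entry means the OS was found
    # in several search paths and the first entry shadows the rest.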
2420

    
2421
    nimg.oslist = os_dict
2422

    
2423
  def _VerifyNodeOS(self, ninfo, nimg, base):
2424
    """Verifies the node OS list.
2425

2426
    @type ninfo: L{objects.Node}
2427
    @param ninfo: the node to check
2428
    @param nimg: the node image object
2429
    @param base: the 'template' node we match against (e.g. from the master)
2430

2431
    """
2432
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2433

    
2434
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2435
    for os_name, os_data in nimg.oslist.items():
2436
      assert os_data, "Empty OS status for OS %s?!" % os_name
2437
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2438
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2439
                    "Invalid OS %s (located at %s): %s",
2440
                    os_name, f_path, f_diag)
2441
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2442
                    "OS '%s' has multiple entries"
2443
                    " (first one shadows the rest): %s",
2444
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
2445
      # comparisons with the 'base' image
2446
      test = os_name not in base.oslist
2447
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2448
                    "Extra OS %s not present on reference node (%s)",
2449
                    os_name, self.cfg.GetNodeName(base.uuid))
2450
      if test:
2451
        continue
2452
      assert base.oslist[os_name], "Base node has empty OS status?"
2453
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2454
      if not b_status:
2455
        # base OS is invalid, skipping
2456
        continue
2457
      for kind, a, b in [("API version", f_api, b_api),
2458
                         ("variants list", f_var, b_var),
2459
                         ("parameters", beautify_params(f_param),
2460
                          beautify_params(b_param))]:
2461
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2462
                      "OS %s for %s differs from reference node %s:"
2463
                      " [%s] vs. [%s]", kind, os_name,
2464
                      self.cfg.GetNodeName(base.uuid),
2465
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2466

    
2467
    # check any missing OSes
2468
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2469
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2470
                  "OSes present on reference node %s"
2471
                  " but missing on this node: %s",
2472
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2473

    
2474
  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2475
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2476

2477
    @type ninfo: L{objects.Node}
2478
    @param ninfo: the node to check
2479
    @param nresult: the remote results for the node
2480
    @type is_master: bool
2481
    @param is_master: Whether node is the master node
2482

2483
    """
2484
    cluster = self.cfg.GetClusterInfo()
2485
    if (is_master and
2486
        (cluster.IsFileStorageEnabled() or
2487
         cluster.IsSharedFileStorageEnabled())):
2488
      try:
2489
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2490
      except KeyError:
2491
        # This should never happen
2492
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2493
                      "Node did not return forbidden file storage paths")
2494
      else:
2495
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2496
                      "Found forbidden file storage paths: %s",
2497
                      utils.CommaJoin(fspaths))
2498
    else:
2499
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2500
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2501
                    "Node should not have returned forbidden file storage"
2502
                    " paths")
2503

    
2504
  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2505
                          verify_key, error_key):
2506
    """Verifies (file) storage paths.
2507

2508
    @type ninfo: L{objects.Node}
2509
    @param ninfo: the node to check
2510
    @param nresult: the remote results for the node
2511
    @type file_disk_template: string
2512
    @param file_disk_template: file-based disk template, whose directory
2513
        is supposed to be verified
2514
    @type verify_key: string
2515
    @param verify_key: key for the verification map of this file
2516
        verification step
2517
    @param error_key: error key to be added to the verification results
2518
        in case something goes wrong in this verification step
2519

2520
    """
2521
    assert (file_disk_template in
2522
            utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2523
    cluster = self.cfg.GetClusterInfo()
2524
    if cluster.IsDiskTemplateEnabled(file_disk_template):
2525
      self._ErrorIf(
2526
          verify_key in nresult,
2527
          error_key, ninfo.name,
2528
          "The configured %s storage path is unusable: %s" %
2529
          (file_disk_template, nresult.get(verify_key)))
2530

    
2531
  def _VerifyFileStoragePaths(self, ninfo, nresult):
2532
    """Verifies (file) storage paths.
2533

2534
    @see: C{_VerifyStoragePaths}
2535

2536
    """
2537
    self._VerifyStoragePaths(
2538
        ninfo, nresult, constants.DT_FILE,
2539
        constants.NV_FILE_STORAGE_PATH,
2540
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2541

    
2542
  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2543
    """Verifies (file) storage paths.
2544

2545
    @see: C{_VerifyStoragePaths}
2546

2547
    """
2548
    self._VerifyStoragePaths(
2549
        ninfo, nresult, constants.DT_SHARED_FILE,
2550
        constants.NV_SHARED_FILE_STORAGE_PATH,
2551
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2552

    
2553
  def _VerifyOob(self, ninfo, nresult):
2554
    """Verifies out of band functionality of a node.
2555

2556
    @type ninfo: L{objects.Node}
2557
    @param ninfo: the node to check
2558
    @param nresult: the remote results for the node
2559

2560
    """
2561
    # We just have to verify the paths on master and/or master candidates
2562
    # as the oob helper is invoked on the master
2563
    if ((ninfo.master_candidate or ninfo.master_capable) and
2564
        constants.NV_OOB_PATHS in nresult):
2565
      for path_result in nresult[constants.NV_OOB_PATHS]:
2566
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2567
                      ninfo.name, path_result)
2568

    
2569
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2570
    """Verifies and updates the node volume data.
2571

2572
    This function will update a L{NodeImage}'s internal structures
2573
    with data from the remote call.
2574

2575
    @type ninfo: L{objects.Node}
2576
    @param ninfo: the node to check
2577
    @param nresult: the remote results for the node
2578
    @param nimg: the node image object
2579
    @param vg_name: the configured VG name
2580

2581
    """
2582
    nimg.lvm_fail = True
2583
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2584
    if vg_name is None:
2585
      pass
2586
    elif isinstance(lvdata, basestring):
2587
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2588
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
2589
    elif not isinstance(lvdata, dict):
2590
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2591
                    "rpc call to node failed (lvlist)")
2592
    else:
2593
      nimg.volumes = lvdata
2594
      nimg.lvm_fail = False
2595

    
2596
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2597
    """Verifies and updates the node instance list.
2598

2599
    If the listing was successful, then updates this node's instance
2600
    list. Otherwise, it marks the RPC call as failed for the instance
2601
    list key.
2602

2603
    @type ninfo: L{objects.Node}
2604
    @param ninfo: the node to check
2605
    @param nresult: the remote results for the node
2606
    @param nimg: the node image object
2607

2608
    """
2609
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2610
    test = not isinstance(idata, list)
2611
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2612
                  "rpc call to node failed (instancelist): %s",
2613
                  utils.SafeEncode(str(idata)))
2614
    if test:
2615
      nimg.hyp_fail = True
2616
    else:
2617
      nimg.instances = [inst.uuid for (_, inst) in
2618
                        self.cfg.GetMultiInstanceInfoByName(idata)]
2619

    
2620

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")
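
  # Illustrative sketch (numbers made up, other keys omitted): the nresult
  # fragments consumed above look roughly like
  #   nresult[constants.NV_HVINFO] == {"memory_free": 2048}
  #   nresult[constants.NV_VGLIST] == {"xenvg": 51200}
  # i.e. a hypervisor memory report and a mapping of volume group name to
  # free space, from which nimg.mfree and nimg.dfree are derived.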

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_dev_inst_only = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # AnnotateDiskParams already makes copies of the disks
      dev_inst_only = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))

      node_disks_dev_inst_only[nuuid] = dev_inst_only

    assert len(node_disks) == len(node_disks_dev_inst_only)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
               node_disks.keys(), node_disks_dev_inst_only)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk
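
  # Illustrative sketch (UUIDs and payloads made up): the returned mapping
  # has the shape documented above, e.g.
  #   {"inst-uuid-1": {"node-uuid-a": [(True, payload_disk0),
  #                                    (True, payload_disk1)],
  #                    "node-uuid-b": [(False, "node offline"),
  #                                    (False, "node offline")]},
  #    "diskless-inst-uuid": {}}
  # with one (success, payload) tuple per disk, in disk order, for each node.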

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))
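
  # Illustrative sketch (node names made up): for a group whose online nodes
  # are ["n1", "n2"], with two other groups containing ["o1", "o2"] and
  # ["p1"], _SelectSshCheckNodes returns something like
  #   (["n1", "n2"], {"n1": ["o1", "p1"], "n2": ["o2", "p1"]})
  # i.e. every node additionally checks one node of each other group, handed
  # out round-robin by _SshNodeSelector.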

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; a hook failure causes
    its output to be logged in the verify output and makes the verification
    fail.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env
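
  # Illustrative sketch (tags and node names made up): the resulting hooks
  # environment is a flat string dictionary such as
  #   {"CLUSTER_TAGS": "prod critical",
  #    "NODE_TAGS_node1.example.com": "rack1",
  #    "NODE_TAGS_node2.example.com": ""}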

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
      if drbd_helper:
        node_verify_param[constants.NV_DRBDVERSION] = None
        node_verify_param[constants.NV_DRBDLIST] = None
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for inst_uuid in self.my_inst_info.values():
      for nic in inst_uuid.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)
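
    # Illustrative sketch (UUIDs made up): after the loop above a node_image
    # entry carries bookkeeping such as
    #   pinst = ["inst-uuid-1"]                       # primary instances
    #   sinst = ["inst-uuid-2"]                       # secondary instances
    #   sbp = {"primary-node-uuid": ["inst-uuid-2"]}  # secondaries by primary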

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least a node, we act as if it were set for all the nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.name not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])
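
# Illustrative sketch (group names made up): with node groups "default" and
# "rack2", the Exec above queues one job per group, each consisting of a
# single OpGroupVerifyDisks opcode, roughly
#   ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name="default")],
#                   [opcodes.OpGroupVerifyDisks(group_name="rack2")]])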