#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
  CheckIpolicyVsDiskTemplates

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")
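
# A minimal, hypothetical sketch of the pattern shared by the two LUs above:
# read the master network parameters and the external-MIP-script flag from the
# configuration, issue the node RPC, and turn a failure into an exception via
# result.Raise().  "lu" stands for any LogicalUnit instance here.
#
#   master_params = lu.cfg.GetMasterNetworkParameters()
#   ems = lu.cfg.GetUseExternalMipScript()
#   result = lu.rpc.call_node_activate_master_ip(master_params.uuid,
#                                                master_params, ems)
#   result.Raise("Could not activate the master IP")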


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
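
# A hypothetical sketch of how the query classes above fit together: the
# opcode carries the wanted field names, LUClusterConfigQuery wraps them in a
# ClusterQuery, and _GetQueryData() only fetches the expensive pieces (config,
# queue drain flag, watcher pause) that the requested fields actually need,
# leaving the rest as NotImplemented.  The field names below are examples
# only.
#
#   op = opcodes.OpClusterConfigQuery(output_fields=["name", "master_node"])
#   # Submitted through the job queue, this ends up in
#   # LUClusterConfigQuery.Exec(), which returns a single row of values in
#   # the same order as output_fields.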


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "vcs_version": constants.VCS_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername
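
# The rename above follows a deactivate/update/reactivate bracket.  A
# hypothetical outline of the same flow, with the error handling stripped:
#
#   1. call_node_deactivate_master_ip(...)   # master IP must go down first
#   2. cluster.cluster_name = new_name       # update and distribute the
#      cluster.master_ip = new_ip            # configuration
#      cfg.Update(cluster, feedback_fn)
#   3. rewrite SSH_KNOWN_HOSTS_FILE and re-upload it to the online nodes
#   4. call_node_activate_master_ip(...)     # done in the finally: block, so
#                                            # the IP comes back even on error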


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disk sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.DT_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      if not dskl:
        # no disks on the node
        continue

      newl = [(v[2].Copy(), v[0]) for v in dskl]
      for (dsk, _) in newl:
        self.cfg.SetDiskID(dsk, node_uuid)
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed
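
# Note on the unit handling above: blockdev_getdimensions reports sizes in
# bytes, while disk.size is kept in mebibytes, hence the "size >> 20"
# (integer division by 2**20).  A small, hypothetical example:
#
#   reported_bytes = 10737418240          # 10 GiB as reported by the node
#   reported_mib = reported_bytes >> 20   # == 10240
#   if reported_mib != disk.size:         # e.g. the config still says 10239
#     disk.size = reported_mib            # record the corrected value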


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)
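
# A minimal, hypothetical usage sketch for _ValidateNetmask: the value is a
# CIDR prefix length and must be valid for the cluster's primary IP family.
#
#   _ValidateNetmask(self.cfg, 24)    # accepted on an IPv4 cluster
#   _ValidateNetmask(self.cfg, 64)    # accepted on an IPv6 cluster
#   _ValidateNetmask(self.cfg, 999)   # raises errors.OpPrereqError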


def CheckFileBasedStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
    file_disk_template):
  """Checks whether the given file-based storage directory is acceptable.

  Note: This function is public, because it is also used in bootstrap.py.

  @type logging_warn_fn: function
  @param logging_warn_fn: function which accepts a string and logs it
  @type file_storage_dir: string
  @param file_storage_dir: the directory to be used for file-based instances
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of enabled disk templates
  @type file_disk_template: string
  @param file_disk_template: the file-based disk template for which the
      path should be checked

  """
  assert (file_disk_template in
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_dir is not None:
    if file_storage_dir == "":
      if file_storage_enabled:
        raise errors.OpPrereqError(
            "Unsetting the '%s' storage directory while having '%s' storage"
            " enabled is not permitted." %
            (file_disk_template, file_disk_template))
    else:
      if not file_storage_enabled:
        logging_warn_fn(
            "Specified a %s storage directory, although %s storage is not"
            " enabled." % (file_disk_template, file_disk_template))
  else:
    raise errors.ProgrammerError("Received %s storage dir with value"
                                 " 'None'." % file_disk_template)


def CheckFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_FILE)


def CheckSharedFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given shared file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_SHARED_FILE)
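
# A hypothetical sketch of how the checks above are typically driven (both
# from LUClusterSetParams.CheckPrereq below and from bootstrap.py); the
# directory used here is only an example:
#
#   CheckFileStoragePathVsEnabledDiskTemplates(
#       logging.warning, "/srv/ganeti/file-storage",
#       [constants.DT_PLAIN, constants.DT_FILE])
#
#   # empty string while the template is enabled  -> OpPrereqError
#   # non-empty dir, template not enabled         -> warning only
#   # None                                        -> ProgrammerError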


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s"
                                   % err, errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def _CheckVgName(self, node_uuids, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the vg name on all nodes and, in case it gets
       unset, whether there are instances still using it.

    """
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
                                            new_enabled_disk_templates)
    current_vg_name = self.cfg.GetVGName()

    if self.op.vg_name == '':
      if lvm_is_enabled:
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                   " disk templates are or get enabled.")

    if self.op.vg_name is None:
      if current_vg_name is None and lvm_is_enabled:
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and lvm_is_enabled) or \
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
      self._CheckVgNameOnNodes(node_uuids)

  def _CheckVgNameOnNodes(self, node_uuids):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_uuids)
    for node_uuid in node_uuids:
      msg = vglist[node_uuid].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
                                   errors.ECODE_ENVIRON)

  @staticmethod
  def _GetEnabledDiskTemplatesInner(op_enabled_disk_templates,
                                    old_enabled_disk_templates):
    """Determines the enabled disk templates and the subset of disk templates
       that are newly enabled by this operation.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    if op_enabled_disk_templates:
      enabled_disk_templates = op_enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(old_enabled_disk_templates))
    else:
      enabled_disk_templates = old_enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates)
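
  # A small, hypothetical example of the helper above; it is a plain set
  # difference, so the order of the second list is not guaranteed:
  #
  #   _GetEnabledDiskTemplatesInner(["plain", "drbd"], ["plain"])
  #   # -> (["plain", "drbd"], ["drbd"])   # "drbd" is newly enabled
  #   _GetEnabledDiskTemplatesInner(None, ["plain"])
  #   # -> (["plain"], [])                 # nothing requested, nothing new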

  def _GetEnabledDiskTemplates(self, cluster):
    """Determines the enabled disk templates and the subset of disk templates
       that are newly enabled by this operation.

    """
    return self._GetEnabledDiskTemplatesInner(self.op.enabled_disk_templates,
                                              cluster.enabled_disk_templates)

  def _CheckIpolicy(self, cluster, enabled_disk_templates):
    """Checks the ipolicy.

    @type cluster: C{objects.Cluster}
    @param cluster: the cluster's configuration
    @type enabled_disk_templates: list of string
    @param enabled_disk_templates: list of (possibly newly) enabled disk
      templates

    """
    # FIXME: write unit tests for this
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
                                  enabled_disk_templates)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(nuuid in group.members
                                             for nuuid in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
                                           self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))
    else:
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
                                  enabled_disk_templates)

  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
    """Checks whether the set DRBD helper actually exists on the nodes.

    @type drbd_helper: string
    @param drbd_helper: path of the drbd usermode helper binary
    @type node_uuids: list of strings
    @param node_uuids: list of node UUIDs to check for the helper

    """
    # checks given drbd helper on all nodes
    helpers = self.rpc.call_drbd_helper(node_uuids)
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
      if ninfo.offline:
        self.LogInfo("Not checking drbd helper on offline node %s",
                     ninfo.name)
        continue
      msg = helpers[ninfo.uuid].fail_msg
      if msg:
        raise errors.OpPrereqError("Error checking drbd helper on node"
                                   " '%s': %s" % (ninfo.name, msg),
                                   errors.ECODE_ENVIRON)
      node_helper = helpers[ninfo.uuid].payload
      if node_helper != drbd_helper:
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                   (ninfo.name, node_helper),
                                   errors.ECODE_ENVIRON)

  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
    """Check the DRBD usermode helper.

    @type node_uuids: list of strings
    @param node_uuids: a list of nodes' UUIDs
    @type drbd_enabled: boolean
    @param drbd_enabled: whether DRBD will be enabled after this operation
      (no matter if it was disabled before or not)
    @type drbd_gets_enabled: boolean
    @param drbd_gets_enabled: true if DRBD was disabled before this
      operation, but will be enabled afterwards

    """
    if self.op.drbd_helper == '':
      if drbd_enabled:
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " DRBD is enabled.")
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    else:
      if self.op.drbd_helper is not None and drbd_enabled:
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
      else:
        if drbd_gets_enabled:
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
          if current_drbd_helper is not None:
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
          else:
            raise errors.OpPrereqError("Cannot enable DRBD without a"
                                       " DRBD usermode helper set.")

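  # Summary of the DRBD helper decision above (illustrative, not exhaustive;
  # "/bin/true" is only an example helper path):
  #
  #   op.drbd_helper   DRBD state           outcome
  #   --------------   ------------------   --------------------------------
  #   ""               enabled              OpPrereqError (cannot unset it)
  #   ""               disabled, no DRBD    accepted; the helper is unset
  #                    disks                later in Exec()
  #   "/bin/true"      enabled              helper checked on online nodes
  #   None             gets enabled         configured helper is re-checked;
  #                                         error if none is configured
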
  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    self.cluster = cluster = self.cfg.GetClusterInfo()

    vm_capable_node_uuids = [node.uuid
                             for node in self.cfg.GetAllNodesInfo().values()
                             if node.uuid in node_uuids and node.vm_capable]

    (enabled_disk_templates, new_enabled_disk_templates) = \
      self._GetEnabledDiskTemplates(cluster)

    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
                      new_enabled_disk_templates)

    if self.op.file_storage_dir is not None:
      CheckFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)

    if self.op.shared_file_storage_dir is not None:
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.shared_file_storage_dir,
          enabled_disk_templates)

    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)

    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    self._CheckIpolicy(cluster, enabled_disk_templates)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_uuids, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_uuids, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))
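
  # A hypothetical example of the consistency check above: with the cluster
  # currently at enabled_disk_templates == ["plain", "drbd"] and an opcode
  # requesting ["plain"] only,
  #
  #   set(["plain", "drbd"]) - set(["plain"])   # -> set(["drbd"])
  #
  # so the operation is refused as long as any instance still uses the "drbd"
  # template; such instances have to be converted or removed first.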

  def _SetVgName(self, feedback_fn):
    """Determines and sets the new volume group name.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")

  def _SetFileStorageDir(self, feedback_fn):
    """Set the file storage directory.

    """
    if self.op.file_storage_dir is not None:
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
        feedback_fn("Global file storage dir already set to value '%s'"
                    % self.cluster.file_storage_dir)
      else:
        self.cluster.file_storage_dir = self.op.file_storage_dir

  def _SetDrbdHelper(self, feedback_fn):
    """Set the DRBD usermode helper.

    """
    if self.op.drbd_helper is not None:
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did not"
                    " enable the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(set(self.op.enabled_disk_templates))

    self._SetVgName(feedback_fn)
    self._SetFileStorageDir(feedback_fn)
    self._SetDrbdHelper(feedback_fn)

    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
                 master_params.uuid, master_params.netmask,
                 self.op.master_netmask, master_params.ip,
                 master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


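# The hidden/blacklisted OS lists above are edited with (DDM_ADD|DDM_REMOVE,
# name) pairs.  A hypothetical example of the value carried by the opcode:
#
#   op.hidden_os = [(constants.DDM_ADD, "debootstrap+default"),
#                   (constants.DDM_REMOVE, "lenny-image")]
#
# helper_os() then appends the first OS to cluster.hidden_os (unless it is
# already there) and drops the second one, warning if it was not present.
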
class LUClusterVerify(NoHooksLU):
1311
  """Submits all jobs necessary to verify the cluster.
1312

1313
  """
1314
  REQ_BGL = False
1315

    
1316
  def ExpandNames(self):
1317
    self.needed_locks = {}
1318

    
1319
  def Exec(self, feedback_fn):
1320
    jobs = []
1321

    
1322
    if self.op.group_name:
1323
      groups = [self.op.group_name]
1324
      depends_fn = lambda: None
1325
    else:
1326
      groups = self.cfg.GetNodeGroupList()
1327

    
1328
      # Verify global configuration
1329
      jobs.append([
1330
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1331
        ])
1332

    
1333
      # Always depend on global verification
1334
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True
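    # With error_codes set, the emitted line is machine-parseable, roughly
    #   - ERROR:ENODESSH:node:node1.example.com:cannot reach node
    # while the default, human-oriented form reads
    #   - ERROR: node node1.example.com: cannot reach node
    # (the error code and message above are illustrative only).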

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data
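  # The result is a flat list of (origin, hypervisor, parameters) triples,
  # for example (values illustrative):
  #   [("cluster", "kvm", {...}), ("os debian-image", "kvm", {...}),
  #    ("instance web1", "kvm", {...})]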


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                    constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(inst.name for
                         inst in dangling_instances.get(node.uuid, [])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(inst.name for
                                         inst in no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        instance = self.cfg.GetInstanceInfoByName(inst_name)
        if instance.disk_template in constants.DTS_INT_MIRROR:
          nodes.update(instance.secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_node_uuids = set(self.group_info.members)
    group_inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_node_uuids = \
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_inst_uuids = \
        group_inst_uuids.difference(
          [self.cfg.GetInstanceInfoByName(name).uuid
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])

    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_inst_uuids:
      raise errors.OpPrereqError(
        "Missing lock for instances: %s" %
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
        errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_uuids = group_node_uuids
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
                             for node_uuid in group_node_uuids)

    self.my_inst_uuids = group_inst_uuids
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
                             for inst_uuid in group_inst_uuids)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nuuid in inst.all_nodes:
          if self.all_node_info[nuuid].group != self.group_uuid:
            extra_lv_nodes.add(nuuid)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
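    # remote_version is expected to be a (protocol, release) pair as
    # reported by the node, e.g. something like (40, "2.8.4"); the concrete
    # values here are illustrative only.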
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, ninfo.name,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s parameter verify failure (source %s): %s",
                      hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
                    "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
                  "Node time diverges by at least %s from master node time",
                  ntime_diff)

  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                  "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupDRBDVersion(self, node_verify_infos):
    """Check cross-node DRBD version consistency.

    @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node_verify call.

    """
    node_versions = {}
    for node_uuid, ndata in node_verify_infos.items():
      nresult = ndata.payload
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
      node_versions[node_uuid] = version

    if len(set(node_versions.values())) > 1:
      for node_uuid, version in sorted(node_versions.items()):
        msg = "DRBD version mismatch: %s" % version
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
                    code=self.ETYPE_WARNING)

  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping from node names to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of checks
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
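    # The node UUID is part of the tuples only so that min()/max() break
    # ties deterministically; the comparison is driven by pv_min/pv_max.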
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "did not return valid bridge information")
    if not test:
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = constants.NV_NODELIST not in nresult
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
                  "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
                        "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
                        "tcp communication with node '%s': %s",
                        anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if ninfo.uuid == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)

  def _VerifyInstance(self, instance, node_image, diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the nodes are in the correct
    state.

    """
    pnode_uuid = instance.primary_node
    pnode_img = node_image[pnode_uuid]
    groupinfo = self.cfg.GetAllNodeGroupsInfo()

    node_vol_should = {}
    instance.MapLVsByNode(node_vol_should)

    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            self.group_info)
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)

    for node_uuid in node_vol_should:
      n_img = node_image[node_uuid]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node_uuid]:
        test = volume not in n_img.volumes
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
                      "volume %s missing on node %s", volume,
                      self.cfg.GetNodeName(node_uuid))

    if instance.admin_state == constants.ADMINST_UP:
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
                    "instance not running on its primary node %s",
                     self.cfg.GetNodeName(pnode_uuid))
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance is marked as running and lives on"
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      self._ErrorIf(instance.disks_active and
                    not success and not bad_snode,
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
                    "couldn't retrieve status for disk/%s on %s: %s",
                    idx, self.cfg.GetNodeName(nname), bdev_status)

      if instance.disks_active and success and \
         (bdev_status.is_degraded or
          bdev_status.ldisk_status != constants.LDS_OKAY):
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
        if bdev_status.is_degraded:
          msg += " is degraded"
        if bdev_status.ldisk_status != constants.LDS_OKAY:
          msg += "; state is '%s'" % \
                 constants.LDS_NAMES[bdev_status.ldisk_status]

        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)

    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
                  "instance %s, connection to primary node failed",
                  instance.name)

    self._ErrorIf(len(instance.secondary_nodes) > 1,
                  constants.CV_EINSTANCELAYOUT, instance.name,
                  "instance has multiple secondary nodes: %s",
                  utils.CommaJoin(instance.secondary_nodes),
                  code=self.ETYPE_WARNING)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
    if any(es_flags.values()):
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
        # Disk template not compatible with exclusive_storage: no instance
        # node should have the flag set
        es_nodes = [n
                    for (n, es) in es_flags.items()
                    if es]
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
                    "instance has template %s, which is not supported on nodes"
                    " that have exclusive storage set: %s",
                    instance.disk_template,
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
      for (idx, disk) in enumerate(instance.disks):
        self._ErrorIf(disk.spindles is None,
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
                      "number of spindles not configured for disk %s while"
                      " exclusive storage is enabled, try running"
                      " gnt-cluster repair-disk-sizes", idx)

    if instance.disk_template in constants.DTS_INT_MIRROR:
      instance_nodes = utils.NiceSort(instance.all_nodes)
      instance_groups = {}

      for node_uuid in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
                                   []).append(node_uuid)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
                           groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode_uuid in nodes,
                                   reverse=True)]

      self._ErrorIf(len(instance_groups) > 1,
                    constants.CV_EINSTANCESPLITGROUPS,
                    instance.name, "instance has primary and secondary nodes in"
                    " different groups: %s", utils.CommaJoin(pretty_list),
                    code=self.ETYPE_WARNING)

    inst_nodes_offline = []
    for snode in instance.secondary_nodes:
      s_img = node_image[snode]
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
                    self.cfg.GetNodeName(snode),
                    "instance %s, connection to secondary node failed",
                    instance.name)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
                  instance.name, "instance has offline secondary node(s) %s",
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
    # ... or ghost/non-vm_capable nodes
    for node_uuid in instance.all_nodes:
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance lives on ghost node %s",
                    self.cfg.GetNodeName(node_uuid))
      self._ErrorIf(not node_image[node_uuid].vm_capable,
                    constants.CV_EINSTANCEBADNODE, instance.name,
                    "instance lives on non-vm_capable node %s",
                    self.cfg.GetNodeName(node_uuid))

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node_uuid, n_img in node_image.items():
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
          self.all_node_info[node_uuid].group != self.group_uuid):
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node_uuid not in node_vol_should or
                volume not in node_vol_should[node_uuid]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
                      self.cfg.GetNodeName(node_uuid),
                      "volume %s is unknown", volume)

  def _VerifyNPlusOneMemory(self, node_image, all_insts):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node_uuid, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline or \
         self.all_node_info[node_uuid].group != self.group_uuid:
        # we're skipping nodes marked offline and nodes in other groups from
        # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      #TODO(dynmem): also consider ballooning out other instances
      for prinode, inst_uuids in n_img.sbp.items():
        needed_mem = 0
        for inst_uuid in inst_uuids:
          bep = cluster_info.FillBE(all_insts[inst_uuid])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MINMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1,
                      self.cfg.GetNodeName(node_uuid),
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
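        # In other words: for every primary node 'prinode' whose instances
        # this node holds as secondary, the node must have enough free
        # memory to start all of prinode's auto_balance instances at their
        # minimum memory should prinode fail.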

  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param nodes: List of L{objects.Node} objects
    @param master_node_uuid: UUID of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.uuid == master_node_uuid)),
      (files_vm, lambda node: node.vm_capable),
      ]
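    # files_opt is deliberately absent from files2nodefn: optional files are
    # only checked further down, where it is acceptable for them to be
    # missing from all nodes (but not from just a subset).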

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodes
      else:
        filenodes = filter(fn, nodes)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
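    # fileinfo maps each filename to a dict of checksum -> set of node UUIDs
    # reporting that checksum, e.g. (values illustrative):
    #   {"/path/to/file": {"0123abcd...": set(["node-uuid-1", "node-uuid-2"])}}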
    ignore_nodes = set()

    for node in nodes:
      if node.offline:
        ignore_nodes.add(node.uuid)
        continue

      nresult = all_nvinfo[node.uuid]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
                    "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.uuid)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_uuid
                            for node_uuids in fileinfo[filename].values()
                            for node_uuid in node_uuids) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        self._ErrorIf(missing_file and missing_file != expected_nodes,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is optional, but it must exist on all or no"
                      " nodes (not found on %s)",
                      filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))
      else:
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is missing from node(s) %s", filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        self._ErrorIf(unexpected,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s should not exist on node(s) %s",
                      filename, utils.CommaJoin(
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1,
                     utils.CommaJoin(utils.NiceSort(
                       map(self.cfg.GetNodeName, node_uuids))))
                    for (idx, (checksum, node_uuids)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
                    "File %s found with %s different checksums (%s)",
                    filename, len(checksums), "; ".join(variants))

  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
    """Verify the drbd helper.

    """
    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                    "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "wrong drbd usermode helper: %s", payload)

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)

    # compute the DRBD minors
    node_drbd = {}
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
      test = inst_uuid not in instanceinfo
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
      if test:
        node_drbd[minor] = (inst_uuid, False)
      else:
        instance = instanceinfo[inst_uuid]
        node_drbd[minor] = (inst_uuid, instance.disks_active)
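    # node_drbd now maps every minor from the configuration to a pair
    # (instance_uuid, should_be_active); it is compared below against the
    # minors the node actually reports as in use.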

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                  "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (inst_uuid, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "drbd minor %d of instance %s is not active", minor,
                    self.cfg.GetInstanceName(inst_uuid))
    for minor in used_minors:
      test = minor not in node_drbd
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                  "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict
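    # The resulting structure maps each OS name to a list of
    # (path, status, diagnose, variants, parameters, api_versions) tuples,
    # one per location in which that OS was found on the node.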
2424

    
2425
  def _VerifyNodeOS(self, ninfo, nimg, base):
2426
    """Verifies the node OS list.
2427

2428
    @type ninfo: L{objects.Node}
2429
    @param ninfo: the node to check
2430
    @param nimg: the node image object
2431
    @param base: the 'template' node we match against (e.g. from the master)
2432

2433
    """
2434
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2435

    
2436
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2437
    for os_name, os_data in nimg.oslist.items():
2438
      assert os_data, "Empty OS status for OS %s?!" % os_name
2439
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2440
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2441
                    "Invalid OS %s (located at %s): %s",
2442
                    os_name, f_path, f_diag)
2443
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2444
                    "OS '%s' has multiple entries"
2445
                    " (first one shadows the rest): %s",
2446
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
2447
      # comparisons with the 'base' image
2448
      test = os_name not in base.oslist
2449
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2450
                    "Extra OS %s not present on reference node (%s)",
2451
                    os_name, self.cfg.GetNodeName(base.uuid))
2452
      if test:
2453
        continue
2454
      assert base.oslist[os_name], "Base node has empty OS status?"
2455
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2456
      if not b_status:
2457
        # base OS is invalid, skipping
2458
        continue
2459
      for kind, a, b in [("API version", f_api, b_api),
2460
                         ("variants list", f_var, b_var),
2461
                         ("parameters", beautify_params(f_param),
2462
                          beautify_params(b_param))]:
2463
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2464
                      "OS %s for %s differs from reference node %s:"
2465
                      " [%s] vs. [%s]", kind, os_name,
2466
                      self.cfg.GetNodeName(base.uuid),
2467
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2468

    
2469
    # check any missing OSes
2470
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2471
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2472
                  "OSes present on reference node %s"
2473
                  " but missing on this node: %s",
2474
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2475

    
2476
  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2477
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2478

2479
    @type ninfo: L{objects.Node}
2480
    @param ninfo: the node to check
2481
    @param nresult: the remote results for the node
2482
    @type is_master: bool
2483
    @param is_master: Whether node is the master node
2484

2485
    """
2486
    cluster = self.cfg.GetClusterInfo()
2487
    if (is_master and
2488
        (cluster.IsFileStorageEnabled() or
2489
         cluster.IsSharedFileStorageEnabled())):
2490
      try:
2491
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2492
      except KeyError:
2493
        # This should never happen
2494
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2495
                      "Node did not return forbidden file storage paths")
2496
      else:
2497
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2498
                      "Found forbidden file storage paths: %s",
2499
                      utils.CommaJoin(fspaths))
2500
    else:
2501
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2502
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2503
                    "Node should not have returned forbidden file storage"
2504
                    " paths")
2505

    
2506
  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2507
                          verify_key, error_key):
2508
    """Verifies (file) storage paths.
2509

2510
    @type ninfo: L{objects.Node}
2511
    @param ninfo: the node to check
2512
    @param nresult: the remote results for the node
2513
    @type file_disk_template: string
2514
    @param file_disk_template: file-based disk template, whose directory
2515
        is supposed to be verified
2516
    @type verify_key: string
2517
    @param verify_key: key for the verification map of this file
2518
        verification step
2519
    @param error_key: error key to be added to the verification results
2520
        in case something goes wrong in this verification step
2521

2522
    """
2523
    assert (file_disk_template in
2524
            utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2525
    cluster = self.cfg.GetClusterInfo()
2526
    if cluster.IsDiskTemplateEnabled(file_disk_template):
2527
      self._ErrorIf(
2528
          verify_key in nresult,
2529
          error_key, ninfo.name,
2530
          "The configured %s storage path is unusable: %s" %
2531
          (file_disk_template, nresult.get(verify_key)))
2532

    
2533
  def _VerifyFileStoragePaths(self, ninfo, nresult):
2534
    """Verifies (file) storage paths.
2535

2536
    @see: C{_VerifyStoragePaths}
2537

2538
    """
2539
    self._VerifyStoragePaths(
2540
        ninfo, nresult, constants.DT_FILE,
2541
        constants.NV_FILE_STORAGE_PATH,
2542
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2543

    
2544
  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2545
    """Verifies (file) storage paths.
2546

2547
    @see: C{_VerifyStoragePaths}
2548

2549
    """
2550
    self._VerifyStoragePaths(
2551
        ninfo, nresult, constants.DT_SHARED_FILE,
2552
        constants.NV_SHARED_FILE_STORAGE_PATH,
2553
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2554

    
2555
  def _VerifyOob(self, ninfo, nresult):
2556
    """Verifies out of band functionality of a node.
2557

2558
    @type ninfo: L{objects.Node}
2559
    @param ninfo: the node to check
2560
    @param nresult: the remote results for the node
2561

2562
    """
2563
    # We just have to verify the paths on master and/or master candidates
2564
    # as the oob helper is invoked on the master
2565
    if ((ninfo.master_candidate or ninfo.master_capable) and
2566
        constants.NV_OOB_PATHS in nresult):
2567
      for path_result in nresult[constants.NV_OOB_PATHS]:
2568
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2569
                      ninfo.name, path_result)
2570

    
2571
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2572
    """Verifies and updates the node volume data.
2573

2574
    This function will update a L{NodeImage}'s internal structures
2575
    with data from the remote call.
2576

2577
    @type ninfo: L{objects.Node}
2578
    @param ninfo: the node to check
2579
    @param nresult: the remote results for the node
2580
    @param nimg: the node image object
2581
    @param vg_name: the configured VG name
2582

2583
    """
2584
    nimg.lvm_fail = True
2585
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2586
    if vg_name is None:
2587
      pass
2588
    elif isinstance(lvdata, basestring):
2589
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2590
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
2591
    elif not isinstance(lvdata, dict):
2592
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2593
                    "rpc call to node failed (lvlist)")
2594
    else:
2595
      nimg.volumes = lvdata
2596
      nimg.lvm_fail = False
2597

    
2598
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2599
    """Verifies and updates the node instance list.
2600

2601
    If the listing was successful, then updates this node's instance
2602
    list. Otherwise, it marks the RPC call as failed for the instance
2603
    list key.
2604

2605
    @type ninfo: L{objects.Node}
2606
    @param ninfo: the node to check
2607
    @param nresult: the remote results for the node
2608
    @param nimg: the node image object
2609

2610
    """
2611
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2612
    test = not isinstance(idata, list)
2613
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2614
                  "rpc call to node failed (instancelist): %s",
2615
                  utils.SafeEncode(str(idata)))
2616
    if test:
2617
      nimg.hyp_fail = True
2618
    else:
2619
      nimg.instances = [inst.uuid for (_, inst) in
2620
                        self.cfg.GetMultiInstanceInfoByName(idata)]
2621

    
2622
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2623
    """Verifies and computes a node information map
2624

2625
    @type ninfo: L{objects.Node}
2626
    @param ninfo: the node to check
2627
    @param nresult: the remote results for the node
2628
    @param nimg: the node image object
2629
    @param vg_name: the configured VG name
2630

2631
    """
2632
    # try to read free memory (from the hypervisor)
2633
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_dev_inst_only = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      dev_inst_only = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        self.cfg.SetDiskID(anno_disk, nuuid)
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))

      node_disks_dev_inst_only[nuuid] = dev_inst_only

    assert len(node_disks) == len(node_disks_dev_inst_only)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
               node_disks.keys(), node_disks_dev_inst_only)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

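      # The per-disk results come back in the same order as the disks that
      # were submitted, so each status can be paired positionally with its
      # (instance, disk) tuple.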
      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

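    # Consistency check: every instance has one status entry per disk, only
    # on nodes it actually uses, and each entry is a (success, payload) pair.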
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

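    # Group the remaining nodes by node group and return one endless
    # (cycling) iterator of sorted node names per foreign group.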
    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

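    # Every online node in this group gets one peer from each other group;
    # advancing the cyclic iterators spreads the checks over those groups.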
    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are only run in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

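    # Each key below enables one class of checks in the node-verify RPC; the
    # value carries the parameters that particular check needs.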
    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
      if drbd_helper:
        node_verify_param[constants.NV_DRBDVERSION] = None
        node_verify_param[constants.NV_DRBDLIST] = None
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for inst_uuid in self.my_inst_info.values():
      for nic in inst_uuid.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

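    # Record the expected per-node instance layout: primary/secondary
    # relations, the LVs each node should hold, and "ghost" nodes that are
    # referenced by instances but missing from the configuration.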
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least one node, we act as if it were set for all of them
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams)
    nvinfo_endtime = time.time()

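    # Nodes outside this group may still hold LVs belonging to instances of
    # this group; query only their LV lists.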
    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

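    # Run the individual checks on every node in the group; the first node
    # with a healthy OS listing becomes the reference (refos_img) for the
    # cross-node OS consistency checks.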
    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.name not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyzes the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
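        # res.payload is a list of (script, status, output) tuples; a status
        # of HKR_FAIL marks a hook script that failed and whose output we
        # show to the user.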
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])