Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / cluster.py @ c2e984e2

History | View | Annotate | Download (120.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with the cluster."""
23

    
24
import OpenSSL
25

    
26
import copy
27
import itertools
28
import logging
29
import operator
30
import os
31
import re
32
import time
33

    
34
from ganeti import compat
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import hypervisor
38
from ganeti import locking
39
from ganeti import masterd
40
from ganeti import netutils
41
from ganeti import objects
42
from ganeti import opcodes
43
from ganeti import pathutils
44
from ganeti import query
45
from ganeti import rpc
46
from ganeti import runtime
47
from ganeti import ssh
48
from ganeti import uidpool
49
from ganeti import utils
50
from ganeti import vcluster
51

    
52
from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53
  ResultWithJobs
54
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
60
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
61
  CheckDiskAccessModeConsistency
62

    
63
import ganeti.masterd.instance
64

    
65

    
66
class LUClusterActivateMasterIp(NoHooksLU):
67
  """Activate the master IP on the master node.
68

69
  """
70
  def Exec(self, feedback_fn):
71
    """Activate the master IP.
72

73
    """
74
    master_params = self.cfg.GetMasterNetworkParameters()
75
    ems = self.cfg.GetUseExternalMipScript()
76
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
77
                                                   master_params, ems)
78
    result.Raise("Could not activate the master IP")
79

    
80

    
81
class LUClusterDeactivateMasterIp(NoHooksLU):
82
  """Deactivate the master IP on the master node.
83

84
  """
85
  def Exec(self, feedback_fn):
86
    """Deactivate the master IP.
87

88
    """
89
    master_params = self.cfg.GetMasterNetworkParameters()
90
    ems = self.cfg.GetUseExternalMipScript()
91
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
92
                                                     master_params, ems)
93
    result.Raise("Could not deactivate the master IP")
94

    
95

    
96
class LUClusterConfigQuery(NoHooksLU):
97
  """Return configuration values.
98

99
  """
100
  REQ_BGL = False
101

    
102
  def CheckArguments(self):
103
    self.cq = ClusterQuery(None, self.op.output_fields, False)
104

    
105
  def ExpandNames(self):
106
    self.cq.ExpandNames(self)
107

    
108
  def DeclareLocks(self, level):
109
    self.cq.DeclareLocks(self, level)
110

    
111
  def Exec(self, feedback_fn):
112
    result = self.cq.OldStyleQuery(self)
113

    
114
    assert len(result) == 1
115

    
116
    return result[0]
117

    
118

    
119
class LUClusterDestroy(LogicalUnit):
120
  """Logical unit for destroying the cluster.
121

122
  """
123
  HPATH = "cluster-destroy"
124
  HTYPE = constants.HTYPE_CLUSTER
125

    
126
  def BuildHooksEnv(self):
127
    """Build hooks env.
128

129
    """
130
    return {
131
      "OP_TARGET": self.cfg.GetClusterName(),
132
      }
133

    
134
  def BuildHooksNodes(self):
135
    """Build hooks nodes.
136

137
    """
138
    return ([], [])
139

    
140
  def CheckPrereq(self):
141
    """Check prerequisites.
142

143
    This checks whether the cluster is empty.
144

145
    Any errors are signaled by raising errors.OpPrereqError.
146

147
    """
148
    master = self.cfg.GetMasterNode()
149

    
150
    nodelist = self.cfg.GetNodeList()
151
    if len(nodelist) != 1 or nodelist[0] != master:
152
      raise errors.OpPrereqError("There are still %d node(s) in"
153
                                 " this cluster." % (len(nodelist) - 1),
154
                                 errors.ECODE_INVAL)
155
    instancelist = self.cfg.GetInstanceList()
156
    if instancelist:
157
      raise errors.OpPrereqError("There are still %d instance(s) in"
158
                                 " this cluster." % len(instancelist),
159
                                 errors.ECODE_INVAL)
160

    
161
  def Exec(self, feedback_fn):
162
    """Destroys the cluster.
163

164
    """
165
    master_params = self.cfg.GetMasterNetworkParameters()
166

    
167
    # Run post hooks on master node before it's removed
168
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
169

    
170
    ems = self.cfg.GetUseExternalMipScript()
171
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
172
                                                     master_params, ems)
173
    result.Warn("Error disabling the master IP address", self.LogWarning)
174
    return master_params.uuid
175

    
176

    
177
class LUClusterPostInit(LogicalUnit):
178
  """Logical unit for running hooks after cluster initialization.
179

180
  """
181
  HPATH = "cluster-init"
182
  HTYPE = constants.HTYPE_CLUSTER
183

    
184
  def BuildHooksEnv(self):
185
    """Build hooks env.
186

187
    """
188
    return {
189
      "OP_TARGET": self.cfg.GetClusterName(),
190
      }
191

    
192
  def BuildHooksNodes(self):
193
    """Build hooks nodes.
194

195
    """
196
    return ([], [self.cfg.GetMasterNode()])
197

    
198
  def Exec(self, feedback_fn):
199
    """Nothing to do.
200

201
    """
202
    return True
203

    
204

    
205
class ClusterQuery(QueryBase):
206
  FIELDS = query.CLUSTER_FIELDS
207

    
208
  #: Do not sort (there is only one item)
209
  SORT_FIELD = None
210

    
211
  def ExpandNames(self, lu):
212
    lu.needed_locks = {}
213

    
214
    # The following variables interact with _QueryBase._GetNames
215
    self.wanted = locking.ALL_SET
216
    self.do_locking = self.use_locking
217

    
218
    if self.do_locking:
219
      raise errors.OpPrereqError("Can not use locking for cluster queries",
220
                                 errors.ECODE_INVAL)
221

    
222
  def DeclareLocks(self, lu, level):
223
    pass
224

    
225
  def _GetQueryData(self, lu):
226
    """Computes the list of nodes and their attributes.
227

228
    """
229
    # Locking is not used
230
    assert not (compat.any(lu.glm.is_owned(level)
231
                           for level in locking.LEVELS
232
                           if level != locking.LEVEL_CLUSTER) or
233
                self.do_locking or self.use_locking)
234

    
235
    if query.CQ_CONFIG in self.requested_data:
236
      cluster = lu.cfg.GetClusterInfo()
237
      nodes = lu.cfg.GetAllNodesInfo()
238
    else:
239
      cluster = NotImplemented
240
      nodes = NotImplemented
241

    
242
    if query.CQ_QUEUE_DRAINED in self.requested_data:
243
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
244
    else:
245
      drain_flag = NotImplemented
246

    
247
    if query.CQ_WATCHER_PAUSE in self.requested_data:
248
      master_node_uuid = lu.cfg.GetMasterNode()
249

    
250
      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
251
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
252
                   lu.cfg.GetMasterNodeName())
253

    
254
      watcher_pause = result.payload
255
    else:
256
      watcher_pause = NotImplemented
257

    
258
    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
259

    
260

    
261
class LUClusterQuery(NoHooksLU):
262
  """Query cluster configuration.
263

264
  """
265
  REQ_BGL = False
266

    
267
  def ExpandNames(self):
268
    self.needed_locks = {}
269

    
270
  def Exec(self, feedback_fn):
271
    """Return cluster config.
272

273
    """
274
    cluster = self.cfg.GetClusterInfo()
275
    os_hvp = {}
276

    
277
    # Filter just for enabled hypervisors
278
    for os_name, hv_dict in cluster.os_hvp.items():
279
      os_hvp[os_name] = {}
280
      for hv_name, hv_params in hv_dict.items():
281
        if hv_name in cluster.enabled_hypervisors:
282
          os_hvp[os_name][hv_name] = hv_params
283

    
284
    # Convert ip_family to ip_version
285
    primary_ip_version = constants.IP4_VERSION
286
    if cluster.primary_ip_family == netutils.IP6Address.family:
287
      primary_ip_version = constants.IP6_VERSION
288

    
289
    result = {
290
      "software_version": constants.RELEASE_VERSION,
291
      "protocol_version": constants.PROTOCOL_VERSION,
292
      "config_version": constants.CONFIG_VERSION,
293
      "os_api_version": max(constants.OS_API_VERSIONS),
294
      "export_version": constants.EXPORT_VERSION,
295
      "vcs_version": constants.VCS_VERSION,
296
      "architecture": runtime.GetArchInfo(),
297
      "name": cluster.cluster_name,
298
      "master": self.cfg.GetMasterNodeName(),
299
      "default_hypervisor": cluster.primary_hypervisor,
300
      "enabled_hypervisors": cluster.enabled_hypervisors,
301
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
302
                        for hypervisor_name in cluster.enabled_hypervisors]),
303
      "os_hvp": os_hvp,
304
      "beparams": cluster.beparams,
305
      "osparams": cluster.osparams,
306
      "ipolicy": cluster.ipolicy,
307
      "nicparams": cluster.nicparams,
308
      "ndparams": cluster.ndparams,
309
      "diskparams": cluster.diskparams,
310
      "candidate_pool_size": cluster.candidate_pool_size,
311
      "master_netdev": cluster.master_netdev,
312
      "master_netmask": cluster.master_netmask,
313
      "use_external_mip_script": cluster.use_external_mip_script,
314
      "volume_group_name": cluster.volume_group_name,
315
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
316
      "file_storage_dir": cluster.file_storage_dir,
317
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
318
      "maintain_node_health": cluster.maintain_node_health,
319
      "ctime": cluster.ctime,
320
      "mtime": cluster.mtime,
321
      "uuid": cluster.uuid,
322
      "tags": list(cluster.GetTags()),
323
      "uid_pool": cluster.uid_pool,
324
      "default_iallocator": cluster.default_iallocator,
325
      "reserved_lvs": cluster.reserved_lvs,
326
      "primary_ip_version": primary_ip_version,
327
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
328
      "hidden_os": cluster.hidden_os,
329
      "blacklisted_os": cluster.blacklisted_os,
330
      "enabled_disk_templates": cluster.enabled_disk_templates,
331
      }
332

    
333
    return result
334

    
335

    
336
class LUClusterRedistConf(NoHooksLU):
337
  """Force the redistribution of cluster configuration.
338

339
  This is a very simple LU.
340

341
  """
342
  REQ_BGL = False
343

    
344
  def ExpandNames(self):
345
    self.needed_locks = {
346
      locking.LEVEL_NODE: locking.ALL_SET,
347
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
348
    }
349
    self.share_locks = ShareAll()
350

    
351
  def Exec(self, feedback_fn):
352
    """Redistribute the configuration.
353

354
    """
355
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
356
    RedistributeAncillaryFiles(self)
357

    
358

    
359
class LUClusterRename(LogicalUnit):
360
  """Rename the cluster.
361

362
  """
363
  HPATH = "cluster-rename"
364
  HTYPE = constants.HTYPE_CLUSTER
365

    
366
  def BuildHooksEnv(self):
367
    """Build hooks env.
368

369
    """
370
    return {
371
      "OP_TARGET": self.cfg.GetClusterName(),
372
      "NEW_NAME": self.op.name,
373
      }
374

    
375
  def BuildHooksNodes(self):
376
    """Build hooks nodes.
377

378
    """
379
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
380

    
381
  def CheckPrereq(self):
382
    """Verify that the passed name is a valid one.
383

384
    """
385
    hostname = netutils.GetHostname(name=self.op.name,
386
                                    family=self.cfg.GetPrimaryIPFamily())
387

    
388
    new_name = hostname.name
389
    self.ip = new_ip = hostname.ip
390
    old_name = self.cfg.GetClusterName()
391
    old_ip = self.cfg.GetMasterIP()
392
    if new_name == old_name and new_ip == old_ip:
393
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
394
                                 " cluster has changed",
395
                                 errors.ECODE_INVAL)
396
    if new_ip != old_ip:
397
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
398
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
399
                                   " reachable on the network" %
400
                                   new_ip, errors.ECODE_NOTUNIQUE)
401

    
402
    self.op.name = new_name
403

    
404
  def Exec(self, feedback_fn):
405
    """Rename the cluster.
406

407
    """
408
    clustername = self.op.name
409
    new_ip = self.ip
410

    
411
    # shutdown the master IP
412
    master_params = self.cfg.GetMasterNetworkParameters()
413
    ems = self.cfg.GetUseExternalMipScript()
414
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
415
                                                     master_params, ems)
416
    result.Raise("Could not disable the master role")
417

    
418
    try:
419
      cluster = self.cfg.GetClusterInfo()
420
      cluster.cluster_name = clustername
421
      cluster.master_ip = new_ip
422
      self.cfg.Update(cluster, feedback_fn)
423

    
424
      # update the known hosts file
425
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
426
      node_list = self.cfg.GetOnlineNodeList()
427
      try:
428
        node_list.remove(master_params.uuid)
429
      except ValueError:
430
        pass
431
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
432
    finally:
433
      master_params.ip = new_ip
434
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
435
                                                     master_params, ems)
436
      result.Warn("Could not re-enable the master role on the master,"
437
                  " please restart manually", self.LogWarning)
438

    
439
    return clustername
440

    
441

    
442
class LUClusterRepairDiskSizes(NoHooksLU):
443
  """Verifies the cluster disks sizes.
444

445
  """
446
  REQ_BGL = False
447

    
448
  def ExpandNames(self):
449
    if self.op.instances:
450
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
451
      # Not getting the node allocation lock as only a specific set of
452
      # instances (and their nodes) is going to be acquired
453
      self.needed_locks = {
454
        locking.LEVEL_NODE_RES: [],
455
        locking.LEVEL_INSTANCE: self.wanted_names,
456
        }
457
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
458
    else:
459
      self.wanted_names = None
460
      self.needed_locks = {
461
        locking.LEVEL_NODE_RES: locking.ALL_SET,
462
        locking.LEVEL_INSTANCE: locking.ALL_SET,
463

    
464
        # This opcode is acquires the node locks for all instances
465
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
466
        }
467

    
468
    self.share_locks = {
469
      locking.LEVEL_NODE_RES: 1,
470
      locking.LEVEL_INSTANCE: 0,
471
      locking.LEVEL_NODE_ALLOC: 1,
472
      }
473

    
474
  def DeclareLocks(self, level):
475
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
476
      self._LockInstancesNodes(primary_only=True, level=level)
477

    
478
  def CheckPrereq(self):
479
    """Check prerequisites.
480

481
    This only checks the optional instance list against the existing names.
482

483
    """
484
    if self.wanted_names is None:
485
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
486

    
487
    self.wanted_instances = \
488
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
489

    
490
  def _EnsureChildSizes(self, disk):
491
    """Ensure children of the disk have the needed disk size.
492

493
    This is valid mainly for DRBD8 and fixes an issue where the
494
    children have smaller disk size.
495

496
    @param disk: an L{ganeti.objects.Disk} object
497

498
    """
499
    if disk.dev_type == constants.DT_DRBD8:
500
      assert disk.children, "Empty children for DRBD8?"
501
      fchild = disk.children[0]
502
      mismatch = fchild.size < disk.size
503
      if mismatch:
504
        self.LogInfo("Child disk has size %d, parent %d, fixing",
505
                     fchild.size, disk.size)
506
        fchild.size = disk.size
507

    
508
      # and we recurse on this child only, not on the metadev
509
      return self._EnsureChildSizes(fchild) or mismatch
510
    else:
511
      return False
512

    
513
  def Exec(self, feedback_fn):
514
    """Verify the size of cluster disks.
515

516
    """
517
    # TODO: check child disks too
518
    # TODO: check differences in size between primary/secondary nodes
519
    per_node_disks = {}
520
    for instance in self.wanted_instances:
521
      pnode = instance.primary_node
522
      if pnode not in per_node_disks:
523
        per_node_disks[pnode] = []
524
      for idx, disk in enumerate(instance.disks):
525
        per_node_disks[pnode].append((instance, idx, disk))
526

    
527
    assert not (frozenset(per_node_disks.keys()) -
528
                self.owned_locks(locking.LEVEL_NODE_RES)), \
529
      "Not owning correct locks"
530
    assert not self.owned_locks(locking.LEVEL_NODE)
531

    
532
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
533
                                               per_node_disks.keys())
534

    
535
    changed = []
536
    for node_uuid, dskl in per_node_disks.items():
537
      if not dskl:
538
        # no disks on the node
539
        continue
540

    
541
      newl = [([v[2].Copy()], v[0]) for v in dskl]
542
      node_name = self.cfg.GetNodeName(node_uuid)
543
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
544
      if result.fail_msg:
545
        self.LogWarning("Failure in blockdev_getdimensions call to node"
546
                        " %s, ignoring", node_name)
547
        continue
548
      if len(result.payload) != len(dskl):
549
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
550
                        " result.payload=%s", node_name, len(dskl),
551
                        result.payload)
552
        self.LogWarning("Invalid result from node %s, ignoring node results",
553
                        node_name)
554
        continue
555
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
556
        if dimensions is None:
557
          self.LogWarning("Disk %d of instance %s did not return size"
558
                          " information, ignoring", idx, instance.name)
559
          continue
560
        if not isinstance(dimensions, (tuple, list)):
561
          self.LogWarning("Disk %d of instance %s did not return valid"
562
                          " dimension information, ignoring", idx,
563
                          instance.name)
564
          continue
565
        (size, spindles) = dimensions
566
        if not isinstance(size, (int, long)):
567
          self.LogWarning("Disk %d of instance %s did not return valid"
568
                          " size information, ignoring", idx, instance.name)
569
          continue
570
        size = size >> 20
571
        if size != disk.size:
572
          self.LogInfo("Disk %d of instance %s has mismatched size,"
573
                       " correcting: recorded %d, actual %d", idx,
574
                       instance.name, disk.size, size)
575
          disk.size = size
576
          self.cfg.Update(instance, feedback_fn)
577
          changed.append((instance.name, idx, "size", size))
578
        if es_flags[node_uuid]:
579
          if spindles is None:
580
            self.LogWarning("Disk %d of instance %s did not return valid"
581
                            " spindles information, ignoring", idx,
582
                            instance.name)
583
          elif disk.spindles is None or disk.spindles != spindles:
584
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
585
                         " correcting: recorded %s, actual %s",
586
                         idx, instance.name, disk.spindles, spindles)
587
            disk.spindles = spindles
588
            self.cfg.Update(instance, feedback_fn)
589
            changed.append((instance.name, idx, "spindles", disk.spindles))
590
        if self._EnsureChildSizes(disk):
591
          self.cfg.Update(instance, feedback_fn)
592
          changed.append((instance.name, idx, "size", disk.size))
593
    return changed
594

    
595

    
596
def _ValidateNetmask(cfg, netmask):
597
  """Checks if a netmask is valid.
598

599
  @type cfg: L{config.ConfigWriter}
600
  @param cfg: The cluster configuration
601
  @type netmask: int
602
  @param netmask: the netmask to be verified
603
  @raise errors.OpPrereqError: if the validation fails
604

605
  """
606
  ip_family = cfg.GetPrimaryIPFamily()
607
  try:
608
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
609
  except errors.ProgrammerError:
610
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
611
                               ip_family, errors.ECODE_INVAL)
612
  if not ipcls.ValidateNetmask(netmask):
613
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
614
                               (netmask), errors.ECODE_INVAL)
615

    
616

    
617
def CheckFileBasedStoragePathVsEnabledDiskTemplates(
618
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
619
    file_disk_template):
620
  """Checks whether the given file-based storage directory is acceptable.
621

622
  Note: This function is public, because it is also used in bootstrap.py.
623

624
  @type logging_warn_fn: function
625
  @param logging_warn_fn: function which accepts a string and logs it
626
  @type file_storage_dir: string
627
  @param file_storage_dir: the directory to be used for file-based instances
628
  @type enabled_disk_templates: list of string
629
  @param enabled_disk_templates: the list of enabled disk templates
630
  @type file_disk_template: string
631
  @param file_disk_template: the file-based disk template for which the
632
      path should be checked
633

634
  """
635
  assert (file_disk_template in
636
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
637
  file_storage_enabled = file_disk_template in enabled_disk_templates
638
  if file_storage_dir is not None:
639
    if file_storage_dir == "":
640
      if file_storage_enabled:
641
        raise errors.OpPrereqError(
642
            "Unsetting the '%s' storage directory while having '%s' storage"
643
            " enabled is not permitted." %
644
            (file_disk_template, file_disk_template))
645
    else:
646
      if not file_storage_enabled:
647
        logging_warn_fn(
648
            "Specified a %s storage directory, although %s storage is not"
649
            " enabled." % (file_disk_template, file_disk_template))
650
  else:
651
    raise errors.ProgrammerError("Received %s storage dir with value"
652
                                 " 'None'." % file_disk_template)
653

    
654

    
655
def CheckFileStoragePathVsEnabledDiskTemplates(
656
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
657
  """Checks whether the given file storage directory is acceptable.
658

659
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
660

661
  """
662
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
663
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
664
      constants.DT_FILE)
665

    
666

    
667
def CheckSharedFileStoragePathVsEnabledDiskTemplates(
668
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
669
  """Checks whether the given shared file storage directory is acceptable.
670

671
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
672

673
  """
674
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
675
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
676
      constants.DT_SHARED_FILE)
677

    
678

    
679
class LUClusterSetParams(LogicalUnit):
680
  """Change the parameters of the cluster.
681

682
  """
683
  HPATH = "cluster-modify"
684
  HTYPE = constants.HTYPE_CLUSTER
685
  REQ_BGL = False
686

    
687
  def CheckArguments(self):
688
    """Check parameters
689

690
    """
691
    if self.op.uid_pool:
692
      uidpool.CheckUidPool(self.op.uid_pool)
693

    
694
    if self.op.add_uids:
695
      uidpool.CheckUidPool(self.op.add_uids)
696

    
697
    if self.op.remove_uids:
698
      uidpool.CheckUidPool(self.op.remove_uids)
699

    
700
    if self.op.master_netmask is not None:
701
      _ValidateNetmask(self.cfg, self.op.master_netmask)
702

    
703
    if self.op.diskparams:
704
      for dt_params in self.op.diskparams.values():
705
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
706
      try:
707
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
708
        CheckDiskAccessModeValidity(self.op.diskparams)
709
      except errors.OpPrereqError, err:
710
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
711
                                   errors.ECODE_INVAL)
712

    
713
  def ExpandNames(self):
714
    # FIXME: in the future maybe other cluster params won't require checking on
715
    # all nodes to be modified.
716
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
717
    # resource locks the right thing, shouldn't it be the BGL instead?
718
    self.needed_locks = {
719
      locking.LEVEL_NODE: locking.ALL_SET,
720
      locking.LEVEL_INSTANCE: locking.ALL_SET,
721
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
722
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
723
    }
724
    self.share_locks = ShareAll()
725

    
726
  def BuildHooksEnv(self):
727
    """Build hooks env.
728

729
    """
730
    return {
731
      "OP_TARGET": self.cfg.GetClusterName(),
732
      "NEW_VG_NAME": self.op.vg_name,
733
      }
734

    
735
  def BuildHooksNodes(self):
736
    """Build hooks nodes.
737

738
    """
739
    mn = self.cfg.GetMasterNode()
740
    return ([mn], [mn])
741

    
742
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
743
                   new_enabled_disk_templates):
744
    """Check the consistency of the vg name on all nodes and in case it gets
745
       unset whether there are instances still using it.
746

747
    """
748
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
749
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
750
                                            new_enabled_disk_templates)
751
    current_vg_name = self.cfg.GetVGName()
752

    
753
    if self.op.vg_name == '':
754
      if lvm_is_enabled:
755
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
756
                                   " disk templates are or get enabled.")
757

    
758
    if self.op.vg_name is None:
759
      if current_vg_name is None and lvm_is_enabled:
760
        raise errors.OpPrereqError("Please specify a volume group when"
761
                                   " enabling lvm-based disk-templates.")
762

    
763
    if self.op.vg_name is not None and not self.op.vg_name:
764
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
765
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
766
                                   " instances exist", errors.ECODE_INVAL)
767

    
768
    if (self.op.vg_name is not None and lvm_is_enabled) or \
769
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
770
      self._CheckVgNameOnNodes(node_uuids)
771

    
772
  def _CheckVgNameOnNodes(self, node_uuids):
773
    """Check the status of the volume group on each node.
774

775
    """
776
    vglist = self.rpc.call_vg_list(node_uuids)
777
    for node_uuid in node_uuids:
778
      msg = vglist[node_uuid].fail_msg
779
      if msg:
780
        # ignoring down node
781
        self.LogWarning("Error while gathering data on node %s"
782
                        " (ignoring node): %s",
783
                        self.cfg.GetNodeName(node_uuid), msg)
784
        continue
785
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
786
                                            self.op.vg_name,
787
                                            constants.MIN_VG_SIZE)
788
      if vgstatus:
789
        raise errors.OpPrereqError("Error on node '%s': %s" %
790
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
791
                                   errors.ECODE_ENVIRON)
792

    
793
  @staticmethod
794
  def _GetEnabledDiskTemplatesInner(op_enabled_disk_templates,
795
                                    old_enabled_disk_templates):
796
    """Determines the enabled disk templates and the subset of disk templates
797
       that are newly enabled by this operation.
798

799
    """
800
    enabled_disk_templates = None
801
    new_enabled_disk_templates = []
802
    if op_enabled_disk_templates:
803
      enabled_disk_templates = op_enabled_disk_templates
804
      new_enabled_disk_templates = \
805
        list(set(enabled_disk_templates)
806
             - set(old_enabled_disk_templates))
807
    else:
808
      enabled_disk_templates = old_enabled_disk_templates
809
    return (enabled_disk_templates, new_enabled_disk_templates)
810

    
811
  def _GetEnabledDiskTemplates(self, cluster):
812
    """Determines the enabled disk templates and the subset of disk templates
813
       that are newly enabled by this operation.
814

815
    """
816
    return self._GetEnabledDiskTemplatesInner(self.op.enabled_disk_templates,
817
                                              cluster.enabled_disk_templates)
818

    
819
  def _CheckIpolicy(self, cluster, enabled_disk_templates):
820
    """Checks the ipolicy.
821

822
    @type cluster: C{objects.Cluster}
823
    @param cluster: the cluster's configuration
824
    @type enabled_disk_templates: list of string
825
    @param enabled_disk_templates: list of (possibly newly) enabled disk
826
      templates
827

828
    """
829
    # FIXME: write unit tests for this
830
    if self.op.ipolicy:
831
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
832
                                           group_policy=False)
833

    
834
      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
835
                                  enabled_disk_templates)
836

    
837
      all_instances = self.cfg.GetAllInstancesInfo().values()
838
      violations = set()
839
      for group in self.cfg.GetAllNodeGroupsInfo().values():
840
        instances = frozenset([inst for inst in all_instances
841
                               if compat.any(nuuid in group.members
842
                                             for nuuid in inst.all_nodes)])
843
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
844
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
845
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
846
                                           self.cfg)
847
        if new:
848
          violations.update(new)
849

    
850
      if violations:
851
        self.LogWarning("After the ipolicy change the following instances"
852
                        " violate them: %s",
853
                        utils.CommaJoin(utils.NiceSort(violations)))
854
    else:
855
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
856
                                  enabled_disk_templates)
857

    
858
  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
859
    """Checks whether the set DRBD helper actually exists on the nodes.
860

861
    @type drbd_helper: string
862
    @param drbd_helper: path of the drbd usermode helper binary
863
    @type node_uuids: list of strings
864
    @param node_uuids: list of node UUIDs to check for the helper
865

866
    """
867
    # checks given drbd helper on all nodes
868
    helpers = self.rpc.call_drbd_helper(node_uuids)
869
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
870
      if ninfo.offline:
871
        self.LogInfo("Not checking drbd helper on offline node %s",
872
                     ninfo.name)
873
        continue
874
      msg = helpers[ninfo.uuid].fail_msg
875
      if msg:
876
        raise errors.OpPrereqError("Error checking drbd helper on node"
877
                                   " '%s': %s" % (ninfo.name, msg),
878
                                   errors.ECODE_ENVIRON)
879
      node_helper = helpers[ninfo.uuid].payload
880
      if node_helper != drbd_helper:
881
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
882
                                   (ninfo.name, node_helper),
883
                                   errors.ECODE_ENVIRON)
884

    
885
  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
886
    """Check the DRBD usermode helper.
887

888
    @type node_uuids: list of strings
889
    @param node_uuids: a list of nodes' UUIDs
890
    @type drbd_enabled: boolean
891
    @param drbd_enabled: whether DRBD will be enabled after this operation
892
      (no matter if it was disabled before or not)
893
    @type drbd_gets_enabled: boolen
894
    @param drbd_gets_enabled: true if DRBD was disabled before this
895
      operation, but will be enabled afterwards
896

897
    """
898
    if self.op.drbd_helper == '':
899
      if drbd_enabled:
900
        raise errors.OpPrereqError("Cannot disable drbd helper while"
901
                                   " DRBD is enabled.")
902
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
903
        raise errors.OpPrereqError("Cannot disable drbd helper while"
904
                                   " drbd-based instances exist",
905
                                   errors.ECODE_INVAL)
906

    
907
    else:
908
      if self.op.drbd_helper is not None and drbd_enabled:
909
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
910
      else:
911
        if drbd_gets_enabled:
912
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
913
          if current_drbd_helper is not None:
914
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
915
          else:
916
            raise errors.OpPrereqError("Cannot enable DRBD without a"
917
                                       " DRBD usermode helper set.")
918

    
919
  def _CheckInstancesOfDisabledDiskTemplates(
920
      self, disabled_disk_templates):
921
    """Check whether we try to a disk template that is in use.
922

923
    @type disabled_disk_templates: list of string
924
    @param disabled_disk_templates: list of disk templates that are going to
925
      be disabled by this operation
926

927
    """
928
    for disk_template in disabled_disk_templates:
929
      if self.cfg.HasAnyDiskOfType(disk_template):
930
        raise errors.OpPrereqError(
931
            "Cannot disable disk template '%s', because there is at least one"
932
            " instance using it." % disk_template)
933

    
934
  def CheckPrereq(self):
935
    """Check prerequisites.
936

937
    This checks whether the given params don't conflict and
938
    if the given volume group is valid.
939

940
    """
941
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
942
    self.cluster = cluster = self.cfg.GetClusterInfo()
943

    
944
    vm_capable_node_uuids = [node.uuid
945
                             for node in self.cfg.GetAllNodesInfo().values()
946
                             if node.uuid in node_uuids and node.vm_capable]
947

    
948
    (enabled_disk_templates, new_enabled_disk_templates) = \
949
      self._GetEnabledDiskTemplates(cluster)
950

    
951
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
952
                      new_enabled_disk_templates)
953

    
954
    if self.op.file_storage_dir is not None:
955
      CheckFileStoragePathVsEnabledDiskTemplates(
956
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
957

    
958
    if self.op.shared_file_storage_dir is not None:
959
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
960
          self.LogWarning, self.op.shared_file_storage_dir,
961
          enabled_disk_templates)
962

    
963
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
964
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
965
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)
966

    
967
    # validate params changes
968
    if self.op.beparams:
969
      objects.UpgradeBeParams(self.op.beparams)
970
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
971
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
972

    
973
    if self.op.ndparams:
974
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
975
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
976

    
977
      # TODO: we need a more general way to handle resetting
978
      # cluster-level parameters to default values
979
      if self.new_ndparams["oob_program"] == "":
980
        self.new_ndparams["oob_program"] = \
981
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
982

    
983
    if self.op.hv_state:
984
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
985
                                           self.cluster.hv_state_static)
986
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
987
                               for hv, values in new_hv_state.items())
988

    
989
    if self.op.disk_state:
990
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
991
                                               self.cluster.disk_state_static)
992
      self.new_disk_state = \
993
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
994
                            for name, values in svalues.items()))
995
             for storage, svalues in new_disk_state.items())
996

    
997
    self._CheckIpolicy(cluster, enabled_disk_templates)
998

    
999
    if self.op.nicparams:
1000
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1001
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
1002
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
1003
      nic_errors = []
1004

    
1005
      # check all instances for consistency
1006
      for instance in self.cfg.GetAllInstancesInfo().values():
1007
        for nic_idx, nic in enumerate(instance.nics):
1008
          params_copy = copy.deepcopy(nic.nicparams)
1009
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
1010

    
1011
          # check parameter syntax
1012
          try:
1013
            objects.NIC.CheckParameterSyntax(params_filled)
1014
          except errors.ConfigurationError, err:
1015
            nic_errors.append("Instance %s, nic/%d: %s" %
1016
                              (instance.name, nic_idx, err))
1017

    
1018
          # if we're moving instances to routed, check that they have an ip
1019
          target_mode = params_filled[constants.NIC_MODE]
1020
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
1021
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
1022
                              " address" % (instance.name, nic_idx))
1023
      if nic_errors:
1024
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
1025
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
1026

    
1027
    # hypervisor list/parameters
1028
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
1029
    if self.op.hvparams:
1030
      for hv_name, hv_dict in self.op.hvparams.items():
1031
        if hv_name not in self.new_hvparams:
1032
          self.new_hvparams[hv_name] = hv_dict
1033
        else:
1034
          self.new_hvparams[hv_name].update(hv_dict)
1035

    
1036
    # disk template parameters
1037
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
1038
    if self.op.diskparams:
1039
      for dt_name, dt_params in self.op.diskparams.items():
1040
        if dt_name not in self.new_diskparams:
1041
          self.new_diskparams[dt_name] = dt_params
1042
        else:
1043
          self.new_diskparams[dt_name].update(dt_params)
1044
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)
1045

    
1046
    # os hypervisor parameters
1047
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
1048
    if self.op.os_hvp:
1049
      for os_name, hvs in self.op.os_hvp.items():
1050
        if os_name not in self.new_os_hvp:
1051
          self.new_os_hvp[os_name] = hvs
1052
        else:
1053
          for hv_name, hv_dict in hvs.items():
1054
            if hv_dict is None:
1055
              # Delete if it exists
1056
              self.new_os_hvp[os_name].pop(hv_name, None)
1057
            elif hv_name not in self.new_os_hvp[os_name]:
1058
              self.new_os_hvp[os_name][hv_name] = hv_dict
1059
            else:
1060
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
1061

    
1062
    # os parameters
1063
    self.new_osp = objects.FillDict(cluster.osparams, {})
1064
    if self.op.osparams:
1065
      for os_name, osp in self.op.osparams.items():
1066
        if os_name not in self.new_osp:
1067
          self.new_osp[os_name] = {}
1068

    
1069
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
1070
                                                 use_none=True)
1071

    
1072
        if not self.new_osp[os_name]:
1073
          # we removed all parameters
1074
          del self.new_osp[os_name]
1075
        else:
1076
          # check the parameter validity (remote check)
1077
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
1078
                        os_name, self.new_osp[os_name])
1079

    
1080
    # changes to the hypervisor list
1081
    if self.op.enabled_hypervisors is not None:
1082
      self.hv_list = self.op.enabled_hypervisors
1083
      for hv in self.hv_list:
1084
        # if the hypervisor doesn't already exist in the cluster
1085
        # hvparams, we initialize it to empty, and then (in both
1086
        # cases) we make sure to fill the defaults, as we might not
1087
        # have a complete defaults list if the hypervisor wasn't
1088
        # enabled before
1089
        if hv not in new_hvp:
1090
          new_hvp[hv] = {}
1091
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1092
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1093
    else:
1094
      self.hv_list = cluster.enabled_hypervisors
1095

    
1096
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1097
      # either the enabled list has changed, or the parameters have, validate
1098
      for hv_name, hv_params in self.new_hvparams.items():
1099
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1100
            (self.op.enabled_hypervisors and
1101
             hv_name in self.op.enabled_hypervisors)):
1102
          # either this is a new hypervisor, or its parameters have changed
1103
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1104
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1105
          hv_class.CheckParameterSyntax(hv_params)
1106
          CheckHVParams(self, node_uuids, hv_name, hv_params)
1107

    
1108
    self._CheckDiskTemplateConsistency()
1109

    
1110
    if self.op.os_hvp:
1111
      # no need to check any newly-enabled hypervisors, since the
1112
      # defaults have already been checked in the above code-block
1113
      for os_name, os_hvp in self.new_os_hvp.items():
1114
        for hv_name, hv_params in os_hvp.items():
1115
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1116
          # we need to fill in the new os_hvp on top of the actual hv_p
1117
          cluster_defaults = self.new_hvparams.get(hv_name, {})
1118
          new_osp = objects.FillDict(cluster_defaults, hv_params)
1119
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1120
          hv_class.CheckParameterSyntax(new_osp)
1121
          CheckHVParams(self, node_uuids, hv_name, new_osp)
1122

    
1123
    if self.op.default_iallocator:
1124
      alloc_script = utils.FindFile(self.op.default_iallocator,
1125
                                    constants.IALLOCATOR_SEARCH_PATH,
1126
                                    os.path.isfile)
1127
      if alloc_script is None:
1128
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1129
                                   " specified" % self.op.default_iallocator,
1130
                                   errors.ECODE_INVAL)
1131

    
1132
  def _CheckDiskTemplateConsistency(self):
1133
    """Check whether the disk templates that are going to be disabled
1134
       are still in use by some instances.
1135

1136
    """
1137
    if self.op.enabled_disk_templates:
1138
      cluster = self.cfg.GetClusterInfo()
1139
      instances = self.cfg.GetAllInstancesInfo()
1140

    
1141
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1142
        - set(self.op.enabled_disk_templates)
1143
      for instance in instances.itervalues():
1144
        if instance.disk_template in disk_templates_to_remove:
1145
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
1146
                                     " because instance '%s' is using it." %
1147
                                     (instance.disk_template, instance.name))
1148

    
1149
  def _SetVgName(self, feedback_fn):
1150
    """Determines and sets the new volume group name.
1151

1152
    """
1153
    if self.op.vg_name is not None:
1154
      new_volume = self.op.vg_name
1155
      if not new_volume:
1156
        new_volume = None
1157
      if new_volume != self.cfg.GetVGName():
1158
        self.cfg.SetVGName(new_volume)
1159
      else:
1160
        feedback_fn("Cluster LVM configuration already in desired"
1161
                    " state, not changing")
1162

    
1163
  def _SetFileStorageDir(self, feedback_fn):
1164
    """Set the file storage directory.
1165

1166
    """
1167
    if self.op.file_storage_dir is not None:
1168
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
1169
        feedback_fn("Global file storage dir already set to value '%s'"
1170
                    % self.cluster.file_storage_dir)
1171
      else:
1172
        self.cluster.file_storage_dir = self.op.file_storage_dir
1173

    
1174
  def _SetDrbdHelper(self, feedback_fn):
1175
    """Set the DRBD usermode helper.
1176

1177
    """
1178
    if self.op.drbd_helper is not None:
1179
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1180
        feedback_fn("Note that you specified a drbd user helper, but did not"
1181
                    " enable the drbd disk template.")
1182
      new_helper = self.op.drbd_helper
1183
      if not new_helper:
1184
        new_helper = None
1185
      if new_helper != self.cfg.GetDRBDHelper():
1186
        self.cfg.SetDRBDHelper(new_helper)
1187
      else:
1188
        feedback_fn("Cluster DRBD helper already in desired state,"
1189
                    " not changing")
1190

    
1191
  def Exec(self, feedback_fn):
1192
    """Change the parameters of the cluster.
1193

1194
    """
1195
    if self.op.enabled_disk_templates:
1196
      self.cluster.enabled_disk_templates = \
1197
        list(set(self.op.enabled_disk_templates))
1198

    
1199
    self._SetVgName(feedback_fn)
1200
    self._SetFileStorageDir(feedback_fn)
1201
    self._SetDrbdHelper(feedback_fn)
1202

    
1203
    if self.op.hvparams:
1204
      self.cluster.hvparams = self.new_hvparams
1205
    if self.op.os_hvp:
1206
      self.cluster.os_hvp = self.new_os_hvp
1207
    if self.op.enabled_hypervisors is not None:
1208
      self.cluster.hvparams = self.new_hvparams
1209
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1210
    if self.op.beparams:
1211
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1212
    if self.op.nicparams:
1213
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1214
    if self.op.ipolicy:
1215
      self.cluster.ipolicy = self.new_ipolicy
1216
    if self.op.osparams:
1217
      self.cluster.osparams = self.new_osp
1218
    if self.op.ndparams:
1219
      self.cluster.ndparams = self.new_ndparams
1220
    if self.op.diskparams:
1221
      self.cluster.diskparams = self.new_diskparams
1222
    if self.op.hv_state:
1223
      self.cluster.hv_state_static = self.new_hv_state
1224
    if self.op.disk_state:
1225
      self.cluster.disk_state_static = self.new_disk_state
1226

    
1227
    if self.op.candidate_pool_size is not None:
1228
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1229
      # we need to update the pool size here, otherwise the save will fail
1230
      AdjustCandidatePool(self, [])
1231

    
1232
    if self.op.maintain_node_health is not None:
1233
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1234
        feedback_fn("Note: CONFD was disabled at build time, node health"
1235
                    " maintenance is not useful (still enabling it)")
1236
      self.cluster.maintain_node_health = self.op.maintain_node_health
1237

    
1238
    if self.op.modify_etc_hosts is not None:
1239
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1240

    
1241
    if self.op.prealloc_wipe_disks is not None:
1242
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1243

    
1244
    if self.op.add_uids is not None:
1245
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1246

    
1247
    if self.op.remove_uids is not None:
1248
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1249

    
1250
    if self.op.uid_pool is not None:
1251
      self.cluster.uid_pool = self.op.uid_pool
1252

    
1253
    if self.op.default_iallocator is not None:
1254
      self.cluster.default_iallocator = self.op.default_iallocator
1255

    
1256
    if self.op.reserved_lvs is not None:
1257
      self.cluster.reserved_lvs = self.op.reserved_lvs
1258

    
1259
    if self.op.use_external_mip_script is not None:
1260
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
1261

    
1262
    def helper_os(aname, mods, desc):
1263
      desc += " OS list"
1264
      lst = getattr(self.cluster, aname)
1265
      for key, val in mods:
1266
        if key == constants.DDM_ADD:
1267
          if val in lst:
1268
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1269
          else:
1270
            lst.append(val)
1271
        elif key == constants.DDM_REMOVE:
1272
          if val in lst:
1273
            lst.remove(val)
1274
          else:
1275
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1276
        else:
1277
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
1278

    
1279
    if self.op.hidden_os:
1280
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1281

    
1282
    if self.op.blacklisted_os:
1283
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1284

    
1285
    if self.op.master_netdev:
1286
      master_params = self.cfg.GetMasterNetworkParameters()
1287
      ems = self.cfg.GetUseExternalMipScript()
1288
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1289
                  self.cluster.master_netdev)
1290
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1291
                                                       master_params, ems)
1292
      if not self.op.force:
1293
        result.Raise("Could not disable the master ip")
1294
      else:
1295
        if result.fail_msg:
1296
          msg = ("Could not disable the master ip (continuing anyway): %s" %
1297
                 result.fail_msg)
1298
          feedback_fn(msg)
1299
      feedback_fn("Changing master_netdev from %s to %s" %
1300
                  (master_params.netdev, self.op.master_netdev))
1301
      self.cluster.master_netdev = self.op.master_netdev
1302

    
1303
    if self.op.master_netmask:
1304
      master_params = self.cfg.GetMasterNetworkParameters()
1305
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1306
      result = self.rpc.call_node_change_master_netmask(
1307
                 master_params.uuid, master_params.netmask,
1308
                 self.op.master_netmask, master_params.ip,
1309
                 master_params.netdev)
1310
      result.Warn("Could not change the master IP netmask", feedback_fn)
1311
      self.cluster.master_netmask = self.op.master_netmask
1312

    
1313
    self.cfg.Update(self.cluster, feedback_fn)
1314

    
1315
    if self.op.master_netdev:
1316
      master_params = self.cfg.GetMasterNetworkParameters()
1317
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
1318
                  self.op.master_netdev)
1319
      ems = self.cfg.GetUseExternalMipScript()
1320
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1321
                                                     master_params, ems)
1322
      result.Warn("Could not re-enable the master ip on the master,"
1323
                  " please restart manually", self.LogWarning)
1324

    
1325

    
1326
class LUClusterVerify(NoHooksLU):
1327
  """Submits all jobs necessary to verify the cluster.
1328

1329
  """
1330
  REQ_BGL = False
1331

    
1332
  def ExpandNames(self):
1333
    self.needed_locks = {}
1334

    
1335
  def Exec(self, feedback_fn):
1336
    jobs = []
1337

    
1338
    if self.op.group_name:
1339
      groups = [self.op.group_name]
1340
      depends_fn = lambda: None
1341
    else:
1342
      groups = self.cfg.GetNodeGroupList()
1343

    
1344
      # Verify global configuration
1345
      jobs.append([
1346
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1347
        ])
1348

    
1349
      # Always depend on global verification
1350
      depends_fn = lambda: [(-len(jobs), [])]
1351

    
1352
    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
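      # Machine-readable form, e.g. "ERROR:ENODEVERSION:node:node1:<message>"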
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1404
    else:
1405
      if item:
1406
        item = " " + item
1407
      else:
1408
        item = ""
1409
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1410
    # and finally report it via the feedback_fn
1411
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1412
    # do not mark the operation as failed for WARN cases only
1413
    if ltype == self.ETYPE_ERROR:
1414
      self.bad = True
1415

    
1416
  def _ErrorIf(self, cond, *args, **kwargs):
1417
    """Log an error message if the passed condition is True.
1418

1419
    """
1420
    if (bool(cond)
1421
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1422
      self._Error(*args, **kwargs)
1423

    
1424

    
1425
def _VerifyCertificate(filename):
1426
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1427

1428
  @type filename: string
1429
  @param filename: Path to PEM file
1430

1431
  """
1432
  try:
1433
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1434
                                           utils.ReadFile(filename))
1435
  except Exception, err: # pylint: disable=W0703
1436
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1437
            "Failed to load X509 certificate %s: %s" % (filename, err))
1438

    
1439
  (errcode, msg) = \
1440
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1441
                                constants.SSL_CERT_EXPIRATION_ERROR)
1442

    
1443
  if msg:
1444
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1445
  else:
1446
    fnamemsg = None
1447

    
1448
  if errcode is None:
1449
    return (None, fnamemsg)
1450
  elif errcode == utils.CERT_WARNING:
1451
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1452
  elif errcode == utils.CERT_ERROR:
1453
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1454

    
1455
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1456

    
1457

    
1458
def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                    constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(inst.name for
                         inst in dangling_instances.get(node.uuid, [])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(inst.name for
                                         inst in no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        instance = self.cfg.GetInstanceInfoByName(inst_name)
        if instance.disk_template in constants.DTS_INT_MIRROR:
          nodes.update(instance.secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_node_uuids = set(self.group_info.members)
    group_inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_node_uuids = \
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_inst_uuids = \
        group_inst_uuids.difference(
          [self.cfg.GetInstanceInfoByName(name).uuid
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])

    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_inst_uuids:
      raise errors.OpPrereqError(
        "Missing lock for instances: %s" %
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
        errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_uuids = group_node_uuids
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
                             for node_uuid in group_node_uuids)

    self.my_inst_uuids = group_inst_uuids
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
                             for inst_uuid in group_inst_uuids)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nuuid in inst.all_nodes:
          if self.all_node_info[nuuid].group != self.group_uuid:
            extra_lv_nodes.add(nuuid)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, ninfo.name,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s parameter verify failure (source %s): %s",
                      hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
                    "Node returned invalid time")
      return

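    # The node time is acceptable if it falls within the [start, end] window
    # of the verify RPC, extended by NODE_MAX_CLOCK_SKEW seconds on each side.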
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
                  "Node time diverges by at least %s from master node time",
                  ntime_diff)

  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                  "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupDRBDVersion(self, node_verify_infos):
    """Check cross-node DRBD version consistency.

    @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node_verify call.

    """
    node_versions = {}
    for node_uuid, ndata in node_verify_infos.items():
      nresult = ndata.payload
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
      node_versions[node_uuid] = version

    if len(set(node_versions.values())) > 1:
      for node_uuid, version in sorted(node_versions.items()):
        msg = "DRBD version mismatch: %s" % version
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
                    code=self.ETYPE_WARNING)

  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping from node names to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of check
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "did not return valid bridge information")
    if not test:
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user script presence and executability on the node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = constants.NV_NODELIST not in nresult
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
                  "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
                        "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
                        "tcp communication with node '%s': %s",
                        anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if ninfo.uuid == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)

  def _VerifyInstance(self, instance, node_image, diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the nodes are in the correct
    state.

    """
    pnode_uuid = instance.primary_node
    pnode_img = node_image[pnode_uuid]
    groupinfo = self.cfg.GetAllNodeGroupsInfo()

    node_vol_should = {}
    instance.MapLVsByNode(node_vol_should)

    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            self.group_info)
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)

    for node_uuid in node_vol_should:
      n_img = node_image[node_uuid]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node_uuid]:
        test = volume not in n_img.volumes
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
                      "volume %s missing on node %s", volume,
                      self.cfg.GetNodeName(node_uuid))

    if instance.admin_state == constants.ADMINST_UP:
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
                    "instance not running on its primary node %s",
                     self.cfg.GetNodeName(pnode_uuid))
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance is marked as running and lives on"
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))

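    # Flatten the per-node disk status into (node, success, status, disk_index)
    # tuples for easier iteration below.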
    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      self._ErrorIf(instance.disks_active and
                    not success and not bad_snode,
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
                    "couldn't retrieve status for disk/%s on %s: %s",
                    idx, self.cfg.GetNodeName(nname), bdev_status)

      if instance.disks_active and success and \
         (bdev_status.is_degraded or
          bdev_status.ldisk_status != constants.LDS_OKAY):
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
        if bdev_status.is_degraded:
          msg += " is degraded"
        if bdev_status.ldisk_status != constants.LDS_OKAY:
          msg += "; state is '%s'" % \
                 constants.LDS_NAMES[bdev_status.ldisk_status]

        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)

    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
                  "instance %s, connection to primary node failed",
                  instance.name)

    self._ErrorIf(len(instance.secondary_nodes) > 1,
                  constants.CV_EINSTANCELAYOUT, instance.name,
                  "instance has multiple secondary nodes: %s",
                  utils.CommaJoin(instance.secondary_nodes),
                  code=self.ETYPE_WARNING)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
    if any(es_flags.values()):
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
        # Disk template not compatible with exclusive_storage: no instance
        # node should have the flag set
        es_nodes = [n
                    for (n, es) in es_flags.items()
                    if es]
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
                    "instance has template %s, which is not supported on nodes"
                    " that have exclusive storage set: %s",
                    instance.disk_template,
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
      for (idx, disk) in enumerate(instance.disks):
        self._ErrorIf(disk.spindles is None,
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
                      "number of spindles not configured for disk %s while"
                      " exclusive storage is enabled, try running"
                      " gnt-cluster repair-disk-sizes", idx)

    if instance.disk_template in constants.DTS_INT_MIRROR:
      instance_nodes = utils.NiceSort(instance.all_nodes)
      instance_groups = {}

      for node_uuid in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
                                   []).append(node_uuid)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
                           groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode_uuid in nodes,
                                   reverse=True)]

      self._ErrorIf(len(instance_groups) > 1,
                    constants.CV_EINSTANCESPLITGROUPS,
                    instance.name, "instance has primary and secondary nodes in"
                    " different groups: %s", utils.CommaJoin(pretty_list),
                    code=self.ETYPE_WARNING)

    inst_nodes_offline = []
    for snode in instance.secondary_nodes:
      s_img = node_image[snode]
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
                    self.cfg.GetNodeName(snode),
                    "instance %s, connection to secondary node failed",
                    instance.name)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
                  instance.name, "instance has offline secondary node(s) %s",
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
    # ... or ghost/non-vm_capable nodes
    for node_uuid in instance.all_nodes:
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance lives on ghost node %s",
                    self.cfg.GetNodeName(node_uuid))
      self._ErrorIf(not node_image[node_uuid].vm_capable,
                    constants.CV_EINSTANCEBADNODE, instance.name,
                    "instance lives on non-vm_capable node %s",
                    self.cfg.GetNodeName(node_uuid))

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node_uuid, n_img in node_image.items():
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
          self.all_node_info[node_uuid].group != self.group_uuid):
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node_uuid not in node_vol_should or
                volume not in node_vol_should[node_uuid]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
                      self.cfg.GetNodeName(node_uuid),
                      "volume %s is unknown", volume)

  def _VerifyNPlusOneMemory(self, node_image, all_insts):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node_uuid, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline or \
         self.all_node_info[node_uuid].group != self.group_uuid:
        # we're skipping nodes marked offline and nodes in other groups from
        # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      #TODO(dynmem): also consider ballooning out other instances
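      # For every primary node that uses this node as a secondary, sum up the
      # minimum memory of its auto-balanced instances; this node must have at
      # least that much free memory to absorb them if that primary fails.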
      for prinode, inst_uuids in n_img.sbp.items():
        needed_mem = 0
        for inst_uuid in inst_uuids:
          bep = cluster_info.FillBE(all_insts[inst_uuid])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MINMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1,
                      self.cfg.GetNodeName(node_uuid),
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)

  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param nodes: List of L{objects.Node} objects
    @param master_node_uuid: UUID of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.uuid == master_node_uuid)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodes
      else:
        filenodes = filter(fn, nodes)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodes:
      if node.offline:
        ignore_nodes.add(node.uuid)
        continue

      nresult = all_nvinfo[node.uuid]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
                    "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.uuid)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_uuid
                            for node_uuids in fileinfo[filename].values()
                            for node_uuid in node_uuids) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        self._ErrorIf(missing_file and missing_file != expected_nodes,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is optional, but it must exist on all or no"
                      " nodes (not found on %s)",
                      filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))
      else:
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is missing from node(s) %s", filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        self._ErrorIf(unexpected,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s should not exist on node(s) %s",
                      filename, utils.CommaJoin(
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1,
                     utils.CommaJoin(utils.NiceSort(
                       map(self.cfg.GetNodeName, node_uuids))))
                    for (idx, (checksum, node_uuids)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
                    "File %s found with %s different checksums (%s)",
                    filename, len(checksums), "; ".join(variants))

  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
    """Verify the drbd helper.

    """
    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                    "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "wrong drbd usermode helper: %s", payload)

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)

    # compute the DRBD minors
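    # node_drbd maps each minor to (instance UUID, must_exist), where
    # must_exist is True when the instance's disks are active and the minor
    # is therefore expected to show up in the node's DRBD status.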
    node_drbd = {}
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
      test = inst_uuid not in instanceinfo
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
      if test:
        node_drbd[minor] = (inst_uuid, False)
      else:
        instance = instanceinfo[inst_uuid]
        node_drbd[minor] = (inst_uuid, instance.disks_active)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                  "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (inst_uuid, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "drbd minor %d of instance %s is not active", minor,
                    self.cfg.GetInstanceName(inst_uuid))
    for minor in used_minors:
      test = minor not in node_drbd
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                  "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
                    "Invalid OS %s (located at %s): %s",
                    os_name, f_path, f_diag)
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
                    "OS '%s' has multiple entries"
                    " (first one shadows the rest): %s",
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                    "Extra OS %s not present on reference node (%s)",
                    os_name, self.cfg.GetNodeName(base.uuid))
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
                      "OS %s for %s differs from reference node %s:"
                      " [%s] vs. [%s]", kind, os_name,
                      self.cfg.GetNodeName(base.uuid),
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
                  "OSes present on reference node %s"
                  " but missing on this node: %s",
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))

  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    cluster = self.cfg.GetClusterInfo()
    if (is_master and
        (cluster.IsFileStorageEnabled() or
         cluster.IsSharedFileStorageEnabled())):
      try:
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
                          verify_key, error_key):
    """Verifies (file) storage paths.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type file_disk_template: string
    @param file_disk_template: file-based disk template, whose directory
        is supposed to be verified
    @type verify_key: string
    @param verify_key: key for the verification map of this file
        verification step
    @param error_key: error key to be added to the verification results
        in case something goes wrong in this verification step

    """
    assert (file_disk_template in
            utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
    cluster = self.cfg.GetClusterInfo()
    if cluster.IsDiskTemplateEnabled(file_disk_template):
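      # The node only includes verify_key in its reply when the configured
      # path is unusable, so the mere presence of the key signals an error.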
      self._ErrorIf(
          verify_key in nresult,
          error_key, ninfo.name,
          "The configured %s storage path is unusable: %s" %
          (file_disk_template, nresult.get(verify_key)))

  def _VerifyFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_FILE,
        constants.NV_FILE_STORAGE_PATH,
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)

  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_SHARED_FILE,
        constants.NV_SHARED_FILE_STORAGE_PATH,
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2639
    """Verifies and computes a node information map
2640

2641
    @type ninfo: L{objects.Node}
2642
    @param ninfo: the node to check
2643
    @param nresult: the remote results for the node
2644
    @param nimg: the node image object
2645
    @param vg_name: the configured VG name
2646

2647
    """
2648
    # try to read free memory (from the hypervisor)
2649
    hv_info = nresult.get(constants.NV_HVINFO, None)
2650
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2651
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2652
                  "rpc call to node failed (hvinfo)")
2653
    if not test:
2654
      try:
2655
        nimg.mfree = int(hv_info["memory_free"])
2656
      except (ValueError, TypeError):
2657
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2658
                      "node returned invalid nodeinfo, check hypervisor")
2659

    
2660
    # FIXME: devise a free space model for file based instances as well
2661
    if vg_name is not None:
2662
      test = (constants.NV_VGLIST not in nresult or
2663
              vg_name not in nresult[constants.NV_VGLIST])
2664
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2665
                    "node didn't return data for the volume group '%s'"
2666
                    " - it is either missing or broken", vg_name)
2667
      if not test:
2668
        try:
2669
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2670
        except (ValueError, TypeError):
2671
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2672
                        "node returned invalid LVM info, check LVM status")
2673

    
2674
  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2675
    """Gets per-disk status information for all instances.
2676

2677
    @type node_uuids: list of strings
2678
    @param node_uuids: Node UUIDs
2679
    @type node_image: dict of (UUID, L{objects.Node})
2680
    @param node_image: Node objects
2681
    @type instanceinfo: dict of (UUID, L{objects.Instance})
2682
    @param instanceinfo: Instance objects
2683
    @rtype: {instance: {node: [(succes, payload)]}}
2684
    @return: a dictionary of per-instance dictionaries with nodes as
2685
        keys and disk information as values; the disk information is a
2686
        list of tuples (success, payload)
2687

2688
    """
2689
    node_disks = {}
2690
    node_disks_dev_inst_only = {}
2691
    diskless_instances = set()
2692
    diskless = constants.DT_DISKLESS
2693

    
2694
    for nuuid in node_uuids:
2695
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
2696
                                             node_image[nuuid].sinst))
2697
      diskless_instances.update(uuid for uuid in node_inst_uuids
2698
                                if instanceinfo[uuid].disk_template == diskless)
2699
      disks = [(inst_uuid, disk)
2700
               for inst_uuid in node_inst_uuids
2701
               for disk in instanceinfo[inst_uuid].disks]
2702

    
2703
      if not disks:
2704
        # No need to collect data
2705
        continue
2706

    
2707
      node_disks[nuuid] = disks
2708

    
2709
      # _AnnotateDiskParams makes already copies of the disks
2710
      dev_inst_only = []
2711
      for (inst_uuid, dev) in disks:
2712
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
2713
                                          self.cfg)
2714
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))
2715

    
2716
      node_disks_dev_inst_only[nuuid] = dev_inst_only
2717

    
2718
    assert len(node_disks) == len(node_disks_dev_inst_only)
2719

    
2720
    # Collect data from all nodes with disks
2721
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
2722
               node_disks.keys(), node_disks_dev_inst_only)
2723

    
2724
    assert len(result) == len(node_disks)
2725

    
2726
    instdisk = {}
2727

    
2728
    for (nuuid, nres) in result.items():
2729
      node = self.cfg.GetNodeInfo(nuuid)
2730
      disks = node_disks[node.uuid]
2731

    
2732
      if nres.offline:
2733
        # No data from this node
2734
        data = len(disks) * [(False, "node offline")]
2735
      else:
2736
        msg = nres.fail_msg
2737
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
2738
                      "while getting disk information: %s", msg)
2739
        if msg:
2740
          # No data from this node
2741
          data = len(disks) * [(False, msg)]
2742
        else:
2743
          data = []
2744
          for idx, i in enumerate(nres.payload):
2745
            if isinstance(i, (tuple, list)) and len(i) == 2:
2746
              data.append(i)
2747
            else:
2748
              logging.warning("Invalid result from node %s, entry %d: %s",
2749
                              node.name, idx, i)
2750
              data.append((False, "Invalid result from the remote node"))
2751

    
2752
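      # "data" now has exactly one (success, payload) entry per disk, in the
      # same order as node_disks[nuuid], so zip() pairs each (instance, disk)
      # tuple with its status below.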
      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

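    # Group the remaining (foreign, online) nodes by node group and turn each
    # group into an endless iterator cycling over its node names, sorted for
    # determinism.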
    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

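    # Assign every online node of this group one target drawn from each
    # foreign group's iterator; consecutive nodes therefore round-robin over
    # the members of the other groups.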
    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks run only in the post phase; when they fail, their
    output is logged in the verify output and the verification fails.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

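    # node_verify_param maps NV_* check names to their arguments; it is sent
    # to every node of the group in a single node_verify RPC and the results
    # come back keyed by the same NV_* constants.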
    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
      if drbd_helper:
        node_verify_param[constants.NV_DRBDVERSION] = None
        node_verify_param[constants.NV_DRBDLIST] = None
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

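    # Record in each node image which instances it should be running: pinst
    # (primary) and sinst (secondary) per node, secondaries grouped by their
    # primary in sbp, plus placeholder images for nodes outside this group,
    # flagged as ghosts when they are not known to the configuration at all.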
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so
    # if it's True for at least one node, we act as if it were set for all
    # the nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

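    # refos_img will be the first vm_capable node image with a valid OS
    # listing; every other node's OS data is compared against it in
    # _VerifyNodeOS below.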
    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
3144
    # is secondary for an instance whose primary is in another group. To avoid
3145
    # them, we find these instances and add their volumes to node_vol_should.
3146
    for instance in self.all_inst_info.values():
3147
      for secondary in instance.secondary_nodes:
3148
        if (secondary in self.my_node_info
3149
            and instance.name not in self.my_inst_info):
3150
          instance.MapLVsByNode(node_vol_should)
3151
          break
3152

    
3153
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

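      # Each node's payload is a list of (script, status, output) tuples; any
      # script reporting HKR_FAIL turns the overall verification result into
      # a failure.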
      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
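    # All node groups are locked in shared mode; the actual disk checks are
    # delegated to one OpGroupVerifyDisks job per group (see Exec below).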
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])