# lib/cmdlib/cluster.py @ 07e68848
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with the cluster."""
23

    
24
import OpenSSL
25

    
26
import copy
27
import itertools
28
import logging
29
import operator
30
import os
31
import re
32
import time
33

    
34
from ganeti import compat
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import hypervisor
38
from ganeti import locking
39
from ganeti import masterd
40
from ganeti import netutils
41
from ganeti import objects
42
from ganeti import opcodes
43
from ganeti import pathutils
44
from ganeti import query
45
from ganeti import rpc
46
from ganeti import runtime
47
from ganeti import ssh
48
from ganeti import uidpool
49
from ganeti import utils
50
from ganeti import vcluster
51

    
52
from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53
  ResultWithJobs
54
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
60
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
61
  CheckDiskAccessModeConsistency
62

    
63
import ganeti.masterd.instance
64

    
65

    
66
class LUClusterActivateMasterIp(NoHooksLU):
67
  """Activate the master IP on the master node.
68

69
  """
70
  def Exec(self, feedback_fn):
71
    """Activate the master IP.
72

73
    """
74
    master_params = self.cfg.GetMasterNetworkParameters()
75
    ems = self.cfg.GetUseExternalMipScript()
76
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
77
                                                   master_params, ems)
78
    result.Raise("Could not activate the master IP")
79

    
80

    
81
class LUClusterDeactivateMasterIp(NoHooksLU):
82
  """Deactivate the master IP on the master node.
83

84
  """
85
  def Exec(self, feedback_fn):
86
    """Deactivate the master IP.
87

88
    """
89
    master_params = self.cfg.GetMasterNetworkParameters()
90
    ems = self.cfg.GetUseExternalMipScript()
91
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
92
                                                     master_params, ems)
93
    result.Raise("Could not deactivate the master IP")
94

    
95

    
96
class LUClusterConfigQuery(NoHooksLU):
97
  """Return configuration values.
98

99
  """
100
  REQ_BGL = False
101

    
102
  def CheckArguments(self):
103
    self.cq = ClusterQuery(None, self.op.output_fields, False)
104

    
105
  def ExpandNames(self):
106
    self.cq.ExpandNames(self)
107

    
108
  def DeclareLocks(self, level):
109
    self.cq.DeclareLocks(self, level)
110

    
111
  def Exec(self, feedback_fn):
112
    result = self.cq.OldStyleQuery(self)
113

    
114
    assert len(result) == 1
115

    
116
    return result[0]
117

    
118

    
119
class LUClusterDestroy(LogicalUnit):
120
  """Logical unit for destroying the cluster.
121

122
  """
123
  HPATH = "cluster-destroy"
124
  HTYPE = constants.HTYPE_CLUSTER
125

    
126
  def BuildHooksEnv(self):
127
    """Build hooks env.
128

129
    """
130
    return {
131
      "OP_TARGET": self.cfg.GetClusterName(),
132
      }
133

    
134
  def BuildHooksNodes(self):
135
    """Build hooks nodes.
136

137
    """
138
    return ([], [])
139

    
140
  def CheckPrereq(self):
141
    """Check prerequisites.
142

143
    This checks whether the cluster is empty.
144

145
    Any errors are signaled by raising errors.OpPrereqError.
146

147
    """
148
    master = self.cfg.GetMasterNode()
149

    
150
    nodelist = self.cfg.GetNodeList()
151
    if len(nodelist) != 1 or nodelist[0] != master:
152
      raise errors.OpPrereqError("There are still %d node(s) in"
153
                                 " this cluster." % (len(nodelist) - 1),
154
                                 errors.ECODE_INVAL)
155
    instancelist = self.cfg.GetInstanceList()
156
    if instancelist:
157
      raise errors.OpPrereqError("There are still %d instance(s) in"
158
                                 " this cluster." % len(instancelist),
159
                                 errors.ECODE_INVAL)
160

    
161
  def Exec(self, feedback_fn):
162
    """Destroys the cluster.
163

164
    """
165
    master_params = self.cfg.GetMasterNetworkParameters()
166

    
167
    # Run post hooks on master node before it's removed
168
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
169

    
170
    ems = self.cfg.GetUseExternalMipScript()
171
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
172
                                                     master_params, ems)
173
    result.Warn("Error disabling the master IP address", self.LogWarning)
174
    return master_params.uuid
175

    
176

    
177
class LUClusterPostInit(LogicalUnit):
178
  """Logical unit for running hooks after cluster initialization.
179

180
  """
181
  HPATH = "cluster-init"
182
  HTYPE = constants.HTYPE_CLUSTER
183

    
184
  def CheckArguments(self):
185
    self.master_uuid = self.cfg.GetMasterNode()
186
    self.master_ndparams = self.cfg.GetNdParams(self.cfg.GetMasterNodeInfo())
187

    
188
    # TODO: When Issue 584 is solved, and None is properly parsed when used
189
    # as a default value, ndparams.get(.., None) can be changed to
190
    # ndparams[..] to access the values directly
191

    
192
    # OpenvSwitch: Warn user if link is missing
193
    if (self.master_ndparams[constants.ND_OVS] and not
194
        self.master_ndparams.get(constants.ND_OVS_LINK, None)):
195
      self.LogInfo("No physical interface for OpenvSwitch was given."
196
                   " OpenvSwitch will not have an outside connection. This"
197
                   " might not be what you want.")
198

    
199
  def BuildHooksEnv(self):
200
    """Build hooks env.
201

202
    """
203
    return {
204
      "OP_TARGET": self.cfg.GetClusterName(),
205
      }
206

    
207
  def BuildHooksNodes(self):
208
    """Build hooks nodes.
209

210
    """
211
    return ([], [self.cfg.GetMasterNode()])
212

    
213
  def Exec(self, feedback_fn):
214
    """Create and configure Open vSwitch
215

216
    """
217
    if self.master_ndparams[constants.ND_OVS]:
218
      result = self.rpc.call_node_configure_ovs(
219
                 self.master_uuid,
220
                 self.master_ndparams[constants.ND_OVS_NAME],
221
                 self.master_ndparams.get(constants.ND_OVS_LINK, None))
222
      result.Raise("Could not successully configure Open vSwitch")
223
    return True
224

    
225

    
226
class ClusterQuery(QueryBase):
227
  FIELDS = query.CLUSTER_FIELDS
228

    
229
  #: Do not sort (there is only one item)
230
  SORT_FIELD = None
231

    
232
  def ExpandNames(self, lu):
233
    lu.needed_locks = {}
234

    
235
    # The following variables interact with _QueryBase._GetNames
236
    self.wanted = locking.ALL_SET
237
    self.do_locking = self.use_locking
238

    
239
    if self.do_locking:
240
      raise errors.OpPrereqError("Can not use locking for cluster queries",
241
                                 errors.ECODE_INVAL)
242

    
243
  def DeclareLocks(self, lu, level):
244
    pass
245

    
246
  def _GetQueryData(self, lu):
247
    """Computes the list of nodes and their attributes.
248

249
    """
250
    # Locking is not used
251
    assert not (compat.any(lu.glm.is_owned(level)
252
                           for level in locking.LEVELS
253
                           if level != locking.LEVEL_CLUSTER) or
254
                self.do_locking or self.use_locking)
255

    
256
    if query.CQ_CONFIG in self.requested_data:
257
      cluster = lu.cfg.GetClusterInfo()
258
      nodes = lu.cfg.GetAllNodesInfo()
259
    else:
260
      cluster = NotImplemented
261
      nodes = NotImplemented
262

    
263
    if query.CQ_QUEUE_DRAINED in self.requested_data:
264
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
265
    else:
266
      drain_flag = NotImplemented
267

    
268
    if query.CQ_WATCHER_PAUSE in self.requested_data:
269
      master_node_uuid = lu.cfg.GetMasterNode()
270

    
271
      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
272
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
273
                   lu.cfg.GetMasterNodeName())
274

    
275
      watcher_pause = result.payload
276
    else:
277
      watcher_pause = NotImplemented
278

    
279
    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
280

    
281

    
282
class LUClusterQuery(NoHooksLU):
283
  """Query cluster configuration.
284

285
  """
286
  REQ_BGL = False
287

    
288
  def ExpandNames(self):
289
    self.needed_locks = {}
290

    
291
  def Exec(self, feedback_fn):
292
    """Return cluster config.
293

294
    """
295
    cluster = self.cfg.GetClusterInfo()
296
    os_hvp = {}
297

    
298
    # Filter just for enabled hypervisors
299
    for os_name, hv_dict in cluster.os_hvp.items():
300
      os_hvp[os_name] = {}
301
      for hv_name, hv_params in hv_dict.items():
302
        if hv_name in cluster.enabled_hypervisors:
303
          os_hvp[os_name][hv_name] = hv_params
304

    
305
    # Convert ip_family to ip_version
306
    primary_ip_version = constants.IP4_VERSION
307
    if cluster.primary_ip_family == netutils.IP6Address.family:
308
      primary_ip_version = constants.IP6_VERSION
309

    
310
    result = {
311
      "software_version": constants.RELEASE_VERSION,
312
      "protocol_version": constants.PROTOCOL_VERSION,
313
      "config_version": constants.CONFIG_VERSION,
314
      "os_api_version": max(constants.OS_API_VERSIONS),
315
      "export_version": constants.EXPORT_VERSION,
316
      "vcs_version": constants.VCS_VERSION,
317
      "architecture": runtime.GetArchInfo(),
318
      "name": cluster.cluster_name,
319
      "master": self.cfg.GetMasterNodeName(),
320
      "default_hypervisor": cluster.primary_hypervisor,
321
      "enabled_hypervisors": cluster.enabled_hypervisors,
322
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
323
                        for hypervisor_name in cluster.enabled_hypervisors]),
324
      "os_hvp": os_hvp,
325
      "beparams": cluster.beparams,
326
      "osparams": cluster.osparams,
327
      "ipolicy": cluster.ipolicy,
328
      "nicparams": cluster.nicparams,
329
      "ndparams": cluster.ndparams,
330
      "diskparams": cluster.diskparams,
331
      "candidate_pool_size": cluster.candidate_pool_size,
332
      "master_netdev": cluster.master_netdev,
333
      "master_netmask": cluster.master_netmask,
334
      "use_external_mip_script": cluster.use_external_mip_script,
335
      "volume_group_name": cluster.volume_group_name,
336
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
337
      "file_storage_dir": cluster.file_storage_dir,
338
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
339
      "maintain_node_health": cluster.maintain_node_health,
340
      "ctime": cluster.ctime,
341
      "mtime": cluster.mtime,
342
      "uuid": cluster.uuid,
343
      "tags": list(cluster.GetTags()),
344
      "uid_pool": cluster.uid_pool,
345
      "default_iallocator": cluster.default_iallocator,
346
      "reserved_lvs": cluster.reserved_lvs,
347
      "primary_ip_version": primary_ip_version,
348
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
349
      "hidden_os": cluster.hidden_os,
350
      "blacklisted_os": cluster.blacklisted_os,
351
      "enabled_disk_templates": cluster.enabled_disk_templates,
352
      }
353

    
354
    return result
355

    
356

    
357
class LUClusterRedistConf(NoHooksLU):
358
  """Force the redistribution of cluster configuration.
359

360
  This is a very simple LU.
361

362
  """
363
  REQ_BGL = False
364

    
365
  def ExpandNames(self):
366
    self.needed_locks = {
367
      locking.LEVEL_NODE: locking.ALL_SET,
368
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
369
    }
370
    self.share_locks = ShareAll()
371

    
372
  def Exec(self, feedback_fn):
373
    """Redistribute the configuration.
374

375
    """
376
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
377
    RedistributeAncillaryFiles(self)
378

    
379

    
380
class LUClusterRename(LogicalUnit):
381
  """Rename the cluster.
382

383
  """
384
  HPATH = "cluster-rename"
385
  HTYPE = constants.HTYPE_CLUSTER
386

    
387
  def BuildHooksEnv(self):
388
    """Build hooks env.
389

390
    """
391
    return {
392
      "OP_TARGET": self.cfg.GetClusterName(),
393
      "NEW_NAME": self.op.name,
394
      }
395

    
396
  def BuildHooksNodes(self):
397
    """Build hooks nodes.
398

399
    """
400
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
401

    
402
  def CheckPrereq(self):
403
    """Verify that the passed name is a valid one.
404

405
    """
406
    hostname = netutils.GetHostname(name=self.op.name,
407
                                    family=self.cfg.GetPrimaryIPFamily())
408

    
409
    new_name = hostname.name
410
    self.ip = new_ip = hostname.ip
411
    old_name = self.cfg.GetClusterName()
412
    old_ip = self.cfg.GetMasterIP()
413
    if new_name == old_name and new_ip == old_ip:
414
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
415
                                 " cluster has changed",
416
                                 errors.ECODE_INVAL)
417
    if new_ip != old_ip:
418
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
419
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
420
                                   " reachable on the network" %
421
                                   new_ip, errors.ECODE_NOTUNIQUE)
422

    
423
    self.op.name = new_name
424

    
425
  def Exec(self, feedback_fn):
426
    """Rename the cluster.
427

428
    """
429
    clustername = self.op.name
430
    new_ip = self.ip
431

    
432
    # shutdown the master IP
433
    master_params = self.cfg.GetMasterNetworkParameters()
434
    ems = self.cfg.GetUseExternalMipScript()
435
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
436
                                                     master_params, ems)
437
    result.Raise("Could not disable the master role")
438

    
439
    try:
440
      cluster = self.cfg.GetClusterInfo()
441
      cluster.cluster_name = clustername
442
      cluster.master_ip = new_ip
443
      self.cfg.Update(cluster, feedback_fn)
444

    
445
      # update the known hosts file
446
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
447
      node_list = self.cfg.GetOnlineNodeList()
448
      try:
449
        node_list.remove(master_params.uuid)
450
      except ValueError:
451
        pass
452
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
453
    finally:
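      # whatever happened above, try to bring the master IP back up, now
      # bound to the new address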
454
      master_params.ip = new_ip
455
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
456
                                                     master_params, ems)
457
      result.Warn("Could not re-enable the master role on the master,"
458
                  " please restart manually", self.LogWarning)
459

    
460
    return clustername
461

    
462

    
463
class LUClusterRepairDiskSizes(NoHooksLU):
464
  """Verifies the cluster disks sizes.
465

466
  """
467
  REQ_BGL = False
468

    
469
  def ExpandNames(self):
470
    if self.op.instances:
471
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
472
      # Not getting the node allocation lock as only a specific set of
473
      # instances (and their nodes) is going to be acquired
474
      self.needed_locks = {
475
        locking.LEVEL_NODE_RES: [],
476
        locking.LEVEL_INSTANCE: self.wanted_names,
477
        }
478
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
479
    else:
480
      self.wanted_names = None
481
      self.needed_locks = {
482
        locking.LEVEL_NODE_RES: locking.ALL_SET,
483
        locking.LEVEL_INSTANCE: locking.ALL_SET,
484

    
485
        # This opcode acquires the node locks for all instances
486
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
487
        }
488

    
489
    self.share_locks = {
490
      locking.LEVEL_NODE_RES: 1,
491
      locking.LEVEL_INSTANCE: 0,
492
      locking.LEVEL_NODE_ALLOC: 1,
493
      }
494

    
495
  def DeclareLocks(self, level):
496
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
497
      self._LockInstancesNodes(primary_only=True, level=level)
498

    
499
  def CheckPrereq(self):
500
    """Check prerequisites.
501

502
    This only checks the optional instance list against the existing names.
503

504
    """
505
    if self.wanted_names is None:
506
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
507

    
508
    self.wanted_instances = \
509
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
510

    
511
  def _EnsureChildSizes(self, disk):
512
    """Ensure children of the disk have the needed disk size.
513

514
    This is valid mainly for DRBD8 and fixes an issue where the
515
    children have smaller disk size.
516

517
    @param disk: an L{ganeti.objects.Disk} object
518

519
    """
520
    if disk.dev_type == constants.DT_DRBD8:
521
      assert disk.children, "Empty children for DRBD8?"
522
      fchild = disk.children[0]
523
      mismatch = fchild.size < disk.size
524
      if mismatch:
525
        self.LogInfo("Child disk has size %d, parent %d, fixing",
526
                     fchild.size, disk.size)
527
        fchild.size = disk.size
528

    
529
      # and we recurse on this child only, not on the metadev
530
      return self._EnsureChildSizes(fchild) or mismatch
531
    else:
532
      return False
533

    
534
  def Exec(self, feedback_fn):
535
    """Verify the size of cluster disks.
536

537
    """
538
    # TODO: check child disks too
539
    # TODO: check differences in size between primary/secondary nodes
540
    per_node_disks = {}
541
    for instance in self.wanted_instances:
542
      pnode = instance.primary_node
543
      if pnode not in per_node_disks:
544
        per_node_disks[pnode] = []
545
      for idx, disk in enumerate(instance.disks):
546
        per_node_disks[pnode].append((instance, idx, disk))
547

    
548
    assert not (frozenset(per_node_disks.keys()) -
549
                self.owned_locks(locking.LEVEL_NODE_RES)), \
550
      "Not owning correct locks"
551
    assert not self.owned_locks(locking.LEVEL_NODE)
552

    
553
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
554
                                               per_node_disks.keys())
555

    
556
    changed = []
557
    for node_uuid, dskl in per_node_disks.items():
558
      if not dskl:
559
        # no disks on the node
560
        continue
561

    
562
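      # work on copies of the disk objects, paired with their owning instance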
      newl = [([v[2].Copy()], v[0]) for v in dskl]
563
      node_name = self.cfg.GetNodeName(node_uuid)
564
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
565
      if result.fail_msg:
566
        self.LogWarning("Failure in blockdev_getdimensions call to node"
567
                        " %s, ignoring", node_name)
568
        continue
569
      if len(result.payload) != len(dskl):
570
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
571
                        " result.payload=%s", node_name, len(dskl),
572
                        result.payload)
573
        self.LogWarning("Invalid result from node %s, ignoring node results",
574
                        node_name)
575
        continue
576
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
577
        if dimensions is None:
578
          self.LogWarning("Disk %d of instance %s did not return size"
579
                          " information, ignoring", idx, instance.name)
580
          continue
581
        if not isinstance(dimensions, (tuple, list)):
582
          self.LogWarning("Disk %d of instance %s did not return valid"
583
                          " dimension information, ignoring", idx,
584
                          instance.name)
585
          continue
586
        (size, spindles) = dimensions
587
        if not isinstance(size, (int, long)):
588
          self.LogWarning("Disk %d of instance %s did not return valid"
589
                          " size information, ignoring", idx, instance.name)
590
          continue
591
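        # the reported size is in bytes; shift by 20 to get MiB, the unit
        # used for disk.size in the configuration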
        size = size >> 20
592
        if size != disk.size:
593
          self.LogInfo("Disk %d of instance %s has mismatched size,"
594
                       " correcting: recorded %d, actual %d", idx,
595
                       instance.name, disk.size, size)
596
          disk.size = size
597
          self.cfg.Update(instance, feedback_fn)
598
          changed.append((instance.name, idx, "size", size))
599
        if es_flags[node_uuid]:
600
          if spindles is None:
601
            self.LogWarning("Disk %d of instance %s did not return valid"
602
                            " spindles information, ignoring", idx,
603
                            instance.name)
604
          elif disk.spindles is None or disk.spindles != spindles:
605
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
606
                         " correcting: recorded %s, actual %s",
607
                         idx, instance.name, disk.spindles, spindles)
608
            disk.spindles = spindles
609
            self.cfg.Update(instance, feedback_fn)
610
            changed.append((instance.name, idx, "spindles", disk.spindles))
611
        if self._EnsureChildSizes(disk):
612
          self.cfg.Update(instance, feedback_fn)
613
          changed.append((instance.name, idx, "size", disk.size))
614
    return changed
615

    
616

    
617
def _ValidateNetmask(cfg, netmask):
618
  """Checks if a netmask is valid.
619

620
  @type cfg: L{config.ConfigWriter}
621
  @param cfg: The cluster configuration
622
  @type netmask: int
623
  @param netmask: the netmask to be verified
624
  @raise errors.OpPrereqError: if the validation fails
625

626
  """
627
  ip_family = cfg.GetPrimaryIPFamily()
628
  try:
629
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
630
  except errors.ProgrammerError:
631
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
632
                               ip_family, errors.ECODE_INVAL)
633
  if not ipcls.ValidateNetmask(netmask):
634
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
635
                               (netmask), errors.ECODE_INVAL)
636

    
637

    
638
def CheckFileBasedStoragePathVsEnabledDiskTemplates(
639
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
640
    file_disk_template):
641
  """Checks whether the given file-based storage directory is acceptable.
642

643
  Note: This function is public, because it is also used in bootstrap.py.
644

645
  @type logging_warn_fn: function
646
  @param logging_warn_fn: function which accepts a string and logs it
647
  @type file_storage_dir: string
648
  @param file_storage_dir: the directory to be used for file-based instances
649
  @type enabled_disk_templates: list of string
650
  @param enabled_disk_templates: the list of enabled disk templates
651
  @type file_disk_template: string
652
  @param file_disk_template: the file-based disk template for which the
653
      path should be checked
654

655
  """
656
  assert (file_disk_template in
657
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
658
  file_storage_enabled = file_disk_template in enabled_disk_templates
659
  if file_storage_dir is not None:
660
    if file_storage_dir == "":
661
      if file_storage_enabled:
662
        raise errors.OpPrereqError(
663
            "Unsetting the '%s' storage directory while having '%s' storage"
664
            " enabled is not permitted." %
665
            (file_disk_template, file_disk_template))
666
    else:
667
      if not file_storage_enabled:
668
        logging_warn_fn(
669
            "Specified a %s storage directory, although %s storage is not"
670
            " enabled." % (file_disk_template, file_disk_template))
671
  else:
672
    raise errors.ProgrammerError("Received %s storage dir with value"
673
                                 " 'None'." % file_disk_template)
674

    
675

    
676
def CheckFileStoragePathVsEnabledDiskTemplates(
677
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
678
  """Checks whether the given file storage directory is acceptable.
679

680
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
681

682
  """
683
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
684
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
685
      constants.DT_FILE)
686

    
687

    
688
def CheckSharedFileStoragePathVsEnabledDiskTemplates(
689
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
690
  """Checks whether the given shared file storage directory is acceptable.
691

692
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
693

694
  """
695
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
696
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
697
      constants.DT_SHARED_FILE)
698

    
699

    
700
class LUClusterSetParams(LogicalUnit):
701
  """Change the parameters of the cluster.
702

703
  """
704
  HPATH = "cluster-modify"
705
  HTYPE = constants.HTYPE_CLUSTER
706
  REQ_BGL = False
707

    
708
  def CheckArguments(self):
709
    """Check parameters
710

711
    """
712
    if self.op.uid_pool:
713
      uidpool.CheckUidPool(self.op.uid_pool)
714

    
715
    if self.op.add_uids:
716
      uidpool.CheckUidPool(self.op.add_uids)
717

    
718
    if self.op.remove_uids:
719
      uidpool.CheckUidPool(self.op.remove_uids)
720

    
721
    if self.op.master_netmask is not None:
722
      _ValidateNetmask(self.cfg, self.op.master_netmask)
723

    
724
    if self.op.diskparams:
725
      for dt_params in self.op.diskparams.values():
726
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
727
      try:
728
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
729
        CheckDiskAccessModeValidity(self.op.diskparams)
730
      except errors.OpPrereqError, err:
731
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
732
                                   errors.ECODE_INVAL)
733

    
734
  def ExpandNames(self):
735
    # FIXME: in the future maybe other cluster params won't require checking on
736
    # all nodes to be modified.
737
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
738
    # resource locks the right thing, shouldn't it be the BGL instead?
739
    self.needed_locks = {
740
      locking.LEVEL_NODE: locking.ALL_SET,
741
      locking.LEVEL_INSTANCE: locking.ALL_SET,
742
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
743
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
744
    }
745
    self.share_locks = ShareAll()
746

    
747
  def BuildHooksEnv(self):
748
    """Build hooks env.
749

750
    """
751
    return {
752
      "OP_TARGET": self.cfg.GetClusterName(),
753
      "NEW_VG_NAME": self.op.vg_name,
754
      }
755

    
756
  def BuildHooksNodes(self):
757
    """Build hooks nodes.
758

759
    """
760
    mn = self.cfg.GetMasterNode()
761
    return ([mn], [mn])
762

    
763
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
764
                   new_enabled_disk_templates):
765
    """Check the consistency of the vg name on all nodes and in case it gets
766
       unset whether there are instances still using it.
767

768
    """
769
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
770
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
771
                                            new_enabled_disk_templates)
772
    current_vg_name = self.cfg.GetVGName()
773

    
774
    if self.op.vg_name == '':
775
      if lvm_is_enabled:
776
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
777
                                   " disk templates are or get enabled.")
778

    
779
    if self.op.vg_name is None:
780
      if current_vg_name is None and lvm_is_enabled:
781
        raise errors.OpPrereqError("Please specify a volume group when"
782
                                   " enabling lvm-based disk-templates.")
783

    
784
    if self.op.vg_name is not None and not self.op.vg_name:
785
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
786
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
787
                                   " instances exist", errors.ECODE_INVAL)
788

    
789
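    # contact the nodes only if the volume group is actually going to be used:
    # either a name was given while LVM is enabled, or LVM is being enabled
    # with the currently configured volume group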
    if (self.op.vg_name is not None and lvm_is_enabled) or \
790
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
791
      self._CheckVgNameOnNodes(node_uuids)
792

    
793
  def _CheckVgNameOnNodes(self, node_uuids):
794
    """Check the status of the volume group on each node.
795

796
    """
797
    vglist = self.rpc.call_vg_list(node_uuids)
798
    for node_uuid in node_uuids:
799
      msg = vglist[node_uuid].fail_msg
800
      if msg:
801
        # ignoring down node
802
        self.LogWarning("Error while gathering data on node %s"
803
                        " (ignoring node): %s",
804
                        self.cfg.GetNodeName(node_uuid), msg)
805
        continue
806
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
807
                                            self.op.vg_name,
808
                                            constants.MIN_VG_SIZE)
809
      if vgstatus:
810
        raise errors.OpPrereqError("Error on node '%s': %s" %
811
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
812
                                   errors.ECODE_ENVIRON)
813

    
814
  @staticmethod
815
  def _GetDiskTemplateSetsInner(op_enabled_disk_templates,
816
                                old_enabled_disk_templates):
817
    """Computes three sets of disk templates.
818

819
    @see: C{_GetDiskTemplateSets} for more details.
820

821
    """
822
    enabled_disk_templates = None
823
    new_enabled_disk_templates = []
824
    disabled_disk_templates = []
825
    if op_enabled_disk_templates:
826
      enabled_disk_templates = op_enabled_disk_templates
827
      new_enabled_disk_templates = \
828
        list(set(enabled_disk_templates)
829
             - set(old_enabled_disk_templates))
830
      disabled_disk_templates = \
831
        list(set(old_enabled_disk_templates)
832
             - set(enabled_disk_templates))
833
    else:
834
      enabled_disk_templates = old_enabled_disk_templates
835
    return (enabled_disk_templates, new_enabled_disk_templates,
836
            disabled_disk_templates)
837

    
838
  def _GetDiskTemplateSets(self, cluster):
839
    """Computes three sets of disk templates.
840

841
    The three sets are:
842
      - disk templates that will be enabled after this operation (no matter if
843
        they were enabled before or not)
844
      - disk templates that get enabled by this operation (thus haven't been
845
        enabled before.)
846
      - disk templates that get disabled by this operation
847

848
    """
849
    return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates,
850
                                          cluster.enabled_disk_templates)
851

    
852
  def _CheckIpolicy(self, cluster, enabled_disk_templates):
853
    """Checks the ipolicy.
854

855
    @type cluster: C{objects.Cluster}
856
    @param cluster: the cluster's configuration
857
    @type enabled_disk_templates: list of string
858
    @param enabled_disk_templates: list of (possibly newly) enabled disk
859
      templates
860

861
    """
862
    # FIXME: write unit tests for this
863
    if self.op.ipolicy:
864
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
865
                                           group_policy=False)
866

    
867
      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
868
                                  enabled_disk_templates)
869

    
870
      all_instances = self.cfg.GetAllInstancesInfo().values()
871
      violations = set()
872
      for group in self.cfg.GetAllNodeGroupsInfo().values():
873
        instances = frozenset([inst for inst in all_instances
874
                               if compat.any(nuuid in group.members
875
                                             for nuuid in inst.all_nodes)])
876
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
877
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
878
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
879
                                           self.cfg)
880
        if new:
881
          violations.update(new)
882

    
883
      if violations:
884
        self.LogWarning("After the ipolicy change the following instances"
885
                        " violate them: %s",
886
                        utils.CommaJoin(utils.NiceSort(violations)))
887
    else:
888
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
889
                                  enabled_disk_templates)
890

    
891
  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
892
    """Checks whether the set DRBD helper actually exists on the nodes.
893

894
    @type drbd_helper: string
895
    @param drbd_helper: path of the drbd usermode helper binary
896
    @type node_uuids: list of strings
897
    @param node_uuids: list of node UUIDs to check for the helper
898

899
    """
900
    # checks given drbd helper on all nodes
901
    helpers = self.rpc.call_drbd_helper(node_uuids)
902
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
903
      if ninfo.offline:
904
        self.LogInfo("Not checking drbd helper on offline node %s",
905
                     ninfo.name)
906
        continue
907
      msg = helpers[ninfo.uuid].fail_msg
908
      if msg:
909
        raise errors.OpPrereqError("Error checking drbd helper on node"
910
                                   " '%s': %s" % (ninfo.name, msg),
911
                                   errors.ECODE_ENVIRON)
912
      node_helper = helpers[ninfo.uuid].payload
913
      if node_helper != drbd_helper:
914
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
915
                                   (ninfo.name, node_helper),
916
                                   errors.ECODE_ENVIRON)
917

    
918
  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
919
    """Check the DRBD usermode helper.
920

921
    @type node_uuids: list of strings
922
    @param node_uuids: a list of nodes' UUIDs
923
    @type drbd_enabled: boolean
924
    @param drbd_enabled: whether DRBD will be enabled after this operation
925
      (no matter if it was disabled before or not)
926
    @type drbd_gets_enabled: boolean
927
    @param drbd_gets_enabled: true if DRBD was disabled before this
928
      operation, but will be enabled afterwards
929

930
    """
931
    if self.op.drbd_helper == '':
932
      if drbd_enabled:
933
        raise errors.OpPrereqError("Cannot disable drbd helper while"
934
                                   " DRBD is enabled.")
935
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
936
        raise errors.OpPrereqError("Cannot disable drbd helper while"
937
                                   " drbd-based instances exist",
938
                                   errors.ECODE_INVAL)
939

    
940
    else:
941
      if self.op.drbd_helper is not None and drbd_enabled:
942
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
943
      else:
944
        if drbd_gets_enabled:
945
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
946
          if current_drbd_helper is not None:
947
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
948
          else:
949
            raise errors.OpPrereqError("Cannot enable DRBD without a"
950
                                       " DRBD usermode helper set.")
951

    
952
  def _CheckInstancesOfDisabledDiskTemplates(
953
      self, disabled_disk_templates):
954
    """Check whether we try to disable a disk template that is in use.
955

956
    @type disabled_disk_templates: list of string
957
    @param disabled_disk_templates: list of disk templates that are going to
958
      be disabled by this operation
959

960
    """
961
    for disk_template in disabled_disk_templates:
962
      if self.cfg.HasAnyDiskOfType(disk_template):
963
        raise errors.OpPrereqError(
964
            "Cannot disable disk template '%s', because there is at least one"
965
            " instance using it." % disk_template)
966

    
967
  def CheckPrereq(self):
968
    """Check prerequisites.
969

970
    This checks whether the given params don't conflict and
971
    if the given volume group is valid.
972

973
    """
974
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
975
    self.cluster = cluster = self.cfg.GetClusterInfo()
976

    
977
    vm_capable_node_uuids = [node.uuid
978
                             for node in self.cfg.GetAllNodesInfo().values()
979
                             if node.uuid in node_uuids and node.vm_capable]
980

    
981
    (enabled_disk_templates, new_enabled_disk_templates,
982
      disabled_disk_templates) = self._GetDiskTemplateSets(cluster)
983
    self._CheckInstancesOfDisabledDiskTemplates(disabled_disk_templates)
984

    
985
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
986
                      new_enabled_disk_templates)
987

    
988
    if self.op.file_storage_dir is not None:
989
      CheckFileStoragePathVsEnabledDiskTemplates(
990
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
991

    
992
    if self.op.shared_file_storage_dir is not None:
993
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
994
          self.LogWarning, self.op.shared_file_storage_dir,
995
          enabled_disk_templates)
996

    
997
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
998
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
999
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)
1000

    
1001
    # validate params changes
1002
    if self.op.beparams:
1003
      objects.UpgradeBeParams(self.op.beparams)
1004
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1005
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
1006

    
1007
    if self.op.ndparams:
1008
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
1009
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
1010

    
1011
      # TODO: we need a more general way to handle resetting
1012
      # cluster-level parameters to default values
1013
      if self.new_ndparams["oob_program"] == "":
1014
        self.new_ndparams["oob_program"] = \
1015
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
1016

    
1017
    if self.op.hv_state:
1018
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
1019
                                           self.cluster.hv_state_static)
1020
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
1021
                               for hv, values in new_hv_state.items())
1022

    
1023
    if self.op.disk_state:
1024
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
1025
                                               self.cluster.disk_state_static)
1026
      self.new_disk_state = \
1027
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
1028
                            for name, values in svalues.items()))
1029
             for storage, svalues in new_disk_state.items())
1030

    
1031
    self._CheckIpolicy(cluster, enabled_disk_templates)
1032

    
1033
    if self.op.nicparams:
1034
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1035
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
1036
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
1037
      nic_errors = []
1038

    
1039
      # check all instances for consistency
1040
      for instance in self.cfg.GetAllInstancesInfo().values():
1041
        for nic_idx, nic in enumerate(instance.nics):
1042
          params_copy = copy.deepcopy(nic.nicparams)
1043
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
1044

    
1045
          # check parameter syntax
1046
          try:
1047
            objects.NIC.CheckParameterSyntax(params_filled)
1048
          except errors.ConfigurationError, err:
1049
            nic_errors.append("Instance %s, nic/%d: %s" %
1050
                              (instance.name, nic_idx, err))
1051

    
1052
          # if we're moving instances to routed, check that they have an ip
1053
          target_mode = params_filled[constants.NIC_MODE]
1054
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
1055
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
1056
                              " address" % (instance.name, nic_idx))
1057
      if nic_errors:
1058
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
1059
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
1060

    
1061
    # hypervisor list/parameters
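    # (FillDict with an empty overlay returns a copy of the current parameters
    # that can be updated without modifying the cluster object directly)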
1062
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
1063
    if self.op.hvparams:
1064
      for hv_name, hv_dict in self.op.hvparams.items():
1065
        if hv_name not in self.new_hvparams:
1066
          self.new_hvparams[hv_name] = hv_dict
1067
        else:
1068
          self.new_hvparams[hv_name].update(hv_dict)
1069

    
1070
    # disk template parameters
1071
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
1072
    if self.op.diskparams:
1073
      for dt_name, dt_params in self.op.diskparams.items():
1074
        if dt_name not in self.new_diskparams:
1075
          self.new_diskparams[dt_name] = dt_params
1076
        else:
1077
          self.new_diskparams[dt_name].update(dt_params)
1078
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)
1079

    
1080
    # os hypervisor parameters
1081
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
1082
    if self.op.os_hvp:
1083
      for os_name, hvs in self.op.os_hvp.items():
1084
        if os_name not in self.new_os_hvp:
1085
          self.new_os_hvp[os_name] = hvs
1086
        else:
1087
          for hv_name, hv_dict in hvs.items():
1088
            if hv_dict is None:
1089
              # Delete if it exists
1090
              self.new_os_hvp[os_name].pop(hv_name, None)
1091
            elif hv_name not in self.new_os_hvp[os_name]:
1092
              self.new_os_hvp[os_name][hv_name] = hv_dict
1093
            else:
1094
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
1095

    
1096
    # os parameters
1097
    self.new_osp = objects.FillDict(cluster.osparams, {})
1098
    if self.op.osparams:
1099
      for os_name, osp in self.op.osparams.items():
1100
        if os_name not in self.new_osp:
1101
          self.new_osp[os_name] = {}
1102

    
1103
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
1104
                                                 use_none=True)
1105

    
1106
        if not self.new_osp[os_name]:
1107
          # we removed all parameters
1108
          del self.new_osp[os_name]
1109
        else:
1110
          # check the parameter validity (remote check)
1111
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
1112
                        os_name, self.new_osp[os_name])
1113

    
1114
    # changes to the hypervisor list
1115
    if self.op.enabled_hypervisors is not None:
1116
      self.hv_list = self.op.enabled_hypervisors
1117
      for hv in self.hv_list:
1118
        # if the hypervisor doesn't already exist in the cluster
1119
        # hvparams, we initialize it to empty, and then (in both
1120
        # cases) we make sure to fill the defaults, as we might not
1121
        # have a complete defaults list if the hypervisor wasn't
1122
        # enabled before
1123
        if hv not in new_hvp:
1124
          new_hvp[hv] = {}
1125
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1126
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1127
    else:
1128
      self.hv_list = cluster.enabled_hypervisors
1129

    
1130
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1131
      # either the enabled list has changed, or the parameters have, validate
1132
      for hv_name, hv_params in self.new_hvparams.items():
1133
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1134
            (self.op.enabled_hypervisors and
1135
             hv_name in self.op.enabled_hypervisors)):
1136
          # either this is a new hypervisor, or its parameters have changed
1137
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1138
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1139
          hv_class.CheckParameterSyntax(hv_params)
1140
          CheckHVParams(self, node_uuids, hv_name, hv_params)
1141

    
1142
    self._CheckDiskTemplateConsistency()
1143

    
1144
    if self.op.os_hvp:
1145
      # no need to check any newly-enabled hypervisors, since the
1146
      # defaults have already been checked in the above code-block
1147
      for os_name, os_hvp in self.new_os_hvp.items():
1148
        for hv_name, hv_params in os_hvp.items():
1149
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1150
          # we need to fill in the new os_hvp on top of the actual hv_p
1151
          cluster_defaults = self.new_hvparams.get(hv_name, {})
1152
          new_osp = objects.FillDict(cluster_defaults, hv_params)
1153
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1154
          hv_class.CheckParameterSyntax(new_osp)
1155
          CheckHVParams(self, node_uuids, hv_name, new_osp)
1156

    
1157
    if self.op.default_iallocator:
1158
      alloc_script = utils.FindFile(self.op.default_iallocator,
1159
                                    constants.IALLOCATOR_SEARCH_PATH,
1160
                                    os.path.isfile)
1161
      if alloc_script is None:
1162
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1163
                                   " specified" % self.op.default_iallocator,
1164
                                   errors.ECODE_INVAL)
1165

    
1166
  def _CheckDiskTemplateConsistency(self):
1167
    """Check whether the disk templates that are going to be disabled
1168
       are still in use by some instances.
1169

1170
    """
1171
    if self.op.enabled_disk_templates:
1172
      cluster = self.cfg.GetClusterInfo()
1173
      instances = self.cfg.GetAllInstancesInfo()
1174

    
1175
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1176
        - set(self.op.enabled_disk_templates)
1177
      for instance in instances.itervalues():
1178
        if instance.disk_template in disk_templates_to_remove:
1179
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
1180
                                     " because instance '%s' is using it." %
1181
                                     (instance.disk_template, instance.name))
1182

    
1183
  def _SetVgName(self, feedback_fn):
1184
    """Determines and sets the new volume group name.
1185

1186
    """
1187
    if self.op.vg_name is not None:
1188
      new_volume = self.op.vg_name
1189
      if not new_volume:
1190
        new_volume = None
1191
      if new_volume != self.cfg.GetVGName():
1192
        self.cfg.SetVGName(new_volume)
1193
      else:
1194
        feedback_fn("Cluster LVM configuration already in desired"
1195
                    " state, not changing")
1196

    
1197
  def _SetFileStorageDir(self, feedback_fn):
1198
    """Set the file storage directory.
1199

1200
    """
1201
    if self.op.file_storage_dir is not None:
1202
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
1203
        feedback_fn("Global file storage dir already set to value '%s'"
1204
                    % self.cluster.file_storage_dir)
1205
      else:
1206
        self.cluster.file_storage_dir = self.op.file_storage_dir
1207

    
1208
  def _SetDrbdHelper(self, feedback_fn):
1209
    """Set the DRBD usermode helper.
1210

1211
    """
1212
    if self.op.drbd_helper is not None:
1213
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1214
        feedback_fn("Note that you specified a drbd user helper, but did not"
1215
                    " enable the drbd disk template.")
1216
      new_helper = self.op.drbd_helper
1217
      if not new_helper:
1218
        new_helper = None
1219
      if new_helper != self.cfg.GetDRBDHelper():
1220
        self.cfg.SetDRBDHelper(new_helper)
1221
      else:
1222
        feedback_fn("Cluster DRBD helper already in desired state,"
1223
                    " not changing")
1224

    
1225
  def Exec(self, feedback_fn):
1226
    """Change the parameters of the cluster.
1227

1228
    """
1229
    if self.op.enabled_disk_templates:
1230
      self.cluster.enabled_disk_templates = \
1231
        list(self.op.enabled_disk_templates)
1232

    
1233
    self._SetVgName(feedback_fn)
1234
    self._SetFileStorageDir(feedback_fn)
1235
    self._SetDrbdHelper(feedback_fn)
1236

    
1237
    if self.op.hvparams:
1238
      self.cluster.hvparams = self.new_hvparams
1239
    if self.op.os_hvp:
1240
      self.cluster.os_hvp = self.new_os_hvp
1241
    if self.op.enabled_hypervisors is not None:
1242
      self.cluster.hvparams = self.new_hvparams
1243
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1244
    if self.op.beparams:
1245
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1246
    if self.op.nicparams:
1247
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1248
    if self.op.ipolicy:
1249
      self.cluster.ipolicy = self.new_ipolicy
1250
    if self.op.osparams:
1251
      self.cluster.osparams = self.new_osp
1252
    if self.op.ndparams:
1253
      self.cluster.ndparams = self.new_ndparams
1254
    if self.op.diskparams:
1255
      self.cluster.diskparams = self.new_diskparams
1256
    if self.op.hv_state:
1257
      self.cluster.hv_state_static = self.new_hv_state
1258
    if self.op.disk_state:
1259
      self.cluster.disk_state_static = self.new_disk_state
1260

    
1261
    if self.op.candidate_pool_size is not None:
1262
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1263
      # we need to update the pool size here, otherwise the save will fail
1264
      AdjustCandidatePool(self, [])
1265

    
1266
    if self.op.maintain_node_health is not None:
1267
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1268
        feedback_fn("Note: CONFD was disabled at build time, node health"
1269
                    " maintenance is not useful (still enabling it)")
1270
      self.cluster.maintain_node_health = self.op.maintain_node_health
1271

    
1272
    if self.op.modify_etc_hosts is not None:
1273
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1274

    
1275
    if self.op.prealloc_wipe_disks is not None:
1276
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1277

    
1278
    if self.op.add_uids is not None:
1279
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1280

    
1281
    if self.op.remove_uids is not None:
1282
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1283

    
1284
    if self.op.uid_pool is not None:
1285
      self.cluster.uid_pool = self.op.uid_pool
1286

    
1287
    if self.op.default_iallocator is not None:
1288
      self.cluster.default_iallocator = self.op.default_iallocator
1289

    
1290
    if self.op.reserved_lvs is not None:
1291
      self.cluster.reserved_lvs = self.op.reserved_lvs
1292

    
1293
    if self.op.use_external_mip_script is not None:
1294
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
1295

    
1296
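    # helper applying DDM_ADD/DDM_REMOVE modifications to one of the OS lists
    # stored on the cluster object (hidden_os / blacklisted_os)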
    def helper_os(aname, mods, desc):
1297
      desc += " OS list"
1298
      lst = getattr(self.cluster, aname)
1299
      for key, val in mods:
1300
        if key == constants.DDM_ADD:
1301
          if val in lst:
1302
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1303
          else:
1304
            lst.append(val)
1305
        elif key == constants.DDM_REMOVE:
1306
          if val in lst:
1307
            lst.remove(val)
1308
          else:
1309
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1310
        else:
1311
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
1312

    
1313
    if self.op.hidden_os:
1314
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1315

    
1316
    if self.op.blacklisted_os:
1317
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1318

    
1319
    if self.op.master_netdev:
1320
      master_params = self.cfg.GetMasterNetworkParameters()
1321
      ems = self.cfg.GetUseExternalMipScript()
1322
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1323
                  self.cluster.master_netdev)
1324
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1325
                                                       master_params, ems)
1326
      if not self.op.force:
1327
        result.Raise("Could not disable the master ip")
1328
      else:
1329
        if result.fail_msg:
1330
          msg = ("Could not disable the master ip (continuing anyway): %s" %
1331
                 result.fail_msg)
1332
          feedback_fn(msg)
1333
      feedback_fn("Changing master_netdev from %s to %s" %
1334
                  (master_params.netdev, self.op.master_netdev))
1335
      self.cluster.master_netdev = self.op.master_netdev
1336

    
1337
    if self.op.master_netmask:
1338
      master_params = self.cfg.GetMasterNetworkParameters()
1339
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1340
      result = self.rpc.call_node_change_master_netmask(
1341
                 master_params.uuid, master_params.netmask,
1342
                 self.op.master_netmask, master_params.ip,
1343
                 master_params.netdev)
1344
      result.Warn("Could not change the master IP netmask", feedback_fn)
1345
      self.cluster.master_netmask = self.op.master_netmask
1346

    
1347
    self.cfg.Update(self.cluster, feedback_fn)
1348

    
1349
    if self.op.master_netdev:
1350
      master_params = self.cfg.GetMasterNetworkParameters()
1351
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
1352
                  self.op.master_netdev)
1353
      ems = self.cfg.GetUseExternalMipScript()
1354
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1355
                                                     master_params, ems)
1356
      result.Warn("Could not re-enable the master ip on the master,"
1357
                  " please restart manually", self.LogWarning)
1358

    
1359

    
1360
class LUClusterVerify(NoHooksLU):
1361
  """Submits all jobs necessary to verify the cluster.
1362

1363
  """
1364
  REQ_BGL = False
1365

    
1366
  def ExpandNames(self):
1367
    self.needed_locks = {}
1368

    
1369
  def Exec(self, feedback_fn):
1370
    jobs = []
1371

    
1372
    if self.op.group_name:
1373
      groups = [self.op.group_name]
1374
      depends_fn = lambda: None
1375
    else:
1376
      groups = self.cfg.GetNodeGroupList()
1377

    
1378
      # Verify global configuration
1379
      jobs.append([
1380
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1381
        ])
1382

    
1383
      # Always depend on global verification
1384
      depends_fn = lambda: [(-len(jobs), [])]
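      # -len(jobs) is a relative job ID: the per-group verification jobs
      # depend on the config verification job appended just above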
1385

    
1386
    jobs.extend(
1387
      [opcodes.OpClusterVerifyGroup(group_name=group,
1388
                                    ignore_errors=self.op.ignore_errors,
1389
                                    depends=depends_fn())]
1390
      for group in groups)
1391

    
1392
    # Fix up all parameters
1393
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1394
      op.debug_simulate_errors = self.op.debug_simulate_errors
1395
      op.verbose = self.op.verbose
1396
      op.error_codes = self.op.error_codes
1397
      try:
1398
        op.skip_checks = self.op.skip_checks
1399
      except AttributeError:
1400
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1401

    
1402
    return ResultWithJobs(jobs)
1403

    
1404

    
1405
class _VerifyErrors(object):
1406
  """Mix-in for cluster/group verify LUs.
1407

1408
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1409
  self.op and self._feedback_fn to be available.)
1410

1411
  """
1412

    
1413
  ETYPE_FIELD = "code"
1414
  ETYPE_ERROR = "ERROR"
1415
  ETYPE_WARNING = "WARNING"
1416

    
1417
  def _Error(self, ecode, item, msg, *args, **kwargs):
1418
    """Format an error message.
1419

1420
    Based on the opcode's error_codes parameter, either format a
1421
    parseable error code, or a simpler error string.
1422

1423
    This must be called only from Exec and functions called from Exec.
1424

1425
    """
1426
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1427
    itype, etxt, _ = ecode
1428
    # If the error code is in the list of ignored errors, demote the error to a
1429
    # warning
1430
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1431
      ltype = self.ETYPE_WARNING
1432
    # first complete the msg
1433
    if args:
1434
      msg = msg % args
1435
    # then format the whole message
1436
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1437
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1438
    else:
1439
      if item:
1440
        item = " " + item
1441
      else:
1442
        item = ""
1443
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1444
    # and finally report it via the feedback_fn
1445
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1446
    # do not mark the operation as failed for WARN cases only
1447
    if ltype == self.ETYPE_ERROR:
1448
      self.bad = True
1449

    
1450
  def _ErrorIf(self, cond, *args, **kwargs):
1451
    """Log an error message if the passed condition is True.
1452

1453
    """
1454
    if (bool(cond)
1455
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1456
      self._Error(*args, **kwargs)
1457

    
1458

    
1459
def _VerifyCertificate(filename):
1460
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1461

1462
  @type filename: string
1463
  @param filename: Path to PEM file
1464

1465
  """
1466
  try:
1467
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1468
                                           utils.ReadFile(filename))
1469
  except Exception, err: # pylint: disable=W0703
1470
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1471
            "Failed to load X509 certificate %s: %s" % (filename, err))
1472

    
1473
  (errcode, msg) = \
1474
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1475
                                constants.SSL_CERT_EXPIRATION_ERROR)
1476

    
1477
  if msg:
1478
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1479
  else:
1480
    fnamemsg = None
1481

    
1482
  if errcode is None:
1483
    return (None, fnamemsg)
1484
  elif errcode == utils.CERT_WARNING:
1485
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1486
  elif errcode == utils.CERT_ERROR:
1487
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1488

    
1489
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1490

    
1491

    
1492
def _GetAllHypervisorParameters(cluster, instances):
1493
  """Compute the set of all hypervisor parameters.
1494

1495
  @type cluster: L{objects.Cluster}
1496
  @param cluster: the cluster object
1497
  @type instances: list of L{objects.Instance}
1498
  @param instances: additional instances from which to obtain parameters
1499
  @rtype: list of (origin, hypervisor, parameters)
1500
  @return: a list with all parameters found, indicating the hypervisor they
1501
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1502

1503
  """
1504
  hvp_data = []
1505

    
1506
  for hv_name in cluster.enabled_hypervisors:
1507
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1508

    
1509
  for os_name, os_hvp in cluster.os_hvp.items():
1510
    for hv_name, hv_params in os_hvp.items():
1511
      if hv_params:
1512
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1513
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1514

    
1515
  # TODO: collapse identical parameter values in a single one
1516
  for instance in instances:
1517
    if instance.hvparams:
1518
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1519
                       cluster.FillHV(instance)))
1520

    
1521
  return hvp_data
1522

    
1523

    
1524
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1525
  """Verifies the cluster config.
1526

1527
  """
1528
  REQ_BGL = False
1529

    
1530
  def _VerifyHVP(self, hvp_data):
1531
    """Verifies locally the syntax of the hypervisor parameters.
1532

1533
    """
1534
    for item, hv_name, hv_params in hvp_data:
1535
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1536
             (item, hv_name))
1537
      try:
1538
        hv_class = hypervisor.GetHypervisorClass(hv_name)
1539
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1540
        hv_class.CheckParameterSyntax(hv_params)
1541
      except errors.GenericError, err:
1542
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1543

    
1544
  def ExpandNames(self):
1545
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1546
    self.share_locks = ShareAll()
1547

    
1548
  def CheckPrereq(self):
1549
    """Check prerequisites.
1550

1551
    """
1552
    # Retrieve all information
1553
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1554
    self.all_node_info = self.cfg.GetAllNodesInfo()
1555
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1556

    
1557
  def Exec(self, feedback_fn):
1558
    """Verify integrity of cluster, performing various test on nodes.
1559

1560
    """
1561
    self.bad = False
1562
    self._feedback_fn = feedback_fn
1563

    
1564
    feedback_fn("* Verifying cluster config")
1565

    
1566
    for msg in self.cfg.VerifyConfig():
1567
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1568

    
1569
    feedback_fn("* Verifying cluster certificate files")
1570

    
1571
    for cert_filename in pathutils.ALL_CERT_FILES:
1572
      (errcode, msg) = _VerifyCertificate(cert_filename)
1573
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1574

    
1575
    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
1576
                                    pathutils.NODED_CERT_FILE),
1577
                  constants.CV_ECLUSTERCERT,
1578
                  None,
1579
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
1580
                    constants.LUXID_USER + " user")
1581

    
1582
    feedback_fn("* Verifying hypervisor parameters")
1583

    
1584
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1585
                                                self.all_inst_info.values()))
1586

    
1587
    feedback_fn("* Verifying all nodes belong to an existing group")
1588

    
1589
    # We do this verification here because, should this bogus circumstance
1590
    # occur, it would never be caught by VerifyGroup, which only acts on
1591
    # nodes/instances reachable from existing node groups.
1592

    
1593
    dangling_nodes = set(node for node in self.all_node_info.values()
1594
                         if node.group not in self.all_group_info)
1595

    
1596
    dangling_instances = {}
1597
    no_node_instances = []
1598

    
1599
    for inst in self.all_inst_info.values():
1600
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
1601
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
1602
      elif inst.primary_node not in self.all_node_info:
1603
        no_node_instances.append(inst)
1604

    
1605
    pretty_dangling = [
1606
        "%s (%s)" %
1607
        (node.name,
1608
         utils.CommaJoin(inst.name for
1609
                         inst in dangling_instances.get(node.uuid, [])))
1610
        for node in dangling_nodes]
1611

    
1612
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1613
                  None,
1614
                  "the following nodes (and their instances) belong to a non"
1615
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1616

    
1617
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1618
                  None,
1619
                  "the following instances have a non-existing primary-node:"
1620
                  " %s", utils.CommaJoin(inst.name for
1621
                                         inst in no_node_instances))
1622

    
1623
    return not self.bad
1624

    
1625

    
1626
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1627
  """Verifies the status of a node group.
1628

1629
  """
1630
  HPATH = "cluster-verify"
1631
  HTYPE = constants.HTYPE_CLUSTER
1632
  REQ_BGL = False
1633

    
1634
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1635

    
1636
  class NodeImage(object):
1637
    """A class representing the logical and physical status of a node.
1638

1639
    @type uuid: string
1640
    @ivar uuid: the node UUID to which this object refers
1641
    @ivar volumes: a structure as returned from
1642
        L{ganeti.backend.GetVolumeList} (runtime)
1643
    @ivar instances: a list of running instances (runtime)
1644
    @ivar pinst: list of configured primary instances (config)
1645
    @ivar sinst: list of configured secondary instances (config)
1646
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1647
        instances for which this node is secondary (config)
1648
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1649
    @ivar dfree: free disk, as reported by the node (runtime)
1650
    @ivar offline: the offline status (config)
1651
    @type rpc_fail: boolean
1652
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1653
        not whether the individual keys were correct) (runtime)
1654
    @type lvm_fail: boolean
1655
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1656
    @type hyp_fail: boolean
1657
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1658
    @type ghost: boolean
1659
    @ivar ghost: whether this is a known node or not (config)
1660
    @type os_fail: boolean
1661
    @ivar os_fail: whether the RPC call didn't return valid OS data
1662
    @type oslist: list
1663
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1664
    @type vm_capable: boolean
1665
    @ivar vm_capable: whether the node can host instances
1666
    @type pv_min: float
1667
    @ivar pv_min: size in MiB of the smallest PVs
1668
    @type pv_max: float
1669
    @ivar pv_max: size in MiB of the biggest PVs
1670

1671
    """
1672
    def __init__(self, offline=False, uuid=None, vm_capable=True):
1673
      self.uuid = uuid
1674
      self.volumes = {}
1675
      self.instances = []
1676
      self.pinst = []
1677
      self.sinst = []
1678
      self.sbp = {}
1679
      self.mfree = 0
1680
      self.dfree = 0
1681
      self.offline = offline
1682
      self.vm_capable = vm_capable
1683
      self.rpc_fail = False
1684
      self.lvm_fail = False
1685
      self.hyp_fail = False
1686
      self.ghost = False
1687
      self.os_fail = False
1688
      self.oslist = {}
1689
      self.pv_min = None
1690
      self.pv_max = None
1691

    
1692
  def ExpandNames(self):
1693
    # This raises errors.OpPrereqError on its own:
1694
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1695

    
1696
    # Get instances in node group; this is unsafe and needs verification later
1697
    inst_uuids = \
1698
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1699

    
1700
    self.needed_locks = {
1701
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1702
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1703
      locking.LEVEL_NODE: [],
1704

    
1705
      # This opcode is run by watcher every five minutes and acquires all nodes
1706
      # for a group. It doesn't run for a long time, so it's better to acquire
1707
      # the node allocation lock as well.
1708
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1709
      }
1710

    
1711
    self.share_locks = ShareAll()
1712

    
1713
  def DeclareLocks(self, level):
1714
    if level == locking.LEVEL_NODE:
1715
      # Get members of node group; this is unsafe and needs verification later
1716
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1717

    
1718
      # In Exec(), we warn about mirrored instances that have primary and
1719
      # secondary living in separate node groups. To fully verify that
1720
      # volumes for these instances are healthy, we will need to do an
1721
      # extra call to their secondaries. We ensure here those nodes will
1722
      # be locked.
1723
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1724
        # Important: access only the instances whose lock is owned
1725
        instance = self.cfg.GetInstanceInfoByName(inst_name)
1726
        if instance.disk_template in constants.DTS_INT_MIRROR:
1727
          nodes.update(instance.secondary_nodes)
1728

    
1729
      self.needed_locks[locking.LEVEL_NODE] = nodes
1730

    
1731
  def CheckPrereq(self):
1732
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1733
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1734

    
1735
    group_node_uuids = set(self.group_info.members)
1736
    group_inst_uuids = \
1737
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1738

    
1739
    unlocked_node_uuids = \
1740
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1741

    
1742
    unlocked_inst_uuids = \
1743
        group_inst_uuids.difference(
1744
          [self.cfg.GetInstanceInfoByName(name).uuid
1745
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1746

    
1747
    if unlocked_node_uuids:
1748
      raise errors.OpPrereqError(
1749
        "Missing lock for nodes: %s" %
1750
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1751
        errors.ECODE_STATE)
1752

    
1753
    if unlocked_inst_uuids:
1754
      raise errors.OpPrereqError(
1755
        "Missing lock for instances: %s" %
1756
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1757
        errors.ECODE_STATE)
1758

    
1759
    self.all_node_info = self.cfg.GetAllNodesInfo()
1760
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1761

    
1762
    self.my_node_uuids = group_node_uuids
1763
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1764
                             for node_uuid in group_node_uuids)
1765

    
1766
    self.my_inst_uuids = group_inst_uuids
1767
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1768
                             for inst_uuid in group_inst_uuids)
1769

    
1770
    # We detect here the nodes that will need the extra RPC calls for verifying
1771
    # split LV volumes; they should be locked.
1772
    extra_lv_nodes = set()
1773

    
1774
    for inst in self.my_inst_info.values():
1775
      if inst.disk_template in constants.DTS_INT_MIRROR:
1776
        for nuuid in inst.all_nodes:
1777
          if self.all_node_info[nuuid].group != self.group_uuid:
1778
            extra_lv_nodes.add(nuuid)
1779

    
1780
    unlocked_lv_nodes = \
1781
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1782

    
1783
    if unlocked_lv_nodes:
1784
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1785
                                 utils.CommaJoin(unlocked_lv_nodes),
1786
                                 errors.ECODE_STATE)
1787
    self.extra_lv_nodes = list(extra_lv_nodes)
1788

    
1789
  def _VerifyNode(self, ninfo, nresult):
1790
    """Perform some basic validation on data returned from a node.
1791

1792
      - check the result data structure is well formed and has all the
1793
        mandatory fields
1794
      - check ganeti version
1795

1796
    @type ninfo: L{objects.Node}
1797
    @param ninfo: the node to check
1798
    @param nresult: the results from the node
1799
    @rtype: boolean
1800
    @return: whether overall this call was successful (and we can expect
1801
         reasonable values in the response)
1802

1803
    """
1804
    # main result, nresult should be a non-empty dict
1805
    test = not nresult or not isinstance(nresult, dict)
1806
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1807
                  "unable to verify node: no data returned")
1808
    if test:
1809
      return False
1810

    
1811
    # compares ganeti version
1812
    local_version = constants.PROTOCOL_VERSION
1813
    remote_version = nresult.get("version", None)
1814
    test = not (remote_version and
1815
                isinstance(remote_version, (list, tuple)) and
1816
                len(remote_version) == 2)
1817
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1818
                  "connection to node returned invalid data")
1819
    if test:
1820
      return False
1821

    
1822
    test = local_version != remote_version[0]
1823
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1824
                  "incompatible protocol versions: master %s,"
1825
                  " node %s", local_version, remote_version[0])
1826
    if test:
1827
      return False
1828

    
1829
    # node seems compatible, we can actually try to look into its results
1830

    
1831
    # full package version
1832
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1833
                  constants.CV_ENODEVERSION, ninfo.name,
1834
                  "software version mismatch: master %s, node %s",
1835
                  constants.RELEASE_VERSION, remote_version[1],
1836
                  code=self.ETYPE_WARNING)
1837

    
1838
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1839
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1840
      for hv_name, hv_result in hyp_result.iteritems():
1841
        test = hv_result is not None
1842
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1843
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1844

    
1845
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1846
    if ninfo.vm_capable and isinstance(hvp_result, list):
1847
      for item, hv_name, hv_result in hvp_result:
1848
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1849
                      "hypervisor %s parameter verify failure (source %s): %s",
1850
                      hv_name, item, hv_result)
1851

    
1852
    test = nresult.get(constants.NV_NODESETUP,
1853
                       ["Missing NODESETUP results"])
1854
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1855
                  "node setup error: %s", "; ".join(test))
1856

    
1857
    return True
1858

    
1859
  def _VerifyNodeTime(self, ninfo, nresult,
1860
                      nvinfo_starttime, nvinfo_endtime):
1861
    """Check the node time.
1862

1863
    @type ninfo: L{objects.Node}
1864
    @param ninfo: the node to check
1865
    @param nresult: the remote results for the node
1866
    @param nvinfo_starttime: the start time of the RPC call
1867
    @param nvinfo_endtime: the end time of the RPC call
1868

1869
    """
1870
    ntime = nresult.get(constants.NV_TIME, None)
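    # the node is expected to return its time as a (seconds, microseconds)
    # pair, which MergeTime collapses into a single float timestamp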
1871
    try:
1872
      ntime_merged = utils.MergeTime(ntime)
1873
    except (ValueError, TypeError):
1874
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1875
                    "Node returned invalid time")
1876
      return
1877

    
1878
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1879
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1880
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1881
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1882
    else:
1883
      ntime_diff = None
1884

    
1885
    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1886
                  "Node time diverges by at least %s from master node time",
1887
                  ntime_diff)
1888

    
1889
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1890
    """Check the node LVM results and update info for cross-node checks.
1891

1892
    @type ninfo: L{objects.Node}
1893
    @param ninfo: the node to check
1894
    @param nresult: the remote results for the node
1895
    @param vg_name: the configured VG name
1896
    @type nimg: L{NodeImage}
1897
    @param nimg: node image
1898

1899
    """
1900
    if vg_name is None:
1901
      return
1902

    
1903
    # checks vg existence and size > 20G
1904
    vglist = nresult.get(constants.NV_VGLIST, None)
1905
    test = not vglist
1906
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1907
                  "unable to check volume groups")
1908
    if not test:
1909
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1910
                                            constants.MIN_VG_SIZE)
1911
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1912

    
1913
    # Check PVs
1914
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1915
    for em in errmsgs:
1916
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
1917
    if pvminmax is not None:
1918
      (nimg.pv_min, nimg.pv_max) = pvminmax
1919

    
1920
  def _VerifyGroupDRBDVersion(self, node_verify_infos):
1921
    """Check cross-node DRBD version consistency.
1922

1923
    @type node_verify_infos: dict
1924
    @param node_verify_infos: infos about nodes as returned from the
1925
      node_verify call.
1926

1927
    """
1928
    node_versions = {}
1929
    for node_uuid, ndata in node_verify_infos.items():
1930
      nresult = ndata.payload
1931
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1932
      node_versions[node_uuid] = version
1933

    
1934
    if len(set(node_versions.values())) > 1:
1935
      for node_uuid, version in sorted(node_versions.items()):
1936
        msg = "DRBD version mismatch: %s" % version
1937
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1938
                    code=self.ETYPE_WARNING)
1939

    
1940
  def _VerifyGroupLVM(self, node_image, vg_name):
1941
    """Check cross-node consistency in LVM.
1942

1943
    @type node_image: dict
1944
    @param node_image: info about nodes, mapping from node names to
1945
      L{NodeImage} objects
1946
    @param vg_name: the configured VG name
1947

1948
    """
1949
    if vg_name is None:
1950
      return
1951

    
1952
    # Only exclusive storage needs this kind of checks
1953
    if not self._exclusive_storage:
1954
      return
1955

    
1956
    # exclusive_storage wants all PVs to have the same size (approximately),
1957
    # if the smallest and the biggest ones are okay, everything is fine.
1958
    # pv_min is None iff pv_max is None
1959
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1960
    if not vals:
1961
      return
1962
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1963
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1964
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1965
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1966
                  "PV sizes differ too much in the group; smallest (%s MB) is"
1967
                  " on %s, biggest (%s MB) is on %s",
1968
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
1969
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))
1970

    
1971
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1972
    """Check the node bridges.
1973

1974
    @type ninfo: L{objects.Node}
1975
    @param ninfo: the node to check
1976
    @param nresult: the remote results for the node
1977
    @param bridges: the expected list of bridges
1978

1979
    """
1980
    if not bridges:
1981
      return
1982

    
1983
    missing = nresult.get(constants.NV_BRIDGES, None)
1984
    test = not isinstance(missing, list)
1985
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1986
                  "did not return valid bridge information")
1987
    if not test:
1988
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1989
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1990

    
1991
  def _VerifyNodeUserScripts(self, ninfo, nresult):
1992
    """Check the results of user scripts presence and executability on the node
1993

1994
    @type ninfo: L{objects.Node}
1995
    @param ninfo: the node to check
1996
    @param nresult: the remote results for the node
1997

1998
    """
1999
    test = constants.NV_USERSCRIPTS not in nresult
2000
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
2001
                  "did not return user scripts information")
2002

    
2003
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
2004
    if not test:
2005
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
2006
                    "user scripts not present or not executable: %s" %
2007
                    utils.CommaJoin(sorted(broken_scripts)))
2008

    
2009
  def _VerifyNodeNetwork(self, ninfo, nresult):
2010
    """Check the node network connectivity results.
2011

2012
    @type ninfo: L{objects.Node}
2013
    @param ninfo: the node to check
2014
    @param nresult: the remote results for the node
2015

2016
    """
2017
    test = constants.NV_NODELIST not in nresult
2018
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
2019
                  "node hasn't returned node ssh connectivity data")
2020
    if not test:
2021
      if nresult[constants.NV_NODELIST]:
2022
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
2023
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
2024
                        "ssh communication with node '%s': %s", a_node, a_msg)
2025

    
2026
    test = constants.NV_NODENETTEST not in nresult
2027
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2028
                  "node hasn't returned node tcp connectivity data")
2029
    if not test:
2030
      if nresult[constants.NV_NODENETTEST]:
2031
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
2032
        for anode in nlist:
2033
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
2034
                        "tcp communication with node '%s': %s",
2035
                        anode, nresult[constants.NV_NODENETTEST][anode])
2036

    
2037
    test = constants.NV_MASTERIP not in nresult
2038
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2039
                  "node hasn't returned node master IP reachability data")
2040
    if not test:
2041
      if not nresult[constants.NV_MASTERIP]:
2042
        if ninfo.uuid == self.master_node:
2043
          msg = "the master node cannot reach the master IP (not configured?)"
2044
        else:
2045
          msg = "cannot reach the master IP"
2046
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
2047

    
2048
  def _VerifyInstance(self, instance, node_image, diskstatus):
2049
    """Verify an instance.
2050

2051
    This function checks to see if the required block devices are
2052
    available on the instance's node, and that the nodes are in the correct
2053
    state.
2054

2055
    """
2056
    pnode_uuid = instance.primary_node
2057
    pnode_img = node_image[pnode_uuid]
2058
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2059

    
2060
    node_vol_should = {}
2061
    instance.MapLVsByNode(node_vol_should)
2062

    
2063
    cluster = self.cfg.GetClusterInfo()
2064
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2065
                                                            self.group_info)
2066
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
2067
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
2068
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)
2069

    
2070
    for node_uuid in node_vol_should:
2071
      n_img = node_image[node_uuid]
2072
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2073
        # ignore missing volumes on offline or broken nodes
2074
        continue
2075
      for volume in node_vol_should[node_uuid]:
2076
        test = volume not in n_img.volumes
2077
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
2078
                      "volume %s missing on node %s", volume,
2079
                      self.cfg.GetNodeName(node_uuid))
2080

    
2081
    if instance.admin_state == constants.ADMINST_UP:
2082
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
2083
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
2084
                    "instance not running on its primary node %s",
2085
                     self.cfg.GetNodeName(pnode_uuid))
2086
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
2087
                    instance.name, "instance is marked as running and lives on"
2088
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))
2089

    
2090
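    # flatten diskstatus into (node_name, success, bdev_status, disk_index)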
    diskdata = [(nname, success, status, idx)
2091
                for (nname, disks) in diskstatus.items()
2092
                for idx, (success, status) in enumerate(disks)]
2093

    
2094
    for nname, success, bdev_status, idx in diskdata:
2095
      # the 'ghost node' construction in Exec() ensures that we have a
2096
      # node here
2097
      snode = node_image[nname]
2098
      bad_snode = snode.ghost or snode.offline
2099
      self._ErrorIf(instance.disks_active and
2100
                    not success and not bad_snode,
2101
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
2102
                    "couldn't retrieve status for disk/%s on %s: %s",
2103
                    idx, self.cfg.GetNodeName(nname), bdev_status)
2104

    
2105
      if instance.disks_active and success and \
2106
         (bdev_status.is_degraded or
2107
          bdev_status.ldisk_status != constants.LDS_OKAY):
2108
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2109
        if bdev_status.is_degraded:
2110
          msg += " is degraded"
2111
        if bdev_status.ldisk_status != constants.LDS_OKAY:
2112
          msg += "; state is '%s'" % \
2113
                 constants.LDS_NAMES[bdev_status.ldisk_status]
2114

    
2115
        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2116

    
2117
    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2118
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2119
                  "instance %s, connection to primary node failed",
2120
                  instance.name)
2121

    
2122
    self._ErrorIf(len(instance.secondary_nodes) > 1,
2123
                  constants.CV_EINSTANCELAYOUT, instance.name,
2124
                  "instance has multiple secondary nodes: %s",
2125
                  utils.CommaJoin(instance.secondary_nodes),
2126
                  code=self.ETYPE_WARNING)
2127

    
2128
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2129
    if any(es_flags.values()):
2130
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2131
        # Disk template not compatible with exclusive_storage: no instance
2132
        # node should have the flag set
2133
        es_nodes = [n
2134
                    for (n, es) in es_flags.items()
2135
                    if es]
2136
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2137
                    "instance has template %s, which is not supported on nodes"
2138
                    " that have exclusive storage set: %s",
2139
                    instance.disk_template,
2140
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2141
      for (idx, disk) in enumerate(instance.disks):
2142
        self._ErrorIf(disk.spindles is None,
2143
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2144
                      "number of spindles not configured for disk %s while"
2145
                      " exclusive storage is enabled, try running"
2146
                      " gnt-cluster repair-disk-sizes", idx)
2147

    
2148
    if instance.disk_template in constants.DTS_INT_MIRROR:
2149
      instance_nodes = utils.NiceSort(instance.all_nodes)
2150
      instance_groups = {}
2151

    
2152
      for node_uuid in instance_nodes:
2153
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
2154
                                   []).append(node_uuid)
2155

    
2156
      pretty_list = [
2157
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2158
                           groupinfo[group].name)
2159
        # Sort so that we always list the primary node first.
2160
        for group, nodes in sorted(instance_groups.items(),
2161
                                   key=lambda (_, nodes): pnode_uuid in nodes,
2162
                                   reverse=True)]
2163

    
2164
      self._ErrorIf(len(instance_groups) > 1,
2165
                    constants.CV_EINSTANCESPLITGROUPS,
2166
                    instance.name, "instance has primary and secondary nodes in"
2167
                    " different groups: %s", utils.CommaJoin(pretty_list),
2168
                    code=self.ETYPE_WARNING)
2169

    
2170
    inst_nodes_offline = []
2171
    for snode in instance.secondary_nodes:
2172
      s_img = node_image[snode]
2173
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2174
                    self.cfg.GetNodeName(snode),
2175
                    "instance %s, connection to secondary node failed",
2176
                    instance.name)
2177

    
2178
      if s_img.offline:
2179
        inst_nodes_offline.append(snode)
2180

    
2181
    # warn that the instance lives on offline nodes
2182
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2183
                  instance.name, "instance has offline secondary node(s) %s",
2184
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2185
    # ... or ghost/non-vm_capable nodes
2186
    for node_uuid in instance.all_nodes:
2187
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2188
                    instance.name, "instance lives on ghost node %s",
2189
                    self.cfg.GetNodeName(node_uuid))
2190
      self._ErrorIf(not node_image[node_uuid].vm_capable,
2191
                    constants.CV_EINSTANCEBADNODE, instance.name,
2192
                    "instance lives on non-vm_capable node %s",
2193
                    self.cfg.GetNodeName(node_uuid))
2194

    
2195
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2196
    """Verify if there are any unknown volumes in the cluster.
2197

2198
    The .os, .swap and backup volumes are ignored. All other volumes are
2199
    reported as unknown.
2200

2201
    @type reserved: L{ganeti.utils.FieldSet}
2202
    @param reserved: a FieldSet of reserved volume names
2203

2204
    """
2205
    for node_uuid, n_img in node_image.items():
2206
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2207
          self.all_node_info[node_uuid].group != self.group_uuid):
2208
        # skip non-healthy nodes
2209
        continue
2210
      for volume in n_img.volumes:
2211
        test = ((node_uuid not in node_vol_should or
2212
                volume not in node_vol_should[node_uuid]) and
2213
                not reserved.Matches(volume))
2214
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2215
                      self.cfg.GetNodeName(node_uuid),
2216
                      "volume %s is unknown", volume)
2217

    
2218
  def _VerifyNPlusOneMemory(self, node_image, all_insts):
2219
    """Verify N+1 Memory Resilience.
2220

2221
    Check that if one single node dies we can still start all the
2222
    instances it was primary for.
2223

2224
    """
2225
    cluster_info = self.cfg.GetClusterInfo()
2226
    for node_uuid, n_img in node_image.items():
2227
      # This code checks that every node which is now listed as
2228
      # secondary has enough memory to host all instances it is
2229
      # supposed to should a single other node in the cluster fail.
2230
      # FIXME: not ready for failover to an arbitrary node
2231
      # FIXME: does not support file-backed instances
2232
      # WARNING: we currently take into account down instances as well
2233
      # as up ones, considering that even if they're down someone
2234
      # might want to start them even in the event of a node failure.
2235
      if n_img.offline or \
2236
         self.all_node_info[node_uuid].group != self.group_uuid:
2237
        # we're skipping nodes marked offline and nodes in other groups from
2238
        # the N+1 warning, since most likely we don't have good memory
2239
        # information from them; we already list instances living on such
2240
        # nodes, and that's enough warning
2241
        continue
2242
      #TODO(dynmem): also consider ballooning out other instances
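      # n_img.sbp maps each primary node to the instances that have this
      # node as secondary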
2243
      for prinode, inst_uuids in n_img.sbp.items():
2244
        needed_mem = 0
2245
        for inst_uuid in inst_uuids:
2246
          bep = cluster_info.FillBE(all_insts[inst_uuid])
2247
          if bep[constants.BE_AUTO_BALANCE]:
2248
            needed_mem += bep[constants.BE_MINMEM]
2249
        test = n_img.mfree < needed_mem
2250
        self._ErrorIf(test, constants.CV_ENODEN1,
2251
                      self.cfg.GetNodeName(node_uuid),
2252
                      "not enough memory to accomodate instance failovers"
2253
                      " should node %s fail (%dMiB needed, %dMiB available)",
2254
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2255

    
2256
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2257
                   (files_all, files_opt, files_mc, files_vm)):
2258
    """Verifies file checksums collected from all nodes.
2259

2260
    @param nodes: List of L{objects.Node} objects
2261
    @param master_node_uuid: UUID of master node
2262
    @param all_nvinfo: RPC results
2263

2264
    """
2265
    # Define functions determining which nodes to consider for a file
2266
    files2nodefn = [
2267
      (files_all, None),
2268
      (files_mc, lambda node: (node.master_candidate or
2269
                               node.uuid == master_node_uuid)),
2270
      (files_vm, lambda node: node.vm_capable),
2271
      ]
2272

    
2273
    # Build mapping from filename to list of nodes which should have the file
2274
    nodefiles = {}
2275
    for (files, fn) in files2nodefn:
2276
      if fn is None:
2277
        filenodes = nodes
2278
      else:
2279
        filenodes = filter(fn, nodes)
2280
      nodefiles.update((filename,
2281
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
2282
                       for filename in files)
2283

    
2284
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2285

    
2286
    fileinfo = dict((filename, {}) for filename in nodefiles)
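    # fileinfo: filename -> {checksum -> set of node UUIDs}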
2287
    ignore_nodes = set()
2288

    
2289
    for node in nodes:
2290
      if node.offline:
2291
        ignore_nodes.add(node.uuid)
2292
        continue
2293

    
2294
      nresult = all_nvinfo[node.uuid]
2295

    
2296
      if nresult.fail_msg or not nresult.payload:
2297
        node_files = None
2298
      else:
2299
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2300
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2301
                          for (key, value) in fingerprints.items())
2302
        del fingerprints
2303

    
2304
      test = not (node_files and isinstance(node_files, dict))
2305
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2306
                    "Node did not return file checksum data")
2307
      if test:
2308
        ignore_nodes.add(node.uuid)
2309
        continue
2310

    
2311
      # Build per-checksum mapping from filename to nodes having it
2312
      for (filename, checksum) in node_files.items():
2313
        assert filename in nodefiles
2314
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2315

    
2316
    for (filename, checksums) in fileinfo.items():
2317
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2318

    
2319
      # Nodes having the file
2320
      with_file = frozenset(node_uuid
2321
                            for node_uuids in fileinfo[filename].values()
2322
                            for node_uuid in node_uuids) - ignore_nodes
2323

    
2324
      expected_nodes = nodefiles[filename] - ignore_nodes
2325

    
2326
      # Nodes missing file
2327
      missing_file = expected_nodes - with_file
2328

    
2329
      if filename in files_opt:
2330
        # All or no nodes
2331
        self._ErrorIf(missing_file and missing_file != expected_nodes,
2332
                      constants.CV_ECLUSTERFILECHECK, None,
2333
                      "File %s is optional, but it must exist on all or no"
2334
                      " nodes (not found on %s)",
2335
                      filename,
2336
                      utils.CommaJoin(
2337
                        utils.NiceSort(
2338
                          map(self.cfg.GetNodeName, missing_file))))
2339
      else:
2340
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2341
                      "File %s is missing from node(s) %s", filename,
2342
                      utils.CommaJoin(
2343
                        utils.NiceSort(
2344
                          map(self.cfg.GetNodeName, missing_file))))
2345

    
2346
        # Warn if a node has a file it shouldn't
2347
        unexpected = with_file - expected_nodes
2348
        self._ErrorIf(unexpected,
2349
                      constants.CV_ECLUSTERFILECHECK, None,
2350
                      "File %s should not exist on node(s) %s",
2351
                      filename, utils.CommaJoin(
2352
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2353

    
2354
      # See if there are multiple versions of the file
2355
      test = len(checksums) > 1
2356
      if test:
2357
        variants = ["variant %s on %s" %
2358
                    (idx + 1,
2359
                     utils.CommaJoin(utils.NiceSort(
2360
                       map(self.cfg.GetNodeName, node_uuids))))
2361
                    for (idx, (checksum, node_uuids)) in
2362
                      enumerate(sorted(checksums.items()))]
2363
      else:
2364
        variants = []
2365

    
2366
      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2367
                    "File %s found with %s different checksums (%s)",
2368
                    filename, len(checksums), "; ".join(variants))
2369

    
2370
  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
2371
    """Verify the drbd helper.
2372

2373
    """
2374
    if drbd_helper:
2375
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2376
      test = (helper_result is None)
2377
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2378
                    "no drbd usermode helper returned")
2379
      if helper_result:
2380
        status, payload = helper_result
2381
        test = not status
2382
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2383
                      "drbd usermode helper check unsuccessful: %s", payload)
2384
        test = status and (payload != drbd_helper)
2385
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2386
                      "wrong drbd usermode helper: %s", payload)
2387

    
2388
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2389
                      drbd_map):
2390
    """Verifies and the node DRBD status.
2391

2392
    @type ninfo: L{objects.Node}
2393
    @param ninfo: the node to check
2394
    @param nresult: the remote results for the node
2395
    @param instanceinfo: the dict of instances
2396
    @param drbd_helper: the configured DRBD usermode helper
2397
    @param drbd_map: the DRBD map as returned by
2398
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2399

2400
    """
2401
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)
2402

    
2403
    # compute the DRBD minors
2404
    node_drbd = {}
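    # node_drbd: minor -> (instance UUID, whether the minor must be active)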
2405
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2406
      test = inst_uuid not in instanceinfo
2407
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2408
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
2409
      # ghost instance should not be running, but otherwise we
2410
      # don't give double warnings (both ghost instance and
2411
      # unallocated minor in use)
2412
      if test:
2413
        node_drbd[minor] = (inst_uuid, False)
2414
      else:
2415
        instance = instanceinfo[inst_uuid]
2416
        node_drbd[minor] = (inst_uuid, instance.disks_active)
2417

    
2418
    # and now check them
2419
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2420
    test = not isinstance(used_minors, (tuple, list))
2421
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2422
                  "cannot parse drbd status file: %s", str(used_minors))
2423
    if test:
2424
      # we cannot check drbd status
2425
      return
2426

    
2427
    for minor, (inst_uuid, must_exist) in node_drbd.items():
2428
      test = minor not in used_minors and must_exist
2429
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2430
                    "drbd minor %d of instance %s is not active", minor,
2431
                    self.cfg.GetInstanceName(inst_uuid))
2432
    for minor in used_minors:
2433
      test = minor not in node_drbd
2434
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2435
                    "unallocated drbd minor %d is in use", minor)
2436

    
2437
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2438
    """Builds the node OS structures.
2439

2440
    @type ninfo: L{objects.Node}
2441
    @param ninfo: the node to check
2442
    @param nresult: the remote results for the node
2443
    @param nimg: the node image object
2444

2445
    """
2446
    remote_os = nresult.get(constants.NV_OSLIST, None)
2447
    test = (not isinstance(remote_os, list) or
2448
            not compat.all(isinstance(v, list) and len(v) == 7
2449
                           for v in remote_os))
2450

    
2451
    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2452
                  "node hasn't returned valid OS data")
2453

    
2454
    nimg.os_fail = test
2455

    
2456
    if test:
2457
      return
2458

    
2459
    os_dict = {}
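    # os_dict: OS name -> list of (path, status, diagnose, variants,
    #                              parameters, api_versions) tuples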
2460

    
2461
    for (name, os_path, status, diagnose,
2462
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2463

    
2464
      if name not in os_dict:
2465
        os_dict[name] = []
2466

    
2467
      # parameters is a list of lists instead of list of tuples due to
2468
      # JSON lacking a real tuple type, fix it:
2469
      parameters = [tuple(v) for v in parameters]
2470
      os_dict[name].append((os_path, status, diagnose,
2471
                            set(variants), set(parameters), set(api_ver)))
2472

    
2473
    nimg.oslist = os_dict
2474

    
2475
  def _VerifyNodeOS(self, ninfo, nimg, base):
2476
    """Verifies the node OS list.
2477

2478
    @type ninfo: L{objects.Node}
2479
    @param ninfo: the node to check
2480
    @param nimg: the node image object
2481
    @param base: the 'template' node we match against (e.g. from the master)
2482

2483
    """
2484
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2485

    
2486
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2487
    for os_name, os_data in nimg.oslist.items():
2488
      assert os_data, "Empty OS status for OS %s?!" % os_name
2489
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2490
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2491
                    "Invalid OS %s (located at %s): %s",
2492
                    os_name, f_path, f_diag)
2493
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2494
                    "OS '%s' has multiple entries"
2495
                    " (first one shadows the rest): %s",
2496
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
2497
      # comparisons with the 'base' image
2498
      test = os_name not in base.oslist
2499
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2500
                    "Extra OS %s not present on reference node (%s)",
2501
                    os_name, self.cfg.GetNodeName(base.uuid))
2502
      if test:
2503
        continue
2504
      assert base.oslist[os_name], "Base node has empty OS status?"
2505
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2506
      if not b_status:
2507
        # base OS is invalid, skipping
2508
        continue
2509
      for kind, a, b in [("API version", f_api, b_api),
2510
                         ("variants list", f_var, b_var),
2511
                         ("parameters", beautify_params(f_param),
2512
                          beautify_params(b_param))]:
2513
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2514
                      "OS %s for %s differs from reference node %s:"
2515
                      " [%s] vs. [%s]", kind, os_name,
2516
                      self.cfg.GetNodeName(base.uuid),
2517
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2518

    
2519
    # check any missing OSes
2520
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2521
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2522
                  "OSes present on reference node %s"
2523
                  " but missing on this node: %s",
2524
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2525

    
2526
  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2527
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2528

2529
    @type ninfo: L{objects.Node}
2530
    @param ninfo: the node to check
2531
    @param nresult: the remote results for the node
2532
    @type is_master: bool
2533
    @param is_master: Whether node is the master node
2534

2535
    """
2536
    cluster = self.cfg.GetClusterInfo()
2537
    if (is_master and
2538
        (cluster.IsFileStorageEnabled() or
2539
         cluster.IsSharedFileStorageEnabled())):
2540
      try:
2541
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2542
      except KeyError:
2543
        # This should never happen
2544
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2545
                      "Node did not return forbidden file storage paths")
2546
      else:
2547
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2548
                      "Found forbidden file storage paths: %s",
2549
                      utils.CommaJoin(fspaths))
2550
    else:
2551
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2552
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2553
                    "Node should not have returned forbidden file storage"
2554
                    " paths")
2555

    
2556
  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2557
                          verify_key, error_key):
2558
    """Verifies (file) storage paths.
2559

2560
    @type ninfo: L{objects.Node}
2561
    @param ninfo: the node to check
2562
    @param nresult: the remote results for the node
2563
    @type file_disk_template: string
2564
    @param file_disk_template: file-based disk template, whose directory
2565
        is supposed to be verified
2566
    @type verify_key: string
2567
    @param verify_key: key for the verification map of this file
2568
        verification step
2569
    @param error_key: error key to be added to the verification results
2570
        in case something goes wrong in this verification step
2571

2572
    """
2573
    assert (file_disk_template in
2574
            utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2575
    cluster = self.cfg.GetClusterInfo()
2576
    if cluster.IsDiskTemplateEnabled(file_disk_template):
2577
      self._ErrorIf(
2578
          verify_key in nresult,
2579
          error_key, ninfo.name,
2580
          "The configured %s storage path is unusable: %s" %
2581
          (file_disk_template, nresult.get(verify_key)))
2582

    
2583
  def _VerifyFileStoragePaths(self, ninfo, nresult):
2584
    """Verifies (file) storage paths.
2585

2586
    @see: C{_VerifyStoragePaths}
2587

2588
    """
2589
    self._VerifyStoragePaths(
2590
        ninfo, nresult, constants.DT_FILE,
2591
        constants.NV_FILE_STORAGE_PATH,
2592
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2593

    
2594
  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2595
    """Verifies (file) storage paths.
2596

2597
    @see: C{_VerifyStoragePaths}
2598

2599
    """
2600
    self._VerifyStoragePaths(
2601
        ninfo, nresult, constants.DT_SHARED_FILE,
2602
        constants.NV_SHARED_FILE_STORAGE_PATH,
2603
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2604

    
2605
  def _VerifyOob(self, ninfo, nresult):
2606
    """Verifies out of band functionality of a node.
2607

2608
    @type ninfo: L{objects.Node}
2609
    @param ninfo: the node to check
2610
    @param nresult: the remote results for the node
2611

2612
    """
2613
    # We just have to verify the paths on master and/or master candidates
2614
    # as the oob helper is invoked on the master
2615
    if ((ninfo.master_candidate or ninfo.master_capable) and
2616
        constants.NV_OOB_PATHS in nresult):
2617
      for path_result in nresult[constants.NV_OOB_PATHS]:
2618
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2619
                      ninfo.name, path_result)
2620

    
2621
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2622
    """Verifies and updates the node volume data.
2623

2624
    This function will update a L{NodeImage}'s internal structures
2625
    with data from the remote call.
2626

2627
    @type ninfo: L{objects.Node}
2628
    @param ninfo: the node to check
2629
    @param nresult: the remote results for the node
2630
    @param nimg: the node image object
2631
    @param vg_name: the configured VG name
2632

2633
    """
2634
    nimg.lvm_fail = True
2635
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2636
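
    # The node reports either a dict of logical volumes (success) or an
    # error string; anything else means the lvlist RPC result itself is
    # malformed.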
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_dev_inst_only = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS
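
    # Group the disks of all instances by the node that has to report on
    # them; diskless instances are remembered separately so they still get
    # an (empty) entry in the result.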
    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      dev_inst_only = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))

      node_disks_dev_inst_only[nuuid] = dev_inst_only

    assert len(node_disks) == len(node_disks_dev_inst_only)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
               node_disks.keys(), node_disks_dev_inst_only)

    assert len(result) == len(node_disks)

    instdisk = {}
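
    # Fold the per-node RPC results back into a per-instance, per-node
    # mapping; offline or failed nodes contribute (False, <reason>)
    # placeholders for each of their disks.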
    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")
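
    # One endless iterator per foreign node group, each cycling over that
    # group's online node names in sorted order.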
    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
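
    # Parameters for the node_verify RPC; each key selects a check the
    # nodes should run and report back on.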
    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]
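
    # DRBD checks are only requested if the DRBD template is enabled and a
    # usermode helper is configured.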
    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
      if drbd_helper:
        node_verify_param[constants.NV_DRBDVERSION] = None
        node_verify_param[constants.NV_DRBDLIST] = None
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for inst_uuid in self.my_inst_info.values():
      for nic in inst_uuid.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
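
    # Record the expected placement of every instance in this group: its
    # primary/secondary nodes and the LVs it should own, so the per-node
    # results can later be checked against it.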
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least one node, we act as if it were set for all of them
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None
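
    # Walk the gathered per-node results and run the individual checks;
    # offline nodes are only counted, and nodes whose RPC failed are
    # flagged and skipped.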
    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)
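
    # With the node data in place, check each instance of this group
    # against it.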
    feedback_fn("* Verifying instance status")
3160
    for inst_uuid in self.my_inst_uuids:
3161
      instance = self.my_inst_info[inst_uuid]
3162
      if verbose:
3163
        feedback_fn("* Verifying instance %s" % instance.name)
3164
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])
3165

    
3166
      # If the instance is non-redundant we cannot survive losing its primary
3167
      # node, so we are not N+1 compliant.
3168
      if instance.disk_template not in constants.DTS_MIRRORED:
3169
        i_non_redundant.append(instance)
3170

    
3171
      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
3172
        i_non_a_balanced.append(instance)
3173

    
3174
    feedback_fn("* Verifying orphan volumes")
3175
    reserved = utils.FieldSet(*cluster.reserved_lvs)
3176

    
3177
    # We will get spurious "unknown volume" warnings if any node of this group
3178
    # is secondary for an instance whose primary is in another group. To avoid
3179
    # them, we find these instances and add their volumes to node_vol_should.
3180
    for instance in self.all_inst_info.values():
3181
      for secondary in instance.secondary_nodes:
3182
        if (secondary in self.my_node_info
3183
            and instance.name not in self.my_inst_info):
3184
          instance.MapLVsByNode(node_vol_should)
3185
          break
3186

    
3187
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
3188

    
3189
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
3190
      feedback_fn("* Verifying N+1 Memory redundancy")
3191
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
3192

    
3193
    feedback_fn("* Other Notes")
3194
    if i_non_redundant:
3195
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
3196
                  % len(i_non_redundant))
3197

    
3198
    if i_non_a_balanced:
3199
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
3200
                  % len(i_non_a_balanced))
3201

    
3202
    if i_offline:
3203
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
3204

    
3205
    if n_offline:
3206
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
3207

    
3208
    if n_drained:
3209
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
3210

    
3211
    return not self.bad
3212

    
3213
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
3214
    """Analyze the post-hooks' result
3215

3216
    This method analyses the hook result, handles it, and sends some
3217
    nicely-formatted feedback back to the user.
3218

3219
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
3220
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
3221
    @param hooks_results: the results of the multi-node hooks rpc call
3222
    @param feedback_fn: function used send feedback back to the caller
3223
    @param lu_result: previous Exec result
3224
    @return: the new Exec result, based on the previous result
3225
        and hook results
3226

3227
    """
3228
    # We only really run POST phase hooks, only for non-empty groups,
3229
    # and are only interested in their results
3230
    if not self.my_node_uuids:
3231
      # empty node group
3232
      pass
3233
    elif phase == constants.HOOKS_PHASE_POST:
3234
      # Used to change hooks' output to proper indentation
3235
      feedback_fn("* Hooks Results")
3236
      assert hooks_results, "invalid result from hooks"
3237

    
3238
      for node_name in hooks_results:
3239
        res = hooks_results[node_name]
3240
        msg = res.fail_msg
3241
        test = msg and not res.offline
3242
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3243
                      "Communication failure in hooks execution: %s", msg)
3244
        if res.offline or msg:
3245
          # No need to investigate payload if node is offline or gave
3246
          # an error.
3247
          continue
3248
        for script, hkr, output in res.payload:
3249
          test = hkr == constants.HKR_FAIL
3250
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3251
                        "Script %s failed, output:", script)
3252
          if test:
3253
            output = self._HOOKS_INDENT_RE.sub("      ", output)
3254
            feedback_fn("%s" % output)
3255
            lu_result = False
3256

    
3257
    return lu_result
3258

    
3259

    
3260
class LUClusterVerifyDisks(NoHooksLU):
3261
  """Verifies the cluster disks status.
3262

3263
  """
3264
  REQ_BGL = False
3265

    
3266
  def ExpandNames(self):
3267
    self.share_locks = ShareAll()
3268
    self.needed_locks = {
3269
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
3270
      }
3271

    
3272
  def Exec(self, feedback_fn):
3273
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
3274

    
3275
    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
3276
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
3277
                           for group in group_names])