root / lib / cmdlib / cluster.py @ 510f672f

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
  CheckDiskAccessModeConsistency

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def CheckArguments(self):
    self.master_uuid = self.cfg.GetMasterNode()
    self.master_ndparams = self.cfg.GetNdParams(self.cfg.GetMasterNodeInfo())

    # TODO: When Issue 584 is solved, and None is properly parsed when used
    # as a default value, ndparams.get(.., None) can be changed to
    # ndparams[..] to access the values directly

    # OpenvSwitch: Warn user if link is missing
    if (self.master_ndparams[constants.ND_OVS] and not
        self.master_ndparams.get(constants.ND_OVS_LINK, None)):
      self.LogInfo("No physical interface for OpenvSwitch was given."
                   " OpenvSwitch will not have an outside connection. This"
                   " might not be what you want.")

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Create and configure Open vSwitch

    """
    if self.master_ndparams[constants.ND_OVS]:
      result = self.rpc.call_node_configure_ovs(
                 self.master_uuid,
                 self.master_ndparams[constants.ND_OVS_NAME],
                 self.master_ndparams.get(constants.ND_OVS_LINK, None))
      result.Raise("Could not successfully configure Open vSwitch")
    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "vcs_version": constants.VCS_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.DT_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False
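
  # Illustrative sketch (not part of the original module): for a DRBD8 disk
  # whose data child was recorded smaller than its parent, e.g.
  #   disk.size = 10240 (MiB) and disk.children[0].size = 10112,
  # _EnsureChildSizes() bumps the child to 10240 and returns True, telling the
  # caller in Exec() that the configuration must be written back via
  # self.cfg.Update().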

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      if not dskl:
        # no disks on the node
        continue

      newl = [([v[2].Copy()], v[0]) for v in dskl]
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)
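
# Illustrative example (not part of the original code): on a cluster whose
# primary IP family is IPv4, _ValidateNetmask(cfg, 26) passes silently, while
# _ValidateNetmask(cfg, 33) raises errors.OpPrereqError because the prefix
# length exceeds the 32 bits of an IPv4 address (for IPv6 the corresponding
# upper bound would be 128).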


def CheckFileBasedStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
    file_disk_template):
  """Checks whether the given file-based storage directory is acceptable.

  Note: This function is public, because it is also used in bootstrap.py.

  @type logging_warn_fn: function
  @param logging_warn_fn: function which accepts a string and logs it
  @type file_storage_dir: string
  @param file_storage_dir: the directory to be used for file-based instances
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of enabled disk templates
  @type file_disk_template: string
  @param file_disk_template: the file-based disk template for which the
      path should be checked

  """
  assert (file_disk_template in
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_dir is not None:
    if file_storage_dir == "":
      if file_storage_enabled:
        raise errors.OpPrereqError(
            "Unsetting the '%s' storage directory while having '%s' storage"
            " enabled is not permitted." %
            (file_disk_template, file_disk_template))
    else:
      if not file_storage_enabled:
        logging_warn_fn(
            "Specified a %s storage directory, although %s storage is not"
            " enabled." % (file_disk_template, file_disk_template))
  else:
    raise errors.ProgrammerError("Received %s storage dir with value"
                                 " 'None'." % file_disk_template)


def CheckFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_FILE)


def CheckSharedFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given shared file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_SHARED_FILE)


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
        CheckDiskAccessModeValidity(self.op.diskparams)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def _CheckVgName(self, node_uuids, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the vg name on all nodes and in case it gets
       unset whether there are instances still using it.

    """
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
                                            new_enabled_disk_templates)
    current_vg_name = self.cfg.GetVGName()

    if self.op.vg_name == '':
      if lvm_is_enabled:
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                   " disk templates are or get enabled.")

    if self.op.vg_name is None:
      if current_vg_name is None and lvm_is_enabled:
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and lvm_is_enabled) or \
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
      self._CheckVgNameOnNodes(node_uuids)

  def _CheckVgNameOnNodes(self, node_uuids):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_uuids)
    for node_uuid in node_uuids:
      msg = vglist[node_uuid].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
                                   errors.ECODE_ENVIRON)

  @staticmethod
  def _GetDiskTemplateSetsInner(op_enabled_disk_templates,
                                old_enabled_disk_templates):
    """Computes three sets of disk templates.

    @see: C{_GetDiskTemplateSets} for more details.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    disabled_disk_templates = []
    if op_enabled_disk_templates:
      enabled_disk_templates = op_enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(old_enabled_disk_templates))
      disabled_disk_templates = \
        list(set(old_enabled_disk_templates)
             - set(enabled_disk_templates))
    else:
      enabled_disk_templates = old_enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates,
            disabled_disk_templates)
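
  # Illustrative sketch (not part of the original module): with
  #   op_enabled_disk_templates  = ["plain", "drbd"]   (requested by the op)
  #   old_enabled_disk_templates = ["drbd", "file"]    (currently enabled)
  # the method returns (["plain", "drbd"], ["plain"], ["file"]), i.e. the
  # resulting set of enabled templates, the newly enabled templates and the
  # newly disabled ones (list ordering is unspecified, as sets are used
  # internally).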

  def _GetDiskTemplateSets(self, cluster):
    """Computes three sets of disk templates.

    The three sets are:
      - disk templates that will be enabled after this operation (no matter if
        they were enabled before or not)
      - disk templates that get enabled by this operation (thus haven't been
        enabled before.)
      - disk templates that get disabled by this operation

    """
    return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates,
                                          cluster.enabled_disk_templates)

  def _CheckIpolicy(self, cluster, enabled_disk_templates):
    """Checks the ipolicy.

    @type cluster: C{objects.Cluster}
    @param cluster: the cluster's configuration
    @type enabled_disk_templates: list of string
    @param enabled_disk_templates: list of (possibly newly) enabled disk
      templates

    """
    # FIXME: write unit tests for this
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
                                  enabled_disk_templates)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(nuuid in group.members
                                             for nuuid in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
                                           self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))
    else:
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
                                  enabled_disk_templates)

  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
    """Checks whether the set DRBD helper actually exists on the nodes.

    @type drbd_helper: string
    @param drbd_helper: path of the drbd usermode helper binary
    @type node_uuids: list of strings
    @param node_uuids: list of node UUIDs to check for the helper

    """
    # checks given drbd helper on all nodes
    helpers = self.rpc.call_drbd_helper(node_uuids)
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
      if ninfo.offline:
        self.LogInfo("Not checking drbd helper on offline node %s",
                     ninfo.name)
        continue
      msg = helpers[ninfo.uuid].fail_msg
      if msg:
        raise errors.OpPrereqError("Error checking drbd helper on node"
                                   " '%s': %s" % (ninfo.name, msg),
                                   errors.ECODE_ENVIRON)
      node_helper = helpers[ninfo.uuid].payload
      if node_helper != drbd_helper:
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                   (ninfo.name, node_helper),
                                   errors.ECODE_ENVIRON)

  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
    """Check the DRBD usermode helper.

    @type node_uuids: list of strings
    @param node_uuids: a list of nodes' UUIDs
    @type drbd_enabled: boolean
    @param drbd_enabled: whether DRBD will be enabled after this operation
      (no matter if it was disabled before or not)
927
    @param drbd_gets_enabled: true if DRBD was disabled before this
928
      operation, but will be enabled afterwards
929

930
    """
931
    if self.op.drbd_helper == '':
932
      if drbd_enabled:
933
        raise errors.OpPrereqError("Cannot disable drbd helper while"
934
                                   " DRBD is enabled.")
935
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
936
        raise errors.OpPrereqError("Cannot disable drbd helper while"
937
                                   " drbd-based instances exist",
938
                                   errors.ECODE_INVAL)
939

    
940
    else:
941
      if self.op.drbd_helper is not None and drbd_enabled:
942
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
943
      else:
944
        if drbd_gets_enabled:
945
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
946
          if current_drbd_helper is not None:
947
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
948
          else:
949
            raise errors.OpPrereqError("Cannot enable DRBD without a"
950
                                       " DRBD usermode helper set.")
951

    
952
  def _CheckInstancesOfDisabledDiskTemplates(
953
      self, disabled_disk_templates):
954
    """Check whether we try to disable a disk template that is in use.
955

956
    @type disabled_disk_templates: list of string
957
    @param disabled_disk_templates: list of disk templates that are going to
958
      be disabled by this operation
959

960
    """
961
    for disk_template in disabled_disk_templates:
962
      if self.cfg.HasAnyDiskOfType(disk_template):
963
        raise errors.OpPrereqError(
964
            "Cannot disable disk template '%s', because there is at least one"
965
            " instance using it." % disk_template)
966

    
967
  def CheckPrereq(self):
968
    """Check prerequisites.
969

970
    This checks whether the given params don't conflict and
971
    if the given volume group is valid.
972

973
    """
974
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
975
    self.cluster = cluster = self.cfg.GetClusterInfo()
976

    
977
    vm_capable_node_uuids = [node.uuid
978
                             for node in self.cfg.GetAllNodesInfo().values()
979
                             if node.uuid in node_uuids and node.vm_capable]
980

    
981
    (enabled_disk_templates, new_enabled_disk_templates,
982
      disabled_disk_templates) = self._GetDiskTemplateSets(cluster)
983
    self._CheckInstancesOfDisabledDiskTemplates(disabled_disk_templates)
984

    
985
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
986
                      new_enabled_disk_templates)
987

    
988
    if self.op.file_storage_dir is not None:
989
      CheckFileStoragePathVsEnabledDiskTemplates(
990
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
991

    
992
    if self.op.shared_file_storage_dir is not None:
993
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
994
          self.LogWarning, self.op.shared_file_storage_dir,
995
          enabled_disk_templates)
996

    
997
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
998
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
999
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)
1000

    
1001
    # validate params changes
1002
    if self.op.beparams:
1003
      objects.UpgradeBeParams(self.op.beparams)
1004
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1005
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
1006

    
1007
    if self.op.ndparams:
1008
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
1009
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
1010

    
1011
      # TODO: we need a more general way to handle resetting
1012
      # cluster-level parameters to default values
1013
      if self.new_ndparams["oob_program"] == "":
1014
        self.new_ndparams["oob_program"] = \
1015
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
1016

    
1017
    if self.op.hv_state:
1018
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
1019
                                           self.cluster.hv_state_static)
1020
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
1021
                               for hv, values in new_hv_state.items())
1022

    
1023
    if self.op.disk_state:
1024
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
1025
                                               self.cluster.disk_state_static)
1026
      self.new_disk_state = \
1027
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
1028
                            for name, values in svalues.items()))
1029
             for storage, svalues in new_disk_state.items())
1030

    
1031
    self._CheckIpolicy(cluster, enabled_disk_templates)
1032

    
    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_uuids, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_uuids, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def _SetVgName(self, feedback_fn):
    """Determines and sets the new volume group name.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")

  def _SetFileStorageDir(self, feedback_fn):
    """Set the file storage directory.

    """
    if self.op.file_storage_dir is not None:
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
        feedback_fn("Global file storage dir already set to value '%s'"
                    % self.cluster.file_storage_dir)
      else:
        self.cluster.file_storage_dir = self.op.file_storage_dir

  def _SetDrbdHelper(self, feedback_fn):
    """Set the DRBD usermode helper.

    """
    if self.op.drbd_helper is not None:
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did not"
                    " enable the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(self.op.enabled_disk_templates)

    self._SetVgName(feedback_fn)
    self._SetFileStorageDir(feedback_fn)
    self._SetDrbdHelper(feedback_fn)

    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
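
    # Illustrative sketch (not part of the original module): self.op.hidden_os
    # and self.op.blacklisted_os are lists of (action, value) pairs, e.g.
    #   [(constants.DDM_ADD, "debootstrap"), (constants.DDM_REMOVE, "dummy-os")]
    # (hypothetical OS names), which helper_os() applies to the corresponding
    # cluster OS list, warning about duplicate additions or missing removals
    # instead of failing.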
1312

    
1313
    if self.op.hidden_os:
1314
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1315

    
1316
    if self.op.blacklisted_os:
1317
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1318

    
1319
    if self.op.master_netdev:
1320
      master_params = self.cfg.GetMasterNetworkParameters()
1321
      ems = self.cfg.GetUseExternalMipScript()
1322
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1323
                  self.cluster.master_netdev)
1324
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1325
                                                       master_params, ems)
1326
      if not self.op.force:
1327
        result.Raise("Could not disable the master ip")
1328
      else:
1329
        if result.fail_msg:
1330
          msg = ("Could not disable the master ip (continuing anyway): %s" %
1331
                 result.fail_msg)
1332
          feedback_fn(msg)
1333
      feedback_fn("Changing master_netdev from %s to %s" %
1334
                  (master_params.netdev, self.op.master_netdev))
1335
      self.cluster.master_netdev = self.op.master_netdev
1336

    
1337
    if self.op.master_netmask:
1338
      master_params = self.cfg.GetMasterNetworkParameters()
1339
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1340
      result = self.rpc.call_node_change_master_netmask(
1341
                 master_params.uuid, master_params.netmask,
1342
                 self.op.master_netmask, master_params.ip,
1343
                 master_params.netdev)
1344
      result.Warn("Could not change the master IP netmask", feedback_fn)
1345
      self.cluster.master_netmask = self.op.master_netmask
1346

    
1347
    self.cfg.Update(self.cluster, feedback_fn)
1348

    
1349
    if self.op.master_netdev:
1350
      master_params = self.cfg.GetMasterNetworkParameters()
1351
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
1352
                  self.op.master_netdev)
1353
      ems = self.cfg.GetUseExternalMipScript()
1354
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1355
                                                     master_params, ems)
1356
      result.Warn("Could not re-enable the master ip on the master,"
1357
                  " please restart manually", self.LogWarning)
1358

    
1359

    
1360
class LUClusterVerify(NoHooksLU):
1361
  """Submits all jobs necessary to verify the cluster.
1362

1363
  """
1364
  REQ_BGL = False
1365

    
1366
  def ExpandNames(self):
1367
    self.needed_locks = {}
1368

    
1369
  def Exec(self, feedback_fn):
1370
    jobs = []
1371

    
1372
    if self.op.group_name:
1373
      groups = [self.op.group_name]
1374
      depends_fn = lambda: None
1375
    else:
1376
      groups = self.cfg.GetNodeGroupList()
1377

    
1378
      # Verify global configuration
1379
      jobs.append([
1380
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1381
        ])
1382

    
1383
      # Always depend on global verification
1384
      depends_fn = lambda: [(-len(jobs), [])]
1385

    
1386
    jobs.extend(
1387
      [opcodes.OpClusterVerifyGroup(group_name=group,
1388
                                    ignore_errors=self.op.ignore_errors,
1389
                                    depends=depends_fn())]
1390
      for group in groups)
1391

    
1392
    # Fix up all parameters
1393
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1394
      op.debug_simulate_errors = self.op.debug_simulate_errors
1395
      op.verbose = self.op.verbose
1396
      op.error_codes = self.op.error_codes
1397
      try:
1398
        op.skip_checks = self.op.skip_checks
1399
      except AttributeError:
1400
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1401

    
1402
    return ResultWithJobs(jobs)
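    # Illustrative sketch (not part of the original module): with two node
    # groups and no group_name given, the job list built above looks roughly
    # like this (each inner list is one job); the negative offsets are
    # relative dependencies pointing back at the global config-verification
    # job:
    #
    #   jobs == [
    #     [opcodes.OpClusterVerifyConfig(...)],
    #     [opcodes.OpClusterVerifyGroup(group_name="group1",
    #                                   depends=[(-1, [])])],
    #     [opcodes.OpClusterVerifyGroup(group_name="group2",
    #                                   depends=[(-2, [])])],
    #   ]
    #
    # "group1"/"group2" are made-up names; the offsets grow because
    # depends_fn() is evaluated while the group jobs are being appended.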
1403

    
1404

    
1405
class _VerifyErrors(object):
1406
  """Mix-in for cluster/group verify LUs.
1407

1408
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1409
  self.op and self._feedback_fn to be available.)
1410

1411
  """
1412

    
1413
  ETYPE_FIELD = "code"
1414
  ETYPE_ERROR = "ERROR"
1415
  ETYPE_WARNING = "WARNING"
1416

    
1417
  def _Error(self, ecode, item, msg, *args, **kwargs):
1418
    """Format an error message.
1419

1420
    Based on the opcode's error_codes parameter, either format a
1421
    parseable error code, or a simpler error string.
1422

1423
    This must be called only from Exec and functions called from Exec.
1424

1425
    """
1426
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1427
    itype, etxt, _ = ecode
1428
    # If the error code is in the list of ignored errors, demote the error to a
1429
    # warning
1430
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1431
      ltype = self.ETYPE_WARNING
1432
    # first complete the msg
1433
    if args:
1434
      msg = msg % args
1435
    # then format the whole message
1436
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1437
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1438
    else:
1439
      if item:
1440
        item = " " + item
1441
      else:
1442
        item = ""
1443
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1444
    # and finally report it via the feedback_fn
1445
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1446
    # do not mark the operation as failed for WARN cases only
1447
    if ltype == self.ETYPE_ERROR:
1448
      self.bad = True
1449

    
1450
  def _ErrorIf(self, cond, *args, **kwargs):
1451
    """Log an error message if the passed condition is True.
1452

1453
    """
1454
    if (bool(cond)
1455
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1456
      self._Error(*args, **kwargs)
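  # Illustrative sketch (not part of the original module): with
  # op.error_codes set, _Error emits machine-parseable lines such as
  #
  #   - ERROR:ENODESSH:node:node2.example.com:ssh communication failed
  #
  # while the default, human-oriented format looks like
  #
  #   - ERROR: node node2.example.com: ssh communication failed
  #
  # The item type and error text come from the constants.CV_* tuples; the
  # values shown here are only examples.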
1457

    
1458

    
1459
def _VerifyCertificate(filename):
1460
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1461

1462
  @type filename: string
1463
  @param filename: Path to PEM file
1464

1465
  """
1466
  try:
1467
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1468
                                           utils.ReadFile(filename))
1469
  except Exception, err: # pylint: disable=W0703
1470
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1471
            "Failed to load X509 certificate %s: %s" % (filename, err))
1472

    
1473
  (errcode, msg) = \
1474
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1475
                                constants.SSL_CERT_EXPIRATION_ERROR)
1476

    
1477
  if msg:
1478
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1479
  else:
1480
    fnamemsg = None
1481

    
1482
  if errcode is None:
1483
    return (None, fnamemsg)
1484
  elif errcode == utils.CERT_WARNING:
1485
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1486
  elif errcode == utils.CERT_ERROR:
1487
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1488

    
1489
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
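# Illustrative sketch (not part of the original module): _VerifyCertificate
# returns a (severity, message) pair that feeds straight into _ErrorIf, e.g.
#
#   (errcode, msg) = _VerifyCertificate(pathutils.NODED_CERT_FILE)
#   # -> (None, None) for a healthy certificate
#   # -> (LUClusterVerifyConfig.ETYPE_WARNING, "While verifying ...: ...")
#   #    when expiry falls inside the SSL_CERT_EXPIRATION_WARN window
#   # -> (LUClusterVerifyConfig.ETYPE_ERROR, "...") when the certificate is
#   #    expired or cannot be loaded at all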
1490

    
1491

    
1492
def _GetAllHypervisorParameters(cluster, instances):
1493
  """Compute the set of all hypervisor parameters.
1494

1495
  @type cluster: L{objects.Cluster}
1496
  @param cluster: the cluster object
1497
  @type instances: list of L{objects.Instance}
1498
  @param instances: additional instances from which to obtain parameters
1499
  @rtype: list of (origin, hypervisor, parameters)
1500
  @return: a list with all parameters found, indicating the hypervisor they
1501
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1502

1503
  """
1504
  hvp_data = []
1505

    
1506
  for hv_name in cluster.enabled_hypervisors:
1507
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1508

    
1509
  for os_name, os_hvp in cluster.os_hvp.items():
1510
    for hv_name, hv_params in os_hvp.items():
1511
      if hv_params:
1512
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1513
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1514

    
1515
  # TODO: collapse identical parameter values in a single one
1516
  for instance in instances:
1517
    if instance.hvparams:
1518
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1519
                       cluster.FillHV(instance)))
1520

    
1521
  return hvp_data
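# Illustrative sketch (not part of the original module): the list returned
# above mixes cluster-, OS- and instance-level parameter sets, e.g. (all
# names and values made up):
#
#   [("cluster", "kvm", {"kernel_path": "", ...}),
#    ("os debian-image", "kvm", {"kernel_path": "/boot/vmlinuz", ...}),
#    ("instance web1", "kvm", {"boot_order": "disk", ...})]
#
# Each (origin, hypervisor, parameters) entry is later syntax-checked by
# LUClusterVerifyConfig._VerifyHVP.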
1522

    
1523

    
1524
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1525
  """Verifies the cluster config.
1526

1527
  """
1528
  REQ_BGL = False
1529

    
1530
  def _VerifyHVP(self, hvp_data):
1531
    """Verifies locally the syntax of the hypervisor parameters.
1532

1533
    """
1534
    for item, hv_name, hv_params in hvp_data:
1535
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1536
             (item, hv_name))
1537
      try:
1538
        hv_class = hypervisor.GetHypervisorClass(hv_name)
1539
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1540
        hv_class.CheckParameterSyntax(hv_params)
1541
      except errors.GenericError, err:
1542
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1543

    
1544
  def ExpandNames(self):
1545
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1546
    self.share_locks = ShareAll()
1547

    
1548
  def CheckPrereq(self):
1549
    """Check prerequisites.
1550

1551
    """
1552
    # Retrieve all information
1553
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1554
    self.all_node_info = self.cfg.GetAllNodesInfo()
1555
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1556

    
1557
  def Exec(self, feedback_fn):
1558
    """Verify integrity of cluster, performing various test on nodes.
1559

1560
    """
1561
    self.bad = False
1562
    self._feedback_fn = feedback_fn
1563

    
1564
    feedback_fn("* Verifying cluster config")
1565

    
1566
    for msg in self.cfg.VerifyConfig():
1567
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1568

    
1569
    feedback_fn("* Verifying cluster certificate files")
1570

    
1571
    for cert_filename in pathutils.ALL_CERT_FILES:
1572
      (errcode, msg) = _VerifyCertificate(cert_filename)
1573
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1574

    
1575
    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
1576
                                    pathutils.NODED_CERT_FILE),
1577
                  constants.CV_ECLUSTERCERT,
1578
                  None,
1579
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
1580
                    constants.LUXID_USER + " user")
1581

    
1582
    feedback_fn("* Verifying hypervisor parameters")
1583

    
1584
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1585
                                                self.all_inst_info.values()))
1586

    
1587
    feedback_fn("* Verifying all nodes belong to an existing group")
1588

    
1589
    # We do this verification here because, should this bogus circumstance
1590
    # occur, it would never be caught by VerifyGroup, which only acts on
1591
    # nodes/instances reachable from existing node groups.
1592

    
1593
    dangling_nodes = set(node for node in self.all_node_info.values()
1594
                         if node.group not in self.all_group_info)
1595

    
1596
    dangling_instances = {}
1597
    no_node_instances = []
1598

    
1599
    for inst in self.all_inst_info.values():
1600
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
1601
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
1602
      elif inst.primary_node not in self.all_node_info:
1603
        no_node_instances.append(inst)
1604

    
1605
    pretty_dangling = [
1606
        "%s (%s)" %
1607
        (node.name,
1608
         utils.CommaJoin(inst.name for
1609
                         inst in dangling_instances.get(node.uuid, [])))
1610
        for node in dangling_nodes]
1611

    
1612
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1613
                  None,
1614
                  "the following nodes (and their instances) belong to a non"
1615
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1616

    
1617
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1618
                  None,
1619
                  "the following instances have a non-existing primary-node:"
1620
                  " %s", utils.CommaJoin(inst.name for
1621
                                         inst in no_node_instances))
1622

    
1623
    return not self.bad
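    # Illustrative sketch (not part of the original module): a dangling-node
    # finding from the checks above is reported roughly as
    #
    #   * Verifying all nodes belong to an existing group
    #     - ERROR: cluster: the following nodes (and their instances) belong
    #       to a non existing group: node3.example.com (web1, db1)
    #
    # i.e. every pretty_dangling entry is "<node name> (<its instances>)";
    # the node and instance names are made-up examples.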
1624

    
1625

    
1626
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1627
  """Verifies the status of a node group.
1628

1629
  """
1630
  HPATH = "cluster-verify"
1631
  HTYPE = constants.HTYPE_CLUSTER
1632
  REQ_BGL = False
1633

    
1634
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1635

    
1636
  class NodeImage(object):
1637
    """A class representing the logical and physical status of a node.
1638

1639
    @type uuid: string
1640
    @ivar uuid: the node UUID to which this object refers
1641
    @ivar volumes: a structure as returned from
1642
        L{ganeti.backend.GetVolumeList} (runtime)
1643
    @ivar instances: a list of running instances (runtime)
1644
    @ivar pinst: list of configured primary instances (config)
1645
    @ivar sinst: list of configured secondary instances (config)
1646
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1647
        instances for which this node is secondary (config)
1648
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1649
    @ivar dfree: free disk, as reported by the node (runtime)
1650
    @ivar offline: the offline status (config)
1651
    @type rpc_fail: boolean
1652
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1653
        not whether the individual keys were correct) (runtime)
1654
    @type lvm_fail: boolean
1655
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1656
    @type hyp_fail: boolean
1657
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1658
    @type ghost: boolean
1659
    @ivar ghost: whether this is a known node or not (config)
1660
    @type os_fail: boolean
1661
    @ivar os_fail: whether the RPC call didn't return valid OS data
1662
    @type oslist: list
1663
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1664
    @type vm_capable: boolean
1665
    @ivar vm_capable: whether the node can host instances
1666
    @type pv_min: float
1667
    @ivar pv_min: size in MiB of the smallest PVs
1668
    @type pv_max: float
1669
    @ivar pv_max: size in MiB of the biggest PVs
1670

1671
    """
1672
    def __init__(self, offline=False, uuid=None, vm_capable=True):
1673
      self.uuid = uuid
1674
      self.volumes = {}
1675
      self.instances = []
1676
      self.pinst = []
1677
      self.sinst = []
1678
      self.sbp = {}
1679
      self.mfree = 0
1680
      self.dfree = 0
1681
      self.offline = offline
1682
      self.vm_capable = vm_capable
1683
      self.rpc_fail = False
1684
      self.lvm_fail = False
1685
      self.hyp_fail = False
1686
      self.ghost = False
1687
      self.os_fail = False
1688
      self.oslist = {}
1689
      self.pv_min = None
1690
      self.pv_max = None
1691

    
1692
  def ExpandNames(self):
1693
    # This raises errors.OpPrereqError on its own:
1694
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1695

    
1696
    # Get instances in node group; this is unsafe and needs verification later
1697
    inst_uuids = \
1698
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1699

    
1700
    self.needed_locks = {
1701
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1702
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1703
      locking.LEVEL_NODE: [],
1704

    
1705
      # This opcode is run by watcher every five minutes and acquires all nodes
1706
      # for a group. It doesn't run for a long time, so it's better to acquire
1707
      # the node allocation lock as well.
1708
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1709
      }
1710

    
1711
    self.share_locks = ShareAll()
1712

    
1713
  def DeclareLocks(self, level):
1714
    if level == locking.LEVEL_NODE:
1715
      # Get members of node group; this is unsafe and needs verification later
1716
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1717

    
1718
      # In Exec(), we warn about mirrored instances that have primary and
1719
      # secondary living in separate node groups. To fully verify that
1720
      # volumes for these instances are healthy, we will need to do an
1721
      # extra call to their secondaries. We ensure here those nodes will
1722
      # be locked.
1723
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1724
        # Important: access only the instances whose lock is owned
1725
        instance = self.cfg.GetInstanceInfoByName(inst_name)
1726
        if instance.disk_template in constants.DTS_INT_MIRROR:
1727
          nodes.update(instance.secondary_nodes)
1728

    
1729
      self.needed_locks[locking.LEVEL_NODE] = nodes
1730

    
1731
  def CheckPrereq(self):
1732
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1733
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1734

    
1735
    group_node_uuids = set(self.group_info.members)
1736
    group_inst_uuids = \
1737
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1738

    
1739
    unlocked_node_uuids = \
1740
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1741

    
1742
    unlocked_inst_uuids = \
1743
        group_inst_uuids.difference(
1744
          [self.cfg.GetInstanceInfoByName(name).uuid
1745
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1746

    
1747
    if unlocked_node_uuids:
1748
      raise errors.OpPrereqError(
1749
        "Missing lock for nodes: %s" %
1750
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1751
        errors.ECODE_STATE)
1752

    
1753
    if unlocked_inst_uuids:
1754
      raise errors.OpPrereqError(
1755
        "Missing lock for instances: %s" %
1756
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1757
        errors.ECODE_STATE)
1758

    
1759
    self.all_node_info = self.cfg.GetAllNodesInfo()
1760
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1761

    
1762
    self.my_node_uuids = group_node_uuids
1763
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1764
                             for node_uuid in group_node_uuids)
1765

    
1766
    self.my_inst_uuids = group_inst_uuids
1767
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1768
                             for inst_uuid in group_inst_uuids)
1769

    
1770
    # We detect here the nodes that will need the extra RPC calls for verifying
1771
    # split LV volumes; they should be locked.
1772
    extra_lv_nodes = set()
1773

    
1774
    for inst in self.my_inst_info.values():
1775
      if inst.disk_template in constants.DTS_INT_MIRROR:
1776
        for nuuid in inst.all_nodes:
1777
          if self.all_node_info[nuuid].group != self.group_uuid:
1778
            extra_lv_nodes.add(nuuid)
1779

    
1780
    unlocked_lv_nodes = \
1781
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1782

    
1783
    if unlocked_lv_nodes:
1784
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1785
                                 utils.CommaJoin(unlocked_lv_nodes),
1786
                                 errors.ECODE_STATE)
1787
    self.extra_lv_nodes = list(extra_lv_nodes)
1788

    
1789
  def _VerifyNode(self, ninfo, nresult):
1790
    """Perform some basic validation on data returned from a node.
1791

1792
      - check the result data structure is well formed and has all the
1793
        mandatory fields
1794
      - check ganeti version
1795

1796
    @type ninfo: L{objects.Node}
1797
    @param ninfo: the node to check
1798
    @param nresult: the results from the node
1799
    @rtype: boolean
1800
    @return: whether overall this call was successful (and we can expect
1801
         reasonable values in the response)
1802

1803
    """
1804
    # main result, nresult should be a non-empty dict
1805
    test = not nresult or not isinstance(nresult, dict)
1806
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1807
                  "unable to verify node: no data returned")
1808
    if test:
1809
      return False
1810

    
1811
    # compares ganeti version
1812
    local_version = constants.PROTOCOL_VERSION
1813
    remote_version = nresult.get("version", None)
1814
    test = not (remote_version and
1815
                isinstance(remote_version, (list, tuple)) and
1816
                len(remote_version) == 2)
1817
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1818
                  "connection to node returned invalid data")
1819
    if test:
1820
      return False
1821

    
1822
    test = local_version != remote_version[0]
1823
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1824
                  "incompatible protocol versions: master %s,"
1825
                  " node %s", local_version, remote_version[0])
1826
    if test:
1827
      return False
1828

    
1829
    # node seems compatible, we can actually try to look into its results
1830

    
1831
    # full package version
1832
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1833
                  constants.CV_ENODEVERSION, ninfo.name,
1834
                  "software version mismatch: master %s, node %s",
1835
                  constants.RELEASE_VERSION, remote_version[1],
1836
                  code=self.ETYPE_WARNING)
1837

    
1838
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1839
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1840
      for hv_name, hv_result in hyp_result.iteritems():
1841
        test = hv_result is not None
1842
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1843
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1844

    
1845
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1846
    if ninfo.vm_capable and isinstance(hvp_result, list):
1847
      for item, hv_name, hv_result in hvp_result:
1848
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1849
                      "hypervisor %s parameter verify failure (source %s): %s",
1850
                      hv_name, item, hv_result)
1851

    
1852
    test = nresult.get(constants.NV_NODESETUP,
1853
                       ["Missing NODESETUP results"])
1854
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1855
                  "node setup error: %s", "; ".join(test))
1856

    
1857
    return True
1858

    
1859
  def _VerifyNodeTime(self, ninfo, nresult,
1860
                      nvinfo_starttime, nvinfo_endtime):
1861
    """Check the node time.
1862

1863
    @type ninfo: L{objects.Node}
1864
    @param ninfo: the node to check
1865
    @param nresult: the remote results for the node
1866
    @param nvinfo_starttime: the start time of the RPC call
1867
    @param nvinfo_endtime: the end time of the RPC call
1868

1869
    """
1870
    ntime = nresult.get(constants.NV_TIME, None)
1871
    try:
1872
      ntime_merged = utils.MergeTime(ntime)
1873
    except (ValueError, TypeError):
1874
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1875
                    "Node returned invalid time")
1876
      return
1877

    
1878
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1879
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1880
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1881
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1882
    else:
1883
      ntime_diff = None
1884

    
1885
    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1886
                  "Node time diverges by at least %s from master node time",
1887
                  ntime_diff)
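  # Illustrative sketch (not part of the original module): the node reports
  # its clock as a (seconds, microseconds) pair which utils.MergeTime turns
  # into a float.  Assuming a maximum allowed skew of 150 seconds, a node
  # answering 1000000000.0 while the verification RPC ran between
  # 1000000200.0 and 1000000201.0 would be flagged with
  # ntime_diff == "200.0s".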
1888

    
1889
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1890
    """Check the node LVM results and update info for cross-node checks.
1891

1892
    @type ninfo: L{objects.Node}
1893
    @param ninfo: the node to check
1894
    @param nresult: the remote results for the node
1895
    @param vg_name: the configured VG name
1896
    @type nimg: L{NodeImage}
1897
    @param nimg: node image
1898

1899
    """
1900
    if vg_name is None:
1901
      return
1902

    
1903
    # checks vg existence and size > 20G
1904
    vglist = nresult.get(constants.NV_VGLIST, None)
1905
    test = not vglist
1906
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1907
                  "unable to check volume groups")
1908
    if not test:
1909
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1910
                                            constants.MIN_VG_SIZE)
1911
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1912

    
1913
    # Check PVs
1914
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1915
    for em in errmsgs:
1916
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
1917
    if pvminmax is not None:
1918
      (nimg.pv_min, nimg.pv_max) = pvminmax
1919

    
1920
  def _VerifyGroupDRBDVersion(self, node_verify_infos):
1921
    """Check cross-node DRBD version consistency.
1922

1923
    @type node_verify_infos: dict
1924
    @param node_verify_infos: infos about nodes as returned from the
1925
      node_verify call.
1926

1927
    """
1928
    node_versions = {}
1929
    for node_uuid, ndata in node_verify_infos.items():
1930
      nresult = ndata.payload
1931
      if nresult:
1932
        version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1933
        node_versions[node_uuid] = version
1934

    
1935
    if len(set(node_versions.values())) > 1:
1936
      for node_uuid, version in sorted(node_versions.items()):
1937
        msg = "DRBD version mismatch: %s" % version
1938
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1939
                    code=self.ETYPE_WARNING)
1940

    
1941
  def _VerifyGroupLVM(self, node_image, vg_name):
1942
    """Check cross-node consistency in LVM.
1943

1944
    @type node_image: dict
1945
    @param node_image: info about nodes, mapping from node to names to
1946
      L{NodeImage} objects
1947
    @param vg_name: the configured VG name
1948

1949
    """
1950
    if vg_name is None:
1951
      return
1952

    
1953
    # Only exclusive storage needs this kind of checks
1954
    if not self._exclusive_storage:
1955
      return
1956

    
1957
    # exclusive_storage wants all PVs to have the same size (approximately),
1958
    # if the smallest and the biggest ones are okay, everything is fine.
1959
    # pv_min is None iff pv_max is None
1960
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1961
    if not vals:
1962
      return
1963
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1964
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1965
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1966
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1967
                  "PV sizes differ too much in the group; smallest (%s MB) is"
1968
                  " on %s, biggest (%s MB) is on %s",
1969
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
1970
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))
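  # Illustrative sketch (not part of the original module): only the extremes
  # are compared above.  If one node reports pv_min=10000/pv_max=10240 MiB
  # and another reports pv_min=5120/pv_max=5120 MiB, the group-wide pair is
  # (pvmin, pvmax) == (5120, 10240) and utils.LvmExclusiveTestBadPvSizes
  # decides whether that spread is acceptable for exclusive storage.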
1971

    
1972
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1973
    """Check the node bridges.
1974

1975
    @type ninfo: L{objects.Node}
1976
    @param ninfo: the node to check
1977
    @param nresult: the remote results for the node
1978
    @param bridges: the expected list of bridges
1979

1980
    """
1981
    if not bridges:
1982
      return
1983

    
1984
    missing = nresult.get(constants.NV_BRIDGES, None)
1985
    test = not isinstance(missing, list)
1986
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1987
                  "did not return valid bridge information")
1988
    if not test:
1989
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1990
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1991

    
1992
  def _VerifyNodeUserScripts(self, ninfo, nresult):
1993
    """Check the results of user scripts presence and executability on the node
1994

1995
    @type ninfo: L{objects.Node}
1996
    @param ninfo: the node to check
1997
    @param nresult: the remote results for the node
1998

1999
    """
2000
    test = not constants.NV_USERSCRIPTS in nresult
2001
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
2002
                  "did not return user scripts information")
2003

    
2004
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
2005
    if not test:
2006
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
2007
                    "user scripts not present or not executable: %s" %
2008
                    utils.CommaJoin(sorted(broken_scripts)))
2009

    
2010
  def _VerifyNodeNetwork(self, ninfo, nresult):
2011
    """Check the node network connectivity results.
2012

2013
    @type ninfo: L{objects.Node}
2014
    @param ninfo: the node to check
2015
    @param nresult: the remote results for the node
2016

2017
    """
2018
    test = constants.NV_NODELIST not in nresult
2019
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
2020
                  "node hasn't returned node ssh connectivity data")
2021
    if not test:
2022
      if nresult[constants.NV_NODELIST]:
2023
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
2024
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
2025
                        "ssh communication with node '%s': %s", a_node, a_msg)
2026

    
2027
    test = constants.NV_NODENETTEST not in nresult
2028
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2029
                  "node hasn't returned node tcp connectivity data")
2030
    if not test:
2031
      if nresult[constants.NV_NODENETTEST]:
2032
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
2033
        for anode in nlist:
2034
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
2035
                        "tcp communication with node '%s': %s",
2036
                        anode, nresult[constants.NV_NODENETTEST][anode])
2037

    
2038
    test = constants.NV_MASTERIP not in nresult
2039
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2040
                  "node hasn't returned node master IP reachability data")
2041
    if not test:
2042
      if not nresult[constants.NV_MASTERIP]:
2043
        if ninfo.uuid == self.master_node:
2044
          msg = "the master node cannot reach the master IP (not configured?)"
2045
        else:
2046
          msg = "cannot reach the master IP"
2047
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
2048

    
2049
  def _VerifyInstance(self, instance, node_image, diskstatus):
2050
    """Verify an instance.
2051

2052
    This function checks to see if the required block devices are
2053
    available on the instance's node, and that the nodes are in the correct
2054
    state.
2055

2056
    """
2057
    pnode_uuid = instance.primary_node
2058
    pnode_img = node_image[pnode_uuid]
2059
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2060

    
2061
    node_vol_should = {}
2062
    instance.MapLVsByNode(node_vol_should)
2063

    
2064
    cluster = self.cfg.GetClusterInfo()
2065
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2066
                                                            self.group_info)
2067
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
2068
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
2069
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)
2070

    
2071
    for node_uuid in node_vol_should:
2072
      n_img = node_image[node_uuid]
2073
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2074
        # ignore missing volumes on offline or broken nodes
2075
        continue
2076
      for volume in node_vol_should[node_uuid]:
2077
        test = volume not in n_img.volumes
2078
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
2079
                      "volume %s missing on node %s", volume,
2080
                      self.cfg.GetNodeName(node_uuid))
2081

    
2082
    if instance.admin_state == constants.ADMINST_UP:
2083
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
2084
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
2085
                    "instance not running on its primary node %s",
2086
                     self.cfg.GetNodeName(pnode_uuid))
2087
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
2088
                    instance.name, "instance is marked as running and lives on"
2089
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))
2090

    
2091
    diskdata = [(nname, success, status, idx)
2092
                for (nname, disks) in diskstatus.items()
2093
                for idx, (success, status) in enumerate(disks)]
2094

    
2095
    for nname, success, bdev_status, idx in diskdata:
2096
      # the 'ghost node' construction in Exec() ensures that we have a
2097
      # node here
2098
      snode = node_image[nname]
2099
      bad_snode = snode.ghost or snode.offline
2100
      self._ErrorIf(instance.disks_active and
2101
                    not success and not bad_snode,
2102
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
2103
                    "couldn't retrieve status for disk/%s on %s: %s",
2104
                    idx, self.cfg.GetNodeName(nname), bdev_status)
2105

    
2106
      if instance.disks_active and success and \
2107
         (bdev_status.is_degraded or
2108
          bdev_status.ldisk_status != constants.LDS_OKAY):
2109
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2110
        if bdev_status.is_degraded:
2111
          msg += " is degraded"
2112
        if bdev_status.ldisk_status != constants.LDS_OKAY:
2113
          msg += "; state is '%s'" % \
2114
                 constants.LDS_NAMES[bdev_status.ldisk_status]
2115

    
2116
        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2117

    
2118
    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2119
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2120
                  "instance %s, connection to primary node failed",
2121
                  instance.name)
2122

    
2123
    self._ErrorIf(len(instance.secondary_nodes) > 1,
2124
                  constants.CV_EINSTANCELAYOUT, instance.name,
2125
                  "instance has multiple secondary nodes: %s",
2126
                  utils.CommaJoin(instance.secondary_nodes),
2127
                  code=self.ETYPE_WARNING)
2128

    
2129
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2130
    if any(es_flags.values()):
2131
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2132
        # Disk template not compatible with exclusive_storage: no instance
2133
        # node should have the flag set
2134
        es_nodes = [n
2135
                    for (n, es) in es_flags.items()
2136
                    if es]
2137
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2138
                    "instance has template %s, which is not supported on nodes"
2139
                    " that have exclusive storage set: %s",
2140
                    instance.disk_template,
2141
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2142
      for (idx, disk) in enumerate(instance.disks):
2143
        self._ErrorIf(disk.spindles is None,
2144
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2145
                      "number of spindles not configured for disk %s while"
2146
                      " exclusive storage is enabled, try running"
2147
                      " gnt-cluster repair-disk-sizes", idx)
2148

    
2149
    if instance.disk_template in constants.DTS_INT_MIRROR:
2150
      instance_nodes = utils.NiceSort(instance.all_nodes)
2151
      instance_groups = {}
2152

    
2153
      for node_uuid in instance_nodes:
2154
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
2155
                                   []).append(node_uuid)
2156

    
2157
      pretty_list = [
2158
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2159
                           groupinfo[group].name)
2160
        # Sort so that we always list the primary node first.
2161
        for group, nodes in sorted(instance_groups.items(),
2162
                                   key=lambda (_, nodes): pnode_uuid in nodes,
2163
                                   reverse=True)]
2164

    
2165
      self._ErrorIf(len(instance_groups) > 1,
2166
                    constants.CV_EINSTANCESPLITGROUPS,
2167
                    instance.name, "instance has primary and secondary nodes in"
2168
                    " different groups: %s", utils.CommaJoin(pretty_list),
2169
                    code=self.ETYPE_WARNING)
2170

    
2171
    inst_nodes_offline = []
2172
    for snode in instance.secondary_nodes:
2173
      s_img = node_image[snode]
2174
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2175
                    self.cfg.GetNodeName(snode),
2176
                    "instance %s, connection to secondary node failed",
2177
                    instance.name)
2178

    
2179
      if s_img.offline:
2180
        inst_nodes_offline.append(snode)
2181

    
2182
    # warn that the instance lives on offline nodes
2183
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2184
                  instance.name, "instance has offline secondary node(s) %s",
2185
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2186
    # ... or ghost/non-vm_capable nodes
2187
    for node_uuid in instance.all_nodes:
2188
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2189
                    instance.name, "instance lives on ghost node %s",
2190
                    self.cfg.GetNodeName(node_uuid))
2191
      self._ErrorIf(not node_image[node_uuid].vm_capable,
2192
                    constants.CV_EINSTANCEBADNODE, instance.name,
2193
                    "instance lives on non-vm_capable node %s",
2194
                    self.cfg.GetNodeName(node_uuid))
2195

    
2196
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2197
    """Verify if there are any unknown volumes in the cluster.
2198

2199
    The .os, .swap and backup volumes are ignored. All other volumes are
2200
    reported as unknown.
2201

2202
    @type reserved: L{ganeti.utils.FieldSet}
2203
    @param reserved: a FieldSet of reserved volume names
2204

2205
    """
2206
    for node_uuid, n_img in node_image.items():
2207
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2208
          self.all_node_info[node_uuid].group != self.group_uuid):
2209
        # skip non-healthy nodes
2210
        continue
2211
      for volume in n_img.volumes:
2212
        test = ((node_uuid not in node_vol_should or
2213
                volume not in node_vol_should[node_uuid]) and
2214
                not reserved.Matches(volume))
2215
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2216
                      self.cfg.GetNodeName(node_uuid),
2217
                      "volume %s is unknown", volume)
2218

    
2219
  def _VerifyNPlusOneMemory(self, node_image, all_insts):
2220
    """Verify N+1 Memory Resilience.
2221

2222
    Check that if one single node dies we can still start all the
2223
    instances it was primary for.
2224

2225
    """
2226
    cluster_info = self.cfg.GetClusterInfo()
2227
    for node_uuid, n_img in node_image.items():
2228
      # This code checks that every node which is now listed as
2229
      # secondary has enough memory to host all instances it is
2230
      # supposed to should a single other node in the cluster fail.
2231
      # FIXME: not ready for failover to an arbitrary node
2232
      # FIXME: does not support file-backed instances
2233
      # WARNING: we currently take into account down instances as well
2234
      # as up ones, considering that even if they're down someone
2235
      # might want to start them even in the event of a node failure.
2236
      if n_img.offline or \
2237
         self.all_node_info[node_uuid].group != self.group_uuid:
2238
        # we're skipping nodes marked offline and nodes in other groups from
2239
        # the N+1 warning, since most likely we don't have good memory
2240
        # information from them; we already list instances living on such
2241
        # nodes, and that's enough warning
2242
        continue
2243
      #TODO(dynmem): also consider ballooning out other instances
2244
      for prinode, inst_uuids in n_img.sbp.items():
2245
        needed_mem = 0
2246
        for inst_uuid in inst_uuids:
2247
          bep = cluster_info.FillBE(all_insts[inst_uuid])
2248
          if bep[constants.BE_AUTO_BALANCE]:
2249
            needed_mem += bep[constants.BE_MINMEM]
2250
        test = n_img.mfree < needed_mem
2251
        self._ErrorIf(test, constants.CV_ENODEN1,
2252
                      self.cfg.GetNodeName(node_uuid),
2253
                      "not enough memory to accomodate instance failovers"
2254
                      " should node %s fail (%dMiB needed, %dMiB available)",
2255
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
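  # Illustrative sketch (not part of the original module): for each
  # (primary node, instances) entry in n_img.sbp the check adds up BE_MINMEM
  # of the auto-balanced instances.  E.g. if this node is secondary for web1
  # (minmem 1024 MiB) and db1 (minmem 2048 MiB), both primary on node A, it
  # needs 3072 MiB free to absorb a failure of node A; an mfree of 2048 MiB
  # would raise CV_ENODEN1.  Instance names and sizes are made up.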
2256

    
2257
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2258
                   (files_all, files_opt, files_mc, files_vm)):
2259
    """Verifies file checksums collected from all nodes.
2260

2261
    @param nodes: List of L{objects.Node} objects
2262
    @param master_node_uuid: UUID of master node
2263
    @param all_nvinfo: RPC results
2264

2265
    """
2266
    # Define functions determining which nodes to consider for a file
2267
    files2nodefn = [
2268
      (files_all, None),
2269
      (files_mc, lambda node: (node.master_candidate or
2270
                               node.uuid == master_node_uuid)),
2271
      (files_vm, lambda node: node.vm_capable),
2272
      ]
2273

    
2274
    # Build mapping from filename to list of nodes which should have the file
2275
    nodefiles = {}
2276
    for (files, fn) in files2nodefn:
2277
      if fn is None:
2278
        filenodes = nodes
2279
      else:
2280
        filenodes = filter(fn, nodes)
2281
      nodefiles.update((filename,
2282
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
2283
                       for filename in files)
2284

    
2285
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2286

    
2287
    fileinfo = dict((filename, {}) for filename in nodefiles)
2288
    ignore_nodes = set()
2289

    
2290
    for node in nodes:
2291
      if node.offline:
2292
        ignore_nodes.add(node.uuid)
2293
        continue
2294

    
2295
      nresult = all_nvinfo[node.uuid]
2296

    
2297
      if nresult.fail_msg or not nresult.payload:
2298
        node_files = None
2299
      else:
2300
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2301
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2302
                          for (key, value) in fingerprints.items())
2303
        del fingerprints
2304

    
2305
      test = not (node_files and isinstance(node_files, dict))
2306
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2307
                    "Node did not return file checksum data")
2308
      if test:
2309
        ignore_nodes.add(node.uuid)
2310
        continue
2311

    
2312
      # Build per-checksum mapping from filename to nodes having it
2313
      for (filename, checksum) in node_files.items():
2314
        assert filename in nodefiles
2315
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2316

    
2317
    for (filename, checksums) in fileinfo.items():
2318
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2319

    
2320
      # Nodes having the file
2321
      with_file = frozenset(node_uuid
2322
                            for node_uuids in fileinfo[filename].values()
2323
                            for node_uuid in node_uuids) - ignore_nodes
2324

    
2325
      expected_nodes = nodefiles[filename] - ignore_nodes
2326

    
2327
      # Nodes missing file
2328
      missing_file = expected_nodes - with_file
2329

    
2330
      if filename in files_opt:
2331
        # All or no nodes
2332
        self._ErrorIf(missing_file and missing_file != expected_nodes,
2333
                      constants.CV_ECLUSTERFILECHECK, None,
2334
                      "File %s is optional, but it must exist on all or no"
2335
                      " nodes (not found on %s)",
2336
                      filename,
2337
                      utils.CommaJoin(
2338
                        utils.NiceSort(
2339
                          map(self.cfg.GetNodeName, missing_file))))
2340
      else:
2341
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2342
                      "File %s is missing from node(s) %s", filename,
2343
                      utils.CommaJoin(
2344
                        utils.NiceSort(
2345
                          map(self.cfg.GetNodeName, missing_file))))
2346

    
2347
        # Warn if a node has a file it shouldn't
2348
        unexpected = with_file - expected_nodes
2349
        self._ErrorIf(unexpected,
2350
                      constants.CV_ECLUSTERFILECHECK, None,
2351
                      "File %s should not exist on node(s) %s",
2352
                      filename, utils.CommaJoin(
2353
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2354

    
2355
      # See if there are multiple versions of the file
2356
      test = len(checksums) > 1
2357
      if test:
2358
        variants = ["variant %s on %s" %
2359
                    (idx + 1,
2360
                     utils.CommaJoin(utils.NiceSort(
2361
                       map(self.cfg.GetNodeName, node_uuids))))
2362
                    for (idx, (checksum, node_uuids)) in
2363
                      enumerate(sorted(checksums.items()))]
2364
      else:
2365
        variants = []
2366

    
2367
      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2368
                    "File %s found with %s different checksums (%s)",
2369
                    filename, len(checksums), "; ".join(variants))
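  # Illustrative sketch (not part of the original module): after the loops
  # above, fileinfo maps each distributed file to its checksum spread, e.g.
  #
  #   fileinfo["/var/lib/ganeti/config.data"] == {
  #     "1f2e3d...": set(["node1-uuid", "node2-uuid"]),
  #     "9a8b7c...": set(["node3-uuid"]),
  #   }
  #
  # A single key means all nodes agree; two or more keys trigger the
  # "different checksums" error just above.  Path and checksums are made up.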
2370

    
2371
  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
2372
    """Verify the drbd helper.
2373

2374
    """
2375
    if drbd_helper:
2376
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2377
      test = (helper_result is None)
2378
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2379
                    "no drbd usermode helper returned")
2380
      if helper_result:
2381
        status, payload = helper_result
2382
        test = not status
2383
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2384
                      "drbd usermode helper check unsuccessful: %s", payload)
2385
        test = status and (payload != drbd_helper)
2386
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2387
                      "wrong drbd usermode helper: %s", payload)
2388

    
2389
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2390
                      drbd_map):
2391
    """Verifies and the node DRBD status.
2392

2393
    @type ninfo: L{objects.Node}
2394
    @param ninfo: the node to check
2395
    @param nresult: the remote results for the node
2396
    @param instanceinfo: the dict of instances
2397
    @param drbd_helper: the configured DRBD usermode helper
2398
    @param drbd_map: the DRBD map as returned by
2399
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2400

2401
    """
2402
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)
2403

    
2404
    # compute the DRBD minors
2405
    node_drbd = {}
2406
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2407
      test = inst_uuid not in instanceinfo
2408
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2409
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
2410
        # ghost instance should not be running, but otherwise we
2411
        # don't give double warnings (both ghost instance and
2412
        # unallocated minor in use)
2413
      if test:
2414
        node_drbd[minor] = (inst_uuid, False)
2415
      else:
2416
        instance = instanceinfo[inst_uuid]
2417
        node_drbd[minor] = (inst_uuid, instance.disks_active)
2418

    
2419
    # and now check them
2420
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2421
    test = not isinstance(used_minors, (tuple, list))
2422
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2423
                  "cannot parse drbd status file: %s", str(used_minors))
2424
    if test:
2425
      # we cannot check drbd status
2426
      return
2427

    
2428
    for minor, (inst_uuid, must_exist) in node_drbd.items():
2429
      test = minor not in used_minors and must_exist
2430
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2431
                    "drbd minor %d of instance %s is not active", minor,
2432
                    self.cfg.GetInstanceName(inst_uuid))
2433
    for minor in used_minors:
2434
      test = minor not in node_drbd
2435
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2436
                    "unallocated drbd minor %d is in use", minor)
2437

    
2438
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2439
    """Builds the node OS structures.
2440

2441
    @type ninfo: L{objects.Node}
2442
    @param ninfo: the node to check
2443
    @param nresult: the remote results for the node
2444
    @param nimg: the node image object
2445

2446
    """
2447
    remote_os = nresult.get(constants.NV_OSLIST, None)
2448
    test = (not isinstance(remote_os, list) or
2449
            not compat.all(isinstance(v, list) and len(v) == 7
2450
                           for v in remote_os))
2451

    
2452
    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2453
                  "node hasn't returned valid OS data")
2454

    
2455
    nimg.os_fail = test
2456

    
2457
    if test:
2458
      return
2459

    
2460
    os_dict = {}
2461

    
2462
    for (name, os_path, status, diagnose,
2463
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2464

    
2465
      if name not in os_dict:
2466
        os_dict[name] = []
2467

    
2468
      # parameters is a list of lists instead of list of tuples due to
2469
      # JSON lacking a real tuple type, fix it:
2470
      parameters = [tuple(v) for v in parameters]
2471
      os_dict[name].append((os_path, status, diagnose,
2472
                            set(variants), set(parameters), set(api_ver)))
2473

    
2474
    nimg.oslist = os_dict
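  # Illustrative sketch (not part of the original module): nimg.oslist maps
  # OS names to the tuples built above, e.g.
  #
  #   nimg.oslist["debootstrap"] == [
  #     ("/srv/ganeti/os/debootstrap", True, "",
  #      set(["default"]), set([("ARG", "some description")]), set([20])),
  #   ]
  #
  # (path, variants, parameters and API version are made-up examples); more
  # than one entry per name means duplicate OS definitions, which
  # _VerifyNodeOS flags separately.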
2475

    
2476
  def _VerifyNodeOS(self, ninfo, nimg, base):
2477
    """Verifies the node OS list.
2478

2479
    @type ninfo: L{objects.Node}
2480
    @param ninfo: the node to check
2481
    @param nimg: the node image object
2482
    @param base: the 'template' node we match against (e.g. from the master)
2483

2484
    """
2485
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2486

    
2487
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2488
    for os_name, os_data in nimg.oslist.items():
2489
      assert os_data, "Empty OS status for OS %s?!" % os_name
2490
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2491
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2492
                    "Invalid OS %s (located at %s): %s",
2493
                    os_name, f_path, f_diag)
2494
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2495
                    "OS '%s' has multiple entries"
2496
                    " (first one shadows the rest): %s",
2497
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
2498
      # comparisons with the 'base' image
2499
      test = os_name not in base.oslist
2500
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2501
                    "Extra OS %s not present on reference node (%s)",
2502
                    os_name, self.cfg.GetNodeName(base.uuid))
2503
      if test:
2504
        continue
2505
      assert base.oslist[os_name], "Base node has empty OS status?"
2506
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2507
      if not b_status:
2508
        # base OS is invalid, skipping
2509
        continue
2510
      for kind, a, b in [("API version", f_api, b_api),
2511
                         ("variants list", f_var, b_var),
2512
                         ("parameters", beautify_params(f_param),
2513
                          beautify_params(b_param))]:
2514
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2515
                      "OS %s for %s differs from reference node %s:"
2516
                      " [%s] vs. [%s]", kind, os_name,
2517
                      self.cfg.GetNodeName(base.uuid),
2518
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2519

    
2520
    # check any missing OSes
2521
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2522
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2523
                  "OSes present on reference node %s"
2524
                  " but missing on this node: %s",
2525
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2526

    
2527
  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2528
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2529

2530
    @type ninfo: L{objects.Node}
2531
    @param ninfo: the node to check
2532
    @param nresult: the remote results for the node
2533
    @type is_master: bool
2534
    @param is_master: Whether node is the master node
2535

2536
    """
2537
    cluster = self.cfg.GetClusterInfo()
2538
    if (is_master and
2539
        (cluster.IsFileStorageEnabled() or
2540
         cluster.IsSharedFileStorageEnabled())):
2541
      try:
2542
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2543
      except KeyError:
2544
        # This should never happen
2545
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2546
                      "Node did not return forbidden file storage paths")
2547
      else:
2548
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2549
                      "Found forbidden file storage paths: %s",
2550
                      utils.CommaJoin(fspaths))
2551
    else:
2552
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2553
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2554
                    "Node should not have returned forbidden file storage"
2555
                    " paths")
2556

    
2557
  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2558
                          verify_key, error_key):
2559
    """Verifies (file) storage paths.
2560

2561
    @type ninfo: L{objects.Node}
2562
    @param ninfo: the node to check
2563
    @param nresult: the remote results for the node
2564
    @type file_disk_template: string
2565
    @param file_disk_template: file-based disk template, whose directory
2566
        is supposed to be verified
2567
    @type verify_key: string
2568
    @param verify_key: key for the verification map of this file
2569
        verification step
2570
    @param error_key: error key to be added to the verification results
2571
        in case something goes wrong in this verification step
2572

2573
    """
2574
    assert (file_disk_template in
2575
            utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2576
    cluster = self.cfg.GetClusterInfo()
2577
    if cluster.IsDiskTemplateEnabled(file_disk_template):
2578
      self._ErrorIf(
2579
          verify_key in nresult,
2580
          error_key, ninfo.name,
2581
          "The configured %s storage path is unusable: %s" %
2582
          (file_disk_template, nresult.get(verify_key)))
2583

    
2584
  def _VerifyFileStoragePaths(self, ninfo, nresult):
2585
    """Verifies (file) storage paths.
2586

2587
    @see: C{_VerifyStoragePaths}
2588

2589
    """
2590
    self._VerifyStoragePaths(
2591
        ninfo, nresult, constants.DT_FILE,
2592
        constants.NV_FILE_STORAGE_PATH,
2593
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2594

    
2595
  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2596
    """Verifies (file) storage paths.
2597

2598
    @see: C{_VerifyStoragePaths}
2599

2600
    """
2601
    self._VerifyStoragePaths(
2602
        ninfo, nresult, constants.DT_SHARED_FILE,
2603
        constants.NV_SHARED_FILE_STORAGE_PATH,
2604
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2605

    
2606
  def _VerifyOob(self, ninfo, nresult):
2607
    """Verifies out of band functionality of a node.
2608

2609
    @type ninfo: L{objects.Node}
2610
    @param ninfo: the node to check
2611
    @param nresult: the remote results for the node
2612

2613
    """
2614
    # We just have to verify the paths on master and/or master candidates
2615
    # as the oob helper is invoked on the master
2616
    if ((ninfo.master_candidate or ninfo.master_capable) and
2617
        constants.NV_OOB_PATHS in nresult):
2618
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
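    # lvdata is either an error string reported by the node, some other
    # malformed RPC payload, or a dict of LV name -> attributes; only the
    # last case is stored in the node image.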
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
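    # The node reports the names of the instances it is currently running;
    # map them back to configuration objects so the node image stores UUIDs.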
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
2697
      test = (constants.NV_VGLIST not in nresult or
2698
              vg_name not in nresult[constants.NV_VGLIST])
2699
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2700
                    "node didn't return data for the volume group '%s'"
2701
                    " - it is either missing or broken", vg_name)
2702
      if not test:
2703
        try:
2704
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2705
        except (ValueError, TypeError):
2706
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2707
                        "node returned invalid LVM info, check LVM status")
2708

    
2709
  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2710
    """Gets per-disk status information for all instances.
2711

2712
    @type node_uuids: list of strings
2713
    @param node_uuids: Node UUIDs
2714
    @type node_image: dict of (UUID, L{objects.Node})
2715
    @param node_image: Node objects
2716
    @type instanceinfo: dict of (UUID, L{objects.Instance})
2717
    @param instanceinfo: Instance objects
2718
    @rtype: {instance: {node: [(succes, payload)]}}
2719
    @return: a dictionary of per-instance dictionaries with nodes as
2720
        keys and disk information as values; the disk information is a
2721
        list of tuples (success, payload)
2722

2723
    """
2724
    node_disks = {}
2725
    node_disks_dev_inst_only = {}
2726
    diskless_instances = set()
2727
    nodisk_instances = set()
2728
    diskless = constants.DT_DISKLESS
2729

    
2730
    for nuuid in node_uuids:
2731
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
2732
                                             node_image[nuuid].sinst))
2733
      diskless_instances.update(uuid for uuid in node_inst_uuids
2734
                                if instanceinfo[uuid].disk_template == diskless)
2735
      disks = [(inst_uuid, disk)
2736
               for inst_uuid in node_inst_uuids
2737
               for disk in instanceinfo[inst_uuid].disks]
2738

    
2739
      if not disks:
2740
        nodisk_instances.update(uuid for uuid in node_inst_uuids
2741
                                if instanceinfo[uuid].disk_template != diskless)
2742
        # No need to collect data
2743
        continue
2744

    
2745
      node_disks[nuuid] = disks
2746

    
2747
      # _AnnotateDiskParams makes already copies of the disks
2748
      dev_inst_only = []
2749
      for (inst_uuid, dev) in disks:
2750
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
2751
                                          self.cfg)
2752
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))
2753

    
2754
      node_disks_dev_inst_only[nuuid] = dev_inst_only
2755

    
2756
    assert len(node_disks) == len(node_disks_dev_inst_only)
2757

    
2758
    # Collect data from all nodes with disks
2759
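    # The per-node request payload is a list of (annotated disk, owning
    # instance) pairs; each node is expected to answer with one
    # (success, payload) status per disk, in the same order, which the
    # zip() below relies on.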
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
               node_disks.keys(), node_disks_dev_inst_only)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}
    # ...and disk-full instances that happen to have no disks
    for inst_uuid in nodisk_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

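    # Sanity checks: one status per instance disk on every node entry,
    # statuses are (success, payload) pairs, and every instance passed in
    # ends up with an entry (possibly empty).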
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

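    # Group the remaining (foreign, online) nodes by node group and turn each
    # group's sorted name list into an endless cycle.  Hypothetical example:
    # with foreign groups g1 = [n1, n2] and g2 = [n3], this yields iterators
    # cycling over ["n1", "n2"] and ["n3"], one per group.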
    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

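    # Result: (online node names, name -> sorted list of SSH targets); each
    # online node gets one target drawn from every other group's iterator.
    # Because the iterators are shared, successive nodes receive successive
    # peers, spreading the checks across each foreign group's nodes.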
    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

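    # Map of NV_* check identifiers to their parameters; the whole dict is
    # sent to each node via the node_verify RPC and every node answers with
    # results keyed by the same identifiers.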
    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
      if drbd_helper:
        node_verify_param[constants.NV_DRBDVERSION] = None
        node_verify_param[constants.NV_DRBDLIST] = None
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
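    # Collect every bridge referenced by the default NIC parameters or by any
    # instance NIC in this group, so NV_BRIDGES can check that they exist on
    # the nodes.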
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for inst_uuid in self.my_inst_info.values():
      for nic in inst_uuid.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

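    # Fill in the expected state per node: count offline instances, create
    # "ghost" images for nodes outside this group (or unknown to the config),
    # and record primary/secondary instances (on a secondary's image, sbp
    # maps each primary node to the instances it mirrors onto this node).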
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least one node, we act as if it were set for all the
    # nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

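    # refos_img: the first vm_capable node with a clean OS listing is used as
    # the reference against which the other nodes' OS data is compared.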
    refos_img = None

    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.name not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if test:
          lu_result = False
          continue
        if res.offline:
          # No need to investigate payload if node is offline
          continue
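        # res.payload is a list of (script path, status, output) triples;
        # HKR_FAIL marks a failed hook script.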
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
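    # ResultWithJobs causes these job definitions (one per node group) to be
    # submitted; their job IDs become the result returned to the caller.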
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])