#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
  CheckDiskAccessModeConsistency

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def CheckArguments(self):
    self.master_uuid = self.cfg.GetMasterNode()
    self.master_ndparams = self.cfg.GetNdParams(self.cfg.GetMasterNodeInfo())

    # TODO: When Issue 584 is solved, and None is properly parsed when used
    # as a default value, ndparams.get(.., None) can be changed to
    # ndparams[..] to access the values directly

    # OpenvSwitch: Warn user if link is missing
    if (self.master_ndparams[constants.ND_OVS] and not
        self.master_ndparams.get(constants.ND_OVS_LINK, None)):
      self.LogInfo("No physical interface for OpenvSwitch was given."
                   " OpenvSwitch will not have an outside connection. This"
                   " might not be what you want.")

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Create and configure Open vSwitch

    """
    if self.master_ndparams[constants.ND_OVS]:
      result = self.rpc.call_node_configure_ovs(
                 self.master_uuid,
                 self.master_ndparams[constants.ND_OVS_NAME],
                 self.master_ndparams.get(constants.ND_OVS_LINK, None))
      result.Raise("Could not successfully configure Open vSwitch")
    return True
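
  # Illustrative note (hypothetical values, not part of the original
  # module): with master ndparams where constants.ND_OVS is True,
  # constants.ND_OVS_NAME is "switch1" and no physical link is configured,
  # CheckArguments above only logs a warning about the missing link, while
  # Exec still creates the switch, passing the missing link value through
  # to the node_configure_ovs RPC.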


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "vcs_version": constants.VCS_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "default_iallocator_params": cluster.default_iallocator_params,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      }

    return result
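
  # Illustrative sketch (hypothetical values, not part of the original
  # module): with cluster.os_hvp = {"debian": {"kvm": {...},
  # "xen-pvm": {...}}} and cluster.enabled_hypervisors = ["kvm"], the
  # filtering in Exec above keeps only {"debian": {"kvm": {...}}};
  # primary_ip_version is reported as 6 only when the cluster's primary IP
  # family is IPv6.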


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername
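
  # Illustrative note (hypothetical scenario, not part of the original
  # module): when the new cluster name resolves to the same IP as the old
  # one, only the name is rewritten; if it resolves to a different IP,
  # CheckPrereq above additionally requires that this IP is not yet
  # reachable on the noded port, i.e. the TcpPing check must fail before
  # the rename is allowed.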


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disk sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.DT_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      if not dskl:
        # no disks on the node
        continue

      newl = [([v[2].Copy()], v[0]) for v in dskl]
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed
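
  # Illustrative sketch (hypothetical values, not part of the original
  # module): a node reporting the dimensions (10737418240, 1), i.e. 10 GiB
  # in bytes and one spindle, for a disk recorded with size 10238 MiB is
  # corrected to 10240 MiB (10737418240 >> 20) and the LU returns an entry
  # such as ("instance1.example.com", 0, "size", 10240).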


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)
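
# Illustrative example (assuming the usual CIDR prefix-length semantics of
# the netutils netmask validation): on an IPv4 cluster,
# _ValidateNetmask(cfg, 24) passes, while _ValidateNetmask(cfg, 33) raises
# errors.OpPrereqError because 33 is not a valid IPv4 prefix length.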


def CheckFileBasedStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
    file_disk_template):
  """Checks whether the given file-based storage directory is acceptable.

  Note: This function is public, because it is also used in bootstrap.py.

  @type logging_warn_fn: function
  @param logging_warn_fn: function which accepts a string and logs it
  @type file_storage_dir: string
  @param file_storage_dir: the directory to be used for file-based instances
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of enabled disk templates
  @type file_disk_template: string
  @param file_disk_template: the file-based disk template for which the
      path should be checked

  """
  assert (file_disk_template in
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_dir is not None:
    if file_storage_dir == "":
      if file_storage_enabled:
        raise errors.OpPrereqError(
            "Unsetting the '%s' storage directory while having '%s' storage"
            " enabled is not permitted." %
            (file_disk_template, file_disk_template))
    else:
      if not file_storage_enabled:
        logging_warn_fn(
            "Specified a %s storage directory, although %s storage is not"
            " enabled." % (file_disk_template, file_disk_template))
  else:
    raise errors.ProgrammerError("Received %s storage dir with value"
                                 " 'None'." % file_disk_template)


def CheckFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_FILE)


def CheckSharedFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given shared file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_SHARED_FILE)
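
# Summary of CheckFileBasedStoragePathVsEnabledDiskTemplates above, for a
# given file_disk_template:
#   file_storage_dir == "" while that template is enabled -> OpPrereqError
#   file_storage_dir set while that template is disabled  -> warning only
#   file_storage_dir is None                              -> ProgrammerError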


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
        CheckDiskAccessModeValidity(self.op.diskparams)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" %
                                   err, errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def _CheckVgName(self, node_uuids, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the vg name on all nodes and in case it gets
       unset whether there are instances still using it.

    """
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
                                            new_enabled_disk_templates)
    current_vg_name = self.cfg.GetVGName()

    if self.op.vg_name == '':
      if lvm_is_enabled:
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                   " disk templates are or get enabled.")

    if self.op.vg_name is None:
      if current_vg_name is None and lvm_is_enabled:
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and lvm_is_enabled) or \
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
      self._CheckVgNameOnNodes(node_uuids)

  def _CheckVgNameOnNodes(self, node_uuids):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_uuids)
    for node_uuid in node_uuids:
      msg = vglist[node_uuid].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
                                   errors.ECODE_ENVIRON)

  @staticmethod
  def _GetDiskTemplateSetsInner(op_enabled_disk_templates,
                                old_enabled_disk_templates):
    """Computes three sets of disk templates.

    @see: C{_GetDiskTemplateSets} for more details.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    disabled_disk_templates = []
    if op_enabled_disk_templates:
      enabled_disk_templates = op_enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(old_enabled_disk_templates))
      disabled_disk_templates = \
        list(set(old_enabled_disk_templates)
             - set(enabled_disk_templates))
    else:
      enabled_disk_templates = old_enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates,
            disabled_disk_templates)
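
  # Illustrative example (hypothetical values, not part of the original
  # module): with op_enabled_disk_templates = ["plain", "drbd"] and
  # old_enabled_disk_templates = ["plain", "diskless"], the method above
  # returns (["plain", "drbd"], ["drbd"], ["diskless"]), up to the ordering
  # of the two computed lists.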

  def _GetDiskTemplateSets(self, cluster):
    """Computes three sets of disk templates.

    The three sets are:
      - disk templates that will be enabled after this operation (no matter if
        they were enabled before or not)
      - disk templates that get enabled by this operation (thus haven't been
        enabled before.)
      - disk templates that get disabled by this operation

    """
    return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates,
                                          cluster.enabled_disk_templates)

  def _CheckIpolicy(self, cluster, enabled_disk_templates):
    """Checks the ipolicy.

    @type cluster: C{objects.Cluster}
    @param cluster: the cluster's configuration
    @type enabled_disk_templates: list of string
    @param enabled_disk_templates: list of (possibly newly) enabled disk
      templates

    """
    # FIXME: write unit tests for this
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
                                  enabled_disk_templates)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(nuuid in group.members
                                             for nuuid in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
                                           self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))
    else:
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
                                  enabled_disk_templates)

  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
    """Checks whether the set DRBD helper actually exists on the nodes.

    @type drbd_helper: string
    @param drbd_helper: path of the drbd usermode helper binary
    @type node_uuids: list of strings
    @param node_uuids: list of node UUIDs to check for the helper

    """
    # checks given drbd helper on all nodes
    helpers = self.rpc.call_drbd_helper(node_uuids)
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
      if ninfo.offline:
        self.LogInfo("Not checking drbd helper on offline node %s",
                     ninfo.name)
        continue
      msg = helpers[ninfo.uuid].fail_msg
      if msg:
        raise errors.OpPrereqError("Error checking drbd helper on node"
                                   " '%s': %s" % (ninfo.name, msg),
                                   errors.ECODE_ENVIRON)
      node_helper = helpers[ninfo.uuid].payload
      if node_helper != drbd_helper:
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                   (ninfo.name, node_helper),
                                   errors.ECODE_ENVIRON)

  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
    """Check the DRBD usermode helper.

    @type node_uuids: list of strings
    @param node_uuids: a list of nodes' UUIDs
    @type drbd_enabled: boolean
    @param drbd_enabled: whether DRBD will be enabled after this operation
      (no matter if it was disabled before or not)
    @type drbd_gets_enabled: boolean
    @param drbd_gets_enabled: true if DRBD was disabled before this
      operation, but will be enabled afterwards

    """
    if self.op.drbd_helper == '':
      if drbd_enabled:
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " DRBD is enabled.")
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    else:
      if self.op.drbd_helper is not None and drbd_enabled:
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
      else:
        if drbd_gets_enabled:
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
          if current_drbd_helper is not None:
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
          else:
            raise errors.OpPrereqError("Cannot enable DRBD without a"
                                       " DRBD usermode helper set.")

  def _CheckInstancesOfDisabledDiskTemplates(
      self, disabled_disk_templates):
    """Check whether we try to disable a disk template that is in use.

    @type disabled_disk_templates: list of string
    @param disabled_disk_templates: list of disk templates that are going to
      be disabled by this operation

    """
    for disk_template in disabled_disk_templates:
      if self.cfg.HasAnyDiskOfType(disk_template):
        raise errors.OpPrereqError(
            "Cannot disable disk template '%s', because there is at least one"
            " instance using it." % disk_template)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    self.cluster = cluster = self.cfg.GetClusterInfo()

    vm_capable_node_uuids = [node.uuid
                             for node in self.cfg.GetAllNodesInfo().values()
                             if node.uuid in node_uuids and node.vm_capable]

    (enabled_disk_templates, new_enabled_disk_templates,
      disabled_disk_templates) = self._GetDiskTemplateSets(cluster)
    self._CheckInstancesOfDisabledDiskTemplates(disabled_disk_templates)

    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
                      new_enabled_disk_templates)

    if self.op.file_storage_dir is not None:
      CheckFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)

    if self.op.shared_file_storage_dir is not None:
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.shared_file_storage_dir,
          enabled_disk_templates)

    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)

    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    self._CheckIpolicy(cluster, enabled_disk_templates)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_uuids, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_uuids, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def _SetVgName(self, feedback_fn):
    """Determines and sets the new volume group name.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")

  def _SetFileStorageDir(self, feedback_fn):
    """Set the file storage directory.

    """
    if self.op.file_storage_dir is not None:
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
        feedback_fn("Global file storage dir already set to value '%s'"
                    % self.cluster.file_storage_dir)
      else:
        self.cluster.file_storage_dir = self.op.file_storage_dir

  def _SetDrbdHelper(self, feedback_fn):
    """Set the DRBD usermode helper.

    """
    if self.op.drbd_helper is not None:
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did not"
                    " enable the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(self.op.enabled_disk_templates)

    self._SetVgName(feedback_fn)
    self._SetFileStorageDir(feedback_fn)
    self._SetDrbdHelper(feedback_fn)

    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.default_iallocator_params is not None:
      self.cluster.default_iallocator_params = self.op.default_iallocator_params

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
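
    # Illustrative example (hypothetical values, not part of the original
    # module): with self.op.hidden_os ==
    #   [(constants.DDM_ADD, "lenny-image"), (constants.DDM_REMOVE, "etch")]
    # the call below adds "lenny-image" to and removes "etch" from the
    # hidden OS list, merely logging a message when an entry is already
    # present or missing instead of failing.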
1316

    
1317
    if self.op.hidden_os:
1318
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1319

    
1320
    if self.op.blacklisted_os:
1321
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1322

    
1323
    if self.op.master_netdev:
1324
      master_params = self.cfg.GetMasterNetworkParameters()
1325
      ems = self.cfg.GetUseExternalMipScript()
1326
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1327
                  self.cluster.master_netdev)
1328
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1329
                                                       master_params, ems)
1330
      if not self.op.force:
1331
        result.Raise("Could not disable the master ip")
1332
      else:
1333
        if result.fail_msg:
1334
          msg = ("Could not disable the master ip (continuing anyway): %s" %
1335
                 result.fail_msg)
1336
          feedback_fn(msg)
1337
      feedback_fn("Changing master_netdev from %s to %s" %
1338
                  (master_params.netdev, self.op.master_netdev))
1339
      self.cluster.master_netdev = self.op.master_netdev
1340

    
1341
    if self.op.master_netmask:
1342
      master_params = self.cfg.GetMasterNetworkParameters()
1343
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1344
      result = self.rpc.call_node_change_master_netmask(
1345
                 master_params.uuid, master_params.netmask,
1346
                 self.op.master_netmask, master_params.ip,
1347
                 master_params.netdev)
1348
      result.Warn("Could not change the master IP netmask", feedback_fn)
1349
      self.cluster.master_netmask = self.op.master_netmask
1350

    
1351
    self.cfg.Update(self.cluster, feedback_fn)
1352

    
1353
    if self.op.master_netdev:
1354
      master_params = self.cfg.GetMasterNetworkParameters()
1355
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
1356
                  self.op.master_netdev)
1357
      ems = self.cfg.GetUseExternalMipScript()
1358
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1359
                                                     master_params, ems)
1360
      result.Warn("Could not re-enable the master ip on the master,"
1361
                  " please restart manually", self.LogWarning)
1362

    
1363

    
1364
class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


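# Illustrative sketch of the job set built by LUClusterVerify.Exec above
# (group names are placeholders, not real objects):
#   [[OpClusterVerifyConfig(...)],
#    [OpClusterVerifyGroup(group_name="group1", depends=[(-1, [])])],
#    [OpClusterVerifyGroup(group_name="group2", depends=[(-2, [])])]]
# i.e. one single-opcode job per node group, each carrying a relative
# dependency that points back at the OpClusterVerifyConfig job submitted in
# the same set (the negative offsets count backwards from each group job).

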
class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


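# Illustrative note on _Error above: depending on the opcode's error_codes
# flag, the reported feedback line is either the machine-parseable form
#   "  - <type>:<code>:<object-type>:<object-name>:<message>"
# or the plain human-readable form
#   "  - <type>: <object-type> <object-name>: <message>"
# (the angle-bracketed field names are placeholders, not literal output).

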
def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)


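# Illustrative note on _VerifyCertificate above: it returns (None, None) for a
# certificate with no findings, (ETYPE_WARNING, msg) when verification only
# raises a warning (e.g. expiration is getting close), and (ETYPE_ERROR, msg)
# when the file cannot be loaded or the certificate fails verification.

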
def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


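# Illustrative sketch of the data returned by _GetAllHypervisorParameters
# above (hypervisor, OS and instance names are placeholders):
#   [("cluster", "kvm", {... filled cluster-level defaults ...}),
#    ("os debian-image", "kvm", {... defaults plus per-OS overrides ...}),
#    ("instance web1", "kvm", {... fully filled parameters for web1 ...})]

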
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                    constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(inst.name for
                         inst in dangling_instances.get(node.uuid, [])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(inst.name for
                                         inst in no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        instance = self.cfg.GetInstanceInfoByName(inst_name)
        if instance.disk_template in constants.DTS_INT_MIRROR:
          nodes.update(instance.secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_node_uuids = set(self.group_info.members)
    group_inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_node_uuids = \
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_inst_uuids = \
        group_inst_uuids.difference(
          [self.cfg.GetInstanceInfoByName(name).uuid
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])

    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_inst_uuids:
      raise errors.OpPrereqError(
        "Missing lock for instances: %s" %
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
        errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_uuids = group_node_uuids
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
                             for node_uuid in group_node_uuids)

    self.my_inst_uuids = group_inst_uuids
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
                             for inst_uuid in group_inst_uuids)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nuuid in inst.all_nodes:
          if self.all_node_info[nuuid].group != self.group_uuid:
            extra_lv_nodes.add(nuuid)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, ninfo.name,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s parameter verify failure (source %s): %s",
                      hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
                    "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
                  "Node time diverges by at least %s from master node time",
                  ntime_diff)

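  # Illustrative note on _VerifyNodeTime above: with a verify RPC window of
  # [t0, t1], a node clock above t1 + NODE_MAX_CLOCK_SKEW is reported as
  # diverging by at least (node_time - t1) seconds, one below
  # t0 - NODE_MAX_CLOCK_SKEW by (t0 - node_time) seconds, and anything inside
  # the widened window passes without an error.
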
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                  "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupDRBDVersion(self, node_verify_infos):
    """Check cross-node DRBD version consistency.

    @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node_verify call.

    """
    node_versions = {}
    for node_uuid, ndata in node_verify_infos.items():
      nresult = ndata.payload
      if nresult:
        version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
        node_versions[node_uuid] = version

    if len(set(node_versions.values())) > 1:
      for node_uuid, version in sorted(node_versions.items()):
        msg = "DRBD version mismatch: %s" % version
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
                    code=self.ETYPE_WARNING)

  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping from node names to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of check
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "did not return valid bridge information")
    if not test:
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = constants.NV_NODELIST not in nresult
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
                  "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
                        "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
                        "tcp communication with node '%s': %s",
                        anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if ninfo.uuid == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)

  def _VerifyInstance(self, instance, node_image, diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the nodes are in the correct
    state.

    """
    pnode_uuid = instance.primary_node
    pnode_img = node_image[pnode_uuid]
    groupinfo = self.cfg.GetAllNodeGroupsInfo()

    node_vol_should = {}
    instance.MapLVsByNode(node_vol_should)

    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            self.group_info)
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)

    for node_uuid in node_vol_should:
      n_img = node_image[node_uuid]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node_uuid]:
        test = volume not in n_img.volumes
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
                      "volume %s missing on node %s", volume,
                      self.cfg.GetNodeName(node_uuid))

    if instance.admin_state == constants.ADMINST_UP:
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
                    "instance not running on its primary node %s",
                     self.cfg.GetNodeName(pnode_uuid))
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance is marked as running and lives on"
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      self._ErrorIf(instance.disks_active and
                    not success and not bad_snode,
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
                    "couldn't retrieve status for disk/%s on %s: %s",
                    idx, self.cfg.GetNodeName(nname), bdev_status)

      if instance.disks_active and success and \
         (bdev_status.is_degraded or
          bdev_status.ldisk_status != constants.LDS_OKAY):
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
        if bdev_status.is_degraded:
          msg += " is degraded"
        if bdev_status.ldisk_status != constants.LDS_OKAY:
          msg += "; state is '%s'" % \
                 constants.LDS_NAMES[bdev_status.ldisk_status]

        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)

    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
                  "instance %s, connection to primary node failed",
                  instance.name)

    self._ErrorIf(len(instance.secondary_nodes) > 1,
                  constants.CV_EINSTANCELAYOUT, instance.name,
                  "instance has multiple secondary nodes: %s",
                  utils.CommaJoin(instance.secondary_nodes),
                  code=self.ETYPE_WARNING)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
    if any(es_flags.values()):
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
        # Disk template not compatible with exclusive_storage: no instance
        # node should have the flag set
        es_nodes = [n
                    for (n, es) in es_flags.items()
                    if es]
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
                    "instance has template %s, which is not supported on nodes"
                    " that have exclusive storage set: %s",
                    instance.disk_template,
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
      for (idx, disk) in enumerate(instance.disks):
        self._ErrorIf(disk.spindles is None,
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
                      "number of spindles not configured for disk %s while"
                      " exclusive storage is enabled, try running"
                      " gnt-cluster repair-disk-sizes", idx)

    if instance.disk_template in constants.DTS_INT_MIRROR:
      instance_nodes = utils.NiceSort(instance.all_nodes)
      instance_groups = {}

      for node_uuid in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
                                   []).append(node_uuid)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
                           groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode_uuid in nodes,
                                   reverse=True)]

      self._ErrorIf(len(instance_groups) > 1,
                    constants.CV_EINSTANCESPLITGROUPS,
                    instance.name, "instance has primary and secondary nodes in"
                    " different groups: %s", utils.CommaJoin(pretty_list),
                    code=self.ETYPE_WARNING)

    inst_nodes_offline = []
    for snode in instance.secondary_nodes:
      s_img = node_image[snode]
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
                    self.cfg.GetNodeName(snode),
                    "instance %s, connection to secondary node failed",
                    instance.name)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
                  instance.name, "instance has offline secondary node(s) %s",
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
    # ... or ghost/non-vm_capable nodes
    for node_uuid in instance.all_nodes:
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance lives on ghost node %s",
                    self.cfg.GetNodeName(node_uuid))
      self._ErrorIf(not node_image[node_uuid].vm_capable,
                    constants.CV_EINSTANCEBADNODE, instance.name,
                    "instance lives on non-vm_capable node %s",
                    self.cfg.GetNodeName(node_uuid))

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node_uuid, n_img in node_image.items():
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
          self.all_node_info[node_uuid].group != self.group_uuid):
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node_uuid not in node_vol_should or
                volume not in node_vol_should[node_uuid]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
                      self.cfg.GetNodeName(node_uuid),
                      "volume %s is unknown", volume)

  def _VerifyNPlusOneMemory(self, node_image, all_insts):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node_uuid, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline or \
         self.all_node_info[node_uuid].group != self.group_uuid:
        # we're skipping nodes marked offline and nodes in other groups from
        # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      #TODO(dynmem): also consider ballooning out other instances
      for prinode, inst_uuids in n_img.sbp.items():
        needed_mem = 0
        for inst_uuid in inst_uuids:
          bep = cluster_info.FillBE(all_insts[inst_uuid])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MINMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1,
                      self.cfg.GetNodeName(node_uuid),
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)

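  # Worked example for _VerifyNPlusOneMemory above (figures are invented): if
  # node B is secondary for two instances whose primary is node A, and those
  # instances have BE_MINMEM of 1024 and 2048 MiB with auto_balance enabled,
  # then needed_mem for (B, A) is 3072 MiB; a reported mfree of 2500 MiB on B
  # would trigger the CV_ENODEN1 warning for a failure of node A.
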
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param nodes: List of L{objects.Node} objects
    @param master_node_uuid: UUID of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.uuid == master_node_uuid)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodes
      else:
        filenodes = filter(fn, nodes)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodes:
      if node.offline:
        ignore_nodes.add(node.uuid)
        continue

      nresult = all_nvinfo[node.uuid]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
                    "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.uuid)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_uuid
                            for node_uuids in fileinfo[filename].values()
                            for node_uuid in node_uuids) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        self._ErrorIf(missing_file and missing_file != expected_nodes,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is optional, but it must exist on all or no"
                      " nodes (not found on %s)",
                      filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))
      else:
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is missing from node(s) %s", filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        self._ErrorIf(unexpected,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s should not exist on node(s) %s",
                      filename, utils.CommaJoin(
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1,
                     utils.CommaJoin(utils.NiceSort(
                       map(self.cfg.GetNodeName, node_uuids))))
                    for (idx, (checksum, node_uuids)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
                    "File %s found with %s different checksums (%s)",
                    filename, len(checksums), "; ".join(variants))

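  # Illustrative shape of the bookkeeping built by _VerifyFiles above (the
  # path and checksum values are placeholders):
  #   nodefiles = {"/path/to/some/config/file": frozenset([<node uuids that
  #                                                         should have it>])}
  #   fileinfo  = {"/path/to/some/config/file":
  #                  {"0123abcd...": set([<uuids reporting this checksum>])}}
  # A file whose checksum dict grows more than one key is what produces the
  # "found with N different checksums" message.
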
  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
    """Verify the drbd helper.

    """
    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                    "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "wrong drbd usermode helper: %s", payload)

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)

    # compute the DRBD minors
    node_drbd = {}
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
      test = inst_uuid not in instanceinfo
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
      if test:
        node_drbd[minor] = (inst_uuid, False)
      else:
        instance = instanceinfo[inst_uuid]
        node_drbd[minor] = (inst_uuid, instance.disks_active)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                  "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (inst_uuid, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "drbd minor %d of instance %s is not active", minor,
                    self.cfg.GetInstanceName(inst_uuid))
    for minor in used_minors:
      test = minor not in node_drbd
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                  "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
                    "Invalid OS %s (located at %s): %s",
                    os_name, f_path, f_diag)
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
                    "OS '%s' has multiple entries"
                    " (first one shadows the rest): %s",
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                    "Extra OS %s not present on reference node (%s)",
                    os_name, self.cfg.GetNodeName(base.uuid))
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
                      "OS %s for %s differs from reference node %s:"
                      " [%s] vs. [%s]", kind, os_name,
                      self.cfg.GetNodeName(base.uuid),
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
                  "OSes present on reference node %s"
                  " but missing on this node: %s",
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))

  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2532
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2533

2534
    @type ninfo: L{objects.Node}
2535
    @param ninfo: the node to check
2536
    @param nresult: the remote results for the node
2537
    @type is_master: bool
2538
    @param is_master: Whether node is the master node
2539

2540
    """
2541
    cluster = self.cfg.GetClusterInfo()
2542
    if (is_master and
2543
        (cluster.IsFileStorageEnabled() or
2544
         cluster.IsSharedFileStorageEnabled())):
2545
      try:
2546
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2547
      except KeyError:
2548
        # This should never happen
2549
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2550
                      "Node did not return forbidden file storage paths")
2551
      else:
2552
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2553
                      "Found forbidden file storage paths: %s",
2554
                      utils.CommaJoin(fspaths))
2555
    else:
2556
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2557
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2558
                    "Node should not have returned forbidden file storage"
2559
                    " paths")
2560

    
2561
  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2562
                          verify_key, error_key):
2563
    """Verifies (file) storage paths.
2564

2565
    @type ninfo: L{objects.Node}
2566
    @param ninfo: the node to check
2567
    @param nresult: the remote results for the node
2568
    @type file_disk_template: string
2569
    @param file_disk_template: file-based disk template, whose directory
2570
        is supposed to be verified
2571
    @type verify_key: string
2572
    @param verify_key: key for the verification map of this file
2573
        verification step
2574
    @param error_key: error key to be added to the verification results
2575
        in case something goes wrong in this verification step
2576

2577
    """
2578
    assert (file_disk_template in
2579
            utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2580
    cluster = self.cfg.GetClusterInfo()
    if cluster.IsDiskTemplateEnabled(file_disk_template):
      self._ErrorIf(
          verify_key in nresult,
          error_key, ninfo.name,
          "The configured %s storage path is unusable: %s" %
          (file_disk_template, nresult.get(verify_key)))

  def _VerifyFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_FILE,
        constants.NV_FILE_STORAGE_PATH,
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)

  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_SHARED_FILE,
        constants.NV_SHARED_FILE_STORAGE_PATH,
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{NodeImage})
    @param node_image: Node image objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_dev_inst_only = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      dev_inst_only = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))

      node_disks_dev_inst_only[nuuid] = dev_inst_only

    assert len(node_disks) == len(node_disks_dev_inst_only)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
               node_disks.keys(), node_disks_dev_inst_only)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

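    # Sanity checks: every instance we collected data for must have one
    # (success, payload) tuple per disk on each reporting node, and instdisk
    # must cover exactly the instances in instanceinfo.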
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

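    # Group the remaining nodes by node group and return one endless (cycled)
    # iterator of sorted node names per foreign group.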
    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

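    # Return the online nodes of this group and, for each of them, one node
    # name drawn from every other group's round-robin iterator.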
    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

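    # Checks requested from every node via the node verify RPC; each key
    # enables one verification step on the remote side.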
    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
      if drbd_helper:
        node_verify_param[constants.NV_DRBDVERSION] = None
        node_verify_param[constants.NV_DRBDLIST] = None
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least one node, we act as if it were set for all nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

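    # Node name -> node group UUID mapping and the full node group
    # configuration; both are passed to the node verify RPC calls below.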
    node_group_uuids = dict(map(lambda n: (n.name, n.group),
                                self.cfg.GetAllNodesInfo().values()))
    groups_config = self.cfg.GetAllNodeGroupsInfoDict()

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams,
                                           node_group_uuids,
                                           groups_config)
    nvinfo_endtime = time.time()

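    # Nodes in extra_lv_nodes only need to report their LV list; their replies
    # are merged in via _UpdateNodeVolumes further down.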
    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams,
                                    node_group_uuids,
                                    groups_config)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams,
         node_group_uuids,
         groups_config))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.uuid not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
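        # Each entry of res.payload is a (script, status, output) tuple, one
        # per hook script run on that node.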
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])