Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / cluster.py @ 4869595d

History | View | Annotate | Download (122.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with the cluster."""
23

    
24
import OpenSSL
25

    
26
import copy
27
import itertools
28
import logging
29
import operator
30
import os
31
import re
32
import time
33

    
34
from ganeti import compat
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import hypervisor
38
from ganeti import locking
39
from ganeti import masterd
40
from ganeti import netutils
41
from ganeti import objects
42
from ganeti import opcodes
43
from ganeti import pathutils
44
from ganeti import query
45
import ganeti.rpc.node as rpc
46
from ganeti import runtime
47
from ganeti import ssh
48
from ganeti import uidpool
49
from ganeti import utils
50
from ganeti import vcluster
51

    
52
from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53
  ResultWithJobs
54
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
60
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
61
  CheckDiskAccessModeConsistency
62

    
63
import ganeti.masterd.instance
64

    
65

    
66
class LUClusterActivateMasterIp(NoHooksLU):
67
  """Activate the master IP on the master node.
68

69
  """
70
  def Exec(self, feedback_fn):
71
    """Activate the master IP.
72

73
    """
74
    master_params = self.cfg.GetMasterNetworkParameters()
75
    ems = self.cfg.GetUseExternalMipScript()
76
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
77
                                                   master_params, ems)
78
    result.Raise("Could not activate the master IP")
79

    
80

    
81
class LUClusterDeactivateMasterIp(NoHooksLU):
82
  """Deactivate the master IP on the master node.
83

84
  """
85
  def Exec(self, feedback_fn):
86
    """Deactivate the master IP.
87

88
    """
89
    master_params = self.cfg.GetMasterNetworkParameters()
90
    ems = self.cfg.GetUseExternalMipScript()
91
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
92
                                                     master_params, ems)
93
    result.Raise("Could not deactivate the master IP")
94

    
95

    
96
class LUClusterConfigQuery(NoHooksLU):
97
  """Return configuration values.
98

99
  """
100
  REQ_BGL = False
101

    
102
  def CheckArguments(self):
103
    self.cq = ClusterQuery(None, self.op.output_fields, False)
104

    
105
  def ExpandNames(self):
106
    self.cq.ExpandNames(self)
107

    
108
  def DeclareLocks(self, level):
109
    self.cq.DeclareLocks(self, level)
110

    
111
  def Exec(self, feedback_fn):
112
    result = self.cq.OldStyleQuery(self)
113

    
114
    assert len(result) == 1
115

    
116
    return result[0]
117

    
118

    
119
class LUClusterDestroy(LogicalUnit):
120
  """Logical unit for destroying the cluster.
121

122
  """
123
  HPATH = "cluster-destroy"
124
  HTYPE = constants.HTYPE_CLUSTER
125

    
126
  def BuildHooksEnv(self):
127
    """Build hooks env.
128

129
    """
130
    return {
131
      "OP_TARGET": self.cfg.GetClusterName(),
132
      }
133

    
134
  def BuildHooksNodes(self):
135
    """Build hooks nodes.
136

137
    """
138
    return ([], [])
139

    
140
  def CheckPrereq(self):
141
    """Check prerequisites.
142

143
    This checks whether the cluster is empty.
144

145
    Any errors are signaled by raising errors.OpPrereqError.
146

147
    """
148
    master = self.cfg.GetMasterNode()
149

    
150
    nodelist = self.cfg.GetNodeList()
151
    if len(nodelist) != 1 or nodelist[0] != master:
152
      raise errors.OpPrereqError("There are still %d node(s) in"
153
                                 " this cluster." % (len(nodelist) - 1),
154
                                 errors.ECODE_INVAL)
155
    instancelist = self.cfg.GetInstanceList()
156
    if instancelist:
157
      raise errors.OpPrereqError("There are still %d instance(s) in"
158
                                 " this cluster." % len(instancelist),
159
                                 errors.ECODE_INVAL)
160

    
161
  def Exec(self, feedback_fn):
162
    """Destroys the cluster.
163

164
    """
165
    master_params = self.cfg.GetMasterNetworkParameters()
166

    
167
    # Run post hooks on master node before it's removed
168
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
169

    
170
    ems = self.cfg.GetUseExternalMipScript()
171
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
172
                                                     master_params, ems)
173
    result.Warn("Error disabling the master IP address", self.LogWarning)
174
    return master_params.uuid
175

    
176

    
177
class LUClusterPostInit(LogicalUnit):
178
  """Logical unit for running hooks after cluster initialization.
179

180
  """
181
  HPATH = "cluster-init"
182
  HTYPE = constants.HTYPE_CLUSTER
183

    
184
  def CheckArguments(self):
185
    self.master_uuid = self.cfg.GetMasterNode()
186
    self.master_ndparams = self.cfg.GetNdParams(self.cfg.GetMasterNodeInfo())
187

    
188
    # TODO: When Issue 584 is solved, and None is properly parsed when used
189
    # as a default value, ndparams.get(.., None) can be changed to
190
    # ndparams[..] to access the values directly
191

    
192
    # OpenvSwitch: Warn user if link is missing
193
    if (self.master_ndparams[constants.ND_OVS] and not
194
        self.master_ndparams.get(constants.ND_OVS_LINK, None)):
195
      self.LogInfo("No physical interface for OpenvSwitch was given."
196
                   " OpenvSwitch will not have an outside connection. This"
197
                   " might not be what you want.")
198

    
199
  def BuildHooksEnv(self):
200
    """Build hooks env.
201

202
    """
203
    return {
204
      "OP_TARGET": self.cfg.GetClusterName(),
205
      }
206

    
207
  def BuildHooksNodes(self):
208
    """Build hooks nodes.
209

210
    """
211
    return ([], [self.cfg.GetMasterNode()])
212

    
213
  def Exec(self, feedback_fn):
214
    """Create and configure Open vSwitch
215

216
    """
217
    if self.master_ndparams[constants.ND_OVS]:
218
      result = self.rpc.call_node_configure_ovs(
219
                 self.master_uuid,
220
                 self.master_ndparams[constants.ND_OVS_NAME],
221
                 self.master_ndparams.get(constants.ND_OVS_LINK, None))
222
      result.Raise("Could not successully configure Open vSwitch")
223
    return True
224

    
225

    
226
class ClusterQuery(QueryBase):
227
  FIELDS = query.CLUSTER_FIELDS
228

    
229
  #: Do not sort (there is only one item)
230
  SORT_FIELD = None
231

    
232
  def ExpandNames(self, lu):
233
    lu.needed_locks = {}
234

    
235
    # The following variables interact with _QueryBase._GetNames
236
    self.wanted = locking.ALL_SET
237
    self.do_locking = self.use_locking
238

    
239
    if self.do_locking:
240
      raise errors.OpPrereqError("Can not use locking for cluster queries",
241
                                 errors.ECODE_INVAL)
242

    
243
  def DeclareLocks(self, lu, level):
244
    pass
245

    
246
  def _GetQueryData(self, lu):
247
    """Computes the list of nodes and their attributes.
248

249
    """
250
    # Locking is not used
251
    assert not (compat.any(lu.glm.is_owned(level)
252
                           for level in locking.LEVELS
253
                           if level != locking.LEVEL_CLUSTER) or
254
                self.do_locking or self.use_locking)
255

    
256
    if query.CQ_CONFIG in self.requested_data:
257
      cluster = lu.cfg.GetClusterInfo()
258
      nodes = lu.cfg.GetAllNodesInfo()
259
    else:
260
      cluster = NotImplemented
261
      nodes = NotImplemented
262

    
263
    if query.CQ_QUEUE_DRAINED in self.requested_data:
264
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
265
    else:
266
      drain_flag = NotImplemented
267

    
268
    if query.CQ_WATCHER_PAUSE in self.requested_data:
269
      master_node_uuid = lu.cfg.GetMasterNode()
270

    
271
      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
272
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
273
                   lu.cfg.GetMasterNodeName())
274

    
275
      watcher_pause = result.payload
276
    else:
277
      watcher_pause = NotImplemented
278

    
279
    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
280

    
281

    
282
class LUClusterQuery(NoHooksLU):
283
  """Query cluster configuration.
284

285
  """
286
  REQ_BGL = False
287

    
288
  def ExpandNames(self):
289
    self.needed_locks = {}
290

    
291
  def Exec(self, feedback_fn):
292
    """Return cluster config.
293

294
    """
295
    cluster = self.cfg.GetClusterInfo()
296
    os_hvp = {}
297

    
298
    # Filter just for enabled hypervisors
299
    for os_name, hv_dict in cluster.os_hvp.items():
300
      os_hvp[os_name] = {}
301
      for hv_name, hv_params in hv_dict.items():
302
        if hv_name in cluster.enabled_hypervisors:
303
          os_hvp[os_name][hv_name] = hv_params
304

    
305
    # Convert ip_family to ip_version
306
    primary_ip_version = constants.IP4_VERSION
307
    if cluster.primary_ip_family == netutils.IP6Address.family:
308
      primary_ip_version = constants.IP6_VERSION
309

    
310
    result = {
311
      "software_version": constants.RELEASE_VERSION,
312
      "protocol_version": constants.PROTOCOL_VERSION,
313
      "config_version": constants.CONFIG_VERSION,
314
      "os_api_version": max(constants.OS_API_VERSIONS),
315
      "export_version": constants.EXPORT_VERSION,
316
      "vcs_version": constants.VCS_VERSION,
317
      "architecture": runtime.GetArchInfo(),
318
      "name": cluster.cluster_name,
319
      "master": self.cfg.GetMasterNodeName(),
320
      "default_hypervisor": cluster.primary_hypervisor,
321
      "enabled_hypervisors": cluster.enabled_hypervisors,
322
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
323
                        for hypervisor_name in cluster.enabled_hypervisors]),
324
      "os_hvp": os_hvp,
325
      "beparams": cluster.beparams,
326
      "osparams": cluster.osparams,
327
      "ipolicy": cluster.ipolicy,
328
      "nicparams": cluster.nicparams,
329
      "ndparams": cluster.ndparams,
330
      "diskparams": cluster.diskparams,
331
      "candidate_pool_size": cluster.candidate_pool_size,
332
      "master_netdev": cluster.master_netdev,
333
      "master_netmask": cluster.master_netmask,
334
      "use_external_mip_script": cluster.use_external_mip_script,
335
      "volume_group_name": cluster.volume_group_name,
336
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
337
      "file_storage_dir": cluster.file_storage_dir,
338
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
339
      "maintain_node_health": cluster.maintain_node_health,
340
      "ctime": cluster.ctime,
341
      "mtime": cluster.mtime,
342
      "uuid": cluster.uuid,
343
      "tags": list(cluster.GetTags()),
344
      "uid_pool": cluster.uid_pool,
345
      "default_iallocator": cluster.default_iallocator,
346
      "default_iallocator_params": cluster.default_iallocator_params,
347
      "reserved_lvs": cluster.reserved_lvs,
348
      "primary_ip_version": primary_ip_version,
349
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
350
      "hidden_os": cluster.hidden_os,
351
      "blacklisted_os": cluster.blacklisted_os,
352
      "enabled_disk_templates": cluster.enabled_disk_templates,
353
      }
354

    
355
    return result
356

    
357

    
358
class LUClusterRedistConf(NoHooksLU):
359
  """Force the redistribution of cluster configuration.
360

361
  This is a very simple LU.
362

363
  """
364
  REQ_BGL = False
365

    
366
  def ExpandNames(self):
367
    self.needed_locks = {
368
      locking.LEVEL_NODE: locking.ALL_SET,
369
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
370
    }
371
    self.share_locks = ShareAll()
372

    
373
  def Exec(self, feedback_fn):
374
    """Redistribute the configuration.
375

376
    """
377
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
378
    RedistributeAncillaryFiles(self)
379

    
380

    
381
class LUClusterRename(LogicalUnit):
382
  """Rename the cluster.
383

384
  """
385
  HPATH = "cluster-rename"
386
  HTYPE = constants.HTYPE_CLUSTER
387

    
388
  def BuildHooksEnv(self):
389
    """Build hooks env.
390

391
    """
392
    return {
393
      "OP_TARGET": self.cfg.GetClusterName(),
394
      "NEW_NAME": self.op.name,
395
      }
396

    
397
  def BuildHooksNodes(self):
398
    """Build hooks nodes.
399

400
    """
401
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
402

    
403
  def CheckPrereq(self):
404
    """Verify that the passed name is a valid one.
405

406
    """
407
    hostname = netutils.GetHostname(name=self.op.name,
408
                                    family=self.cfg.GetPrimaryIPFamily())
409

    
410
    new_name = hostname.name
411
    self.ip = new_ip = hostname.ip
412
    old_name = self.cfg.GetClusterName()
413
    old_ip = self.cfg.GetMasterIP()
414
    if new_name == old_name and new_ip == old_ip:
415
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
416
                                 " cluster has changed",
417
                                 errors.ECODE_INVAL)
418
    if new_ip != old_ip:
419
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
420
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
421
                                   " reachable on the network" %
422
                                   new_ip, errors.ECODE_NOTUNIQUE)
423

    
424
    self.op.name = new_name
425

    
426
  def Exec(self, feedback_fn):
427
    """Rename the cluster.
428

429
    """
430
    clustername = self.op.name
431
    new_ip = self.ip
432

    
433
    # shutdown the master IP
434
    master_params = self.cfg.GetMasterNetworkParameters()
435
    ems = self.cfg.GetUseExternalMipScript()
436
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
437
                                                     master_params, ems)
438
    result.Raise("Could not disable the master role")
439

    
440
    try:
441
      cluster = self.cfg.GetClusterInfo()
442
      cluster.cluster_name = clustername
443
      cluster.master_ip = new_ip
444
      self.cfg.Update(cluster, feedback_fn)
445

    
446
      # update the known hosts file
447
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
448
      node_list = self.cfg.GetOnlineNodeList()
449
      try:
450
        node_list.remove(master_params.uuid)
451
      except ValueError:
452
        pass
453
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
454
    finally:
455
      master_params.ip = new_ip
456
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
457
                                                     master_params, ems)
458
      result.Warn("Could not re-enable the master role on the master,"
459
                  " please restart manually", self.LogWarning)
460

    
461
    return clustername
462

    
463

    
464
class LUClusterRepairDiskSizes(NoHooksLU):
465
  """Verifies the cluster disks sizes.
466

467
  """
468
  REQ_BGL = False
469

    
470
  def ExpandNames(self):
471
    if self.op.instances:
472
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
473
      # Not getting the node allocation lock as only a specific set of
474
      # instances (and their nodes) is going to be acquired
475
      self.needed_locks = {
476
        locking.LEVEL_NODE_RES: [],
477
        locking.LEVEL_INSTANCE: self.wanted_names,
478
        }
479
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
480
    else:
481
      self.wanted_names = None
482
      self.needed_locks = {
483
        locking.LEVEL_NODE_RES: locking.ALL_SET,
484
        locking.LEVEL_INSTANCE: locking.ALL_SET,
485

    
486
        # This opcode is acquires the node locks for all instances
487
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
488
        }
489

    
490
    self.share_locks = {
491
      locking.LEVEL_NODE_RES: 1,
492
      locking.LEVEL_INSTANCE: 0,
493
      locking.LEVEL_NODE_ALLOC: 1,
494
      }
495

    
496
  def DeclareLocks(self, level):
497
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
498
      self._LockInstancesNodes(primary_only=True, level=level)
499

    
500
  def CheckPrereq(self):
501
    """Check prerequisites.
502

503
    This only checks the optional instance list against the existing names.
504

505
    """
506
    if self.wanted_names is None:
507
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
508

    
509
    self.wanted_instances = \
510
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
511

    
512
  def _EnsureChildSizes(self, disk):
513
    """Ensure children of the disk have the needed disk size.
514

515
    This is valid mainly for DRBD8 and fixes an issue where the
516
    children have smaller disk size.
517

518
    @param disk: an L{ganeti.objects.Disk} object
519

520
    """
521
    if disk.dev_type == constants.DT_DRBD8:
522
      assert disk.children, "Empty children for DRBD8?"
523
      fchild = disk.children[0]
524
      mismatch = fchild.size < disk.size
525
      if mismatch:
526
        self.LogInfo("Child disk has size %d, parent %d, fixing",
527
                     fchild.size, disk.size)
528
        fchild.size = disk.size
529

    
530
      # and we recurse on this child only, not on the metadev
531
      return self._EnsureChildSizes(fchild) or mismatch
532
    else:
533
      return False
534

    
535
  def Exec(self, feedback_fn):
536
    """Verify the size of cluster disks.
537

538
    """
539
    # TODO: check child disks too
540
    # TODO: check differences in size between primary/secondary nodes
541
    per_node_disks = {}
542
    for instance in self.wanted_instances:
543
      pnode = instance.primary_node
544
      if pnode not in per_node_disks:
545
        per_node_disks[pnode] = []
546
      for idx, disk in enumerate(instance.disks):
547
        per_node_disks[pnode].append((instance, idx, disk))
548

    
549
    assert not (frozenset(per_node_disks.keys()) -
550
                self.owned_locks(locking.LEVEL_NODE_RES)), \
551
      "Not owning correct locks"
552
    assert not self.owned_locks(locking.LEVEL_NODE)
553

    
554
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
555
                                               per_node_disks.keys())
556

    
557
    changed = []
558
    for node_uuid, dskl in per_node_disks.items():
559
      if not dskl:
560
        # no disks on the node
561
        continue
562

    
563
      newl = [([v[2].Copy()], v[0]) for v in dskl]
564
      node_name = self.cfg.GetNodeName(node_uuid)
565
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
566
      if result.fail_msg:
567
        self.LogWarning("Failure in blockdev_getdimensions call to node"
568
                        " %s, ignoring", node_name)
569
        continue
570
      if len(result.payload) != len(dskl):
571
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
572
                        " result.payload=%s", node_name, len(dskl),
573
                        result.payload)
574
        self.LogWarning("Invalid result from node %s, ignoring node results",
575
                        node_name)
576
        continue
577
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
578
        if dimensions is None:
579
          self.LogWarning("Disk %d of instance %s did not return size"
580
                          " information, ignoring", idx, instance.name)
581
          continue
582
        if not isinstance(dimensions, (tuple, list)):
583
          self.LogWarning("Disk %d of instance %s did not return valid"
584
                          " dimension information, ignoring", idx,
585
                          instance.name)
586
          continue
587
        (size, spindles) = dimensions
588
        if not isinstance(size, (int, long)):
589
          self.LogWarning("Disk %d of instance %s did not return valid"
590
                          " size information, ignoring", idx, instance.name)
591
          continue
592
        size = size >> 20
593
        if size != disk.size:
594
          self.LogInfo("Disk %d of instance %s has mismatched size,"
595
                       " correcting: recorded %d, actual %d", idx,
596
                       instance.name, disk.size, size)
597
          disk.size = size
598
          self.cfg.Update(instance, feedback_fn)
599
          changed.append((instance.name, idx, "size", size))
600
        if es_flags[node_uuid]:
601
          if spindles is None:
602
            self.LogWarning("Disk %d of instance %s did not return valid"
603
                            " spindles information, ignoring", idx,
604
                            instance.name)
605
          elif disk.spindles is None or disk.spindles != spindles:
606
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
607
                         " correcting: recorded %s, actual %s",
608
                         idx, instance.name, disk.spindles, spindles)
609
            disk.spindles = spindles
610
            self.cfg.Update(instance, feedback_fn)
611
            changed.append((instance.name, idx, "spindles", disk.spindles))
612
        if self._EnsureChildSizes(disk):
613
          self.cfg.Update(instance, feedback_fn)
614
          changed.append((instance.name, idx, "size", disk.size))
615
    return changed
616

    
617

    
618
def _ValidateNetmask(cfg, netmask):
619
  """Checks if a netmask is valid.
620

621
  @type cfg: L{config.ConfigWriter}
622
  @param cfg: The cluster configuration
623
  @type netmask: int
624
  @param netmask: the netmask to be verified
625
  @raise errors.OpPrereqError: if the validation fails
626

627
  """
628
  ip_family = cfg.GetPrimaryIPFamily()
629
  try:
630
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
631
  except errors.ProgrammerError:
632
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
633
                               ip_family, errors.ECODE_INVAL)
634
  if not ipcls.ValidateNetmask(netmask):
635
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
636
                               (netmask), errors.ECODE_INVAL)
637

    
638

    
639
def CheckFileBasedStoragePathVsEnabledDiskTemplates(
640
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
641
    file_disk_template):
642
  """Checks whether the given file-based storage directory is acceptable.
643

644
  Note: This function is public, because it is also used in bootstrap.py.
645

646
  @type logging_warn_fn: function
647
  @param logging_warn_fn: function which accepts a string and logs it
648
  @type file_storage_dir: string
649
  @param file_storage_dir: the directory to be used for file-based instances
650
  @type enabled_disk_templates: list of string
651
  @param enabled_disk_templates: the list of enabled disk templates
652
  @type file_disk_template: string
653
  @param file_disk_template: the file-based disk template for which the
654
      path should be checked
655

656
  """
657
  assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
658
            constants.ST_FILE, constants.ST_SHARED_FILE
659
         ))
660
  file_storage_enabled = file_disk_template in enabled_disk_templates
661
  if file_storage_dir is not None:
662
    if file_storage_dir == "":
663
      if file_storage_enabled:
664
        raise errors.OpPrereqError(
665
            "Unsetting the '%s' storage directory while having '%s' storage"
666
            " enabled is not permitted." %
667
            (file_disk_template, file_disk_template))
668
    else:
669
      if not file_storage_enabled:
670
        logging_warn_fn(
671
            "Specified a %s storage directory, although %s storage is not"
672
            " enabled." % (file_disk_template, file_disk_template))
673
  else:
674
    raise errors.ProgrammerError("Received %s storage dir with value"
675
                                 " 'None'." % file_disk_template)
676

    
677

    
678
def CheckFileStoragePathVsEnabledDiskTemplates(
679
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
680
  """Checks whether the given file storage directory is acceptable.
681

682
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
683

684
  """
685
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
686
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
687
      constants.DT_FILE)
688

    
689

    
690
def CheckSharedFileStoragePathVsEnabledDiskTemplates(
691
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
692
  """Checks whether the given shared file storage directory is acceptable.
693

694
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
695

696
  """
697
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
698
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
699
      constants.DT_SHARED_FILE)
700

    
701

    
702
class LUClusterSetParams(LogicalUnit):
703
  """Change the parameters of the cluster.
704

705
  """
706
  HPATH = "cluster-modify"
707
  HTYPE = constants.HTYPE_CLUSTER
708
  REQ_BGL = False
709

    
710
  def CheckArguments(self):
711
    """Check parameters
712

713
    """
714
    if self.op.uid_pool:
715
      uidpool.CheckUidPool(self.op.uid_pool)
716

    
717
    if self.op.add_uids:
718
      uidpool.CheckUidPool(self.op.add_uids)
719

    
720
    if self.op.remove_uids:
721
      uidpool.CheckUidPool(self.op.remove_uids)
722

    
723
    if self.op.master_netmask is not None:
724
      _ValidateNetmask(self.cfg, self.op.master_netmask)
725

    
726
    if self.op.diskparams:
727
      for dt_params in self.op.diskparams.values():
728
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
729
      try:
730
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
731
        CheckDiskAccessModeValidity(self.op.diskparams)
732
      except errors.OpPrereqError, err:
733
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
734
                                   errors.ECODE_INVAL)
735

    
736
  def ExpandNames(self):
737
    # FIXME: in the future maybe other cluster params won't require checking on
738
    # all nodes to be modified.
739
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
740
    # resource locks the right thing, shouldn't it be the BGL instead?
741
    self.needed_locks = {
742
      locking.LEVEL_NODE: locking.ALL_SET,
743
      locking.LEVEL_INSTANCE: locking.ALL_SET,
744
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
745
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
746
    }
747
    self.share_locks = ShareAll()
748

    
749
  def BuildHooksEnv(self):
750
    """Build hooks env.
751

752
    """
753
    return {
754
      "OP_TARGET": self.cfg.GetClusterName(),
755
      "NEW_VG_NAME": self.op.vg_name,
756
      }
757

    
758
  def BuildHooksNodes(self):
759
    """Build hooks nodes.
760

761
    """
762
    mn = self.cfg.GetMasterNode()
763
    return ([mn], [mn])
764

    
765
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
766
                   new_enabled_disk_templates):
767
    """Check the consistency of the vg name on all nodes and in case it gets
768
       unset whether there are instances still using it.
769

770
    """
771
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
772
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
773
                                            new_enabled_disk_templates)
774
    current_vg_name = self.cfg.GetVGName()
775

    
776
    if self.op.vg_name == '':
777
      if lvm_is_enabled:
778
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
779
                                   " disk templates are or get enabled.")
780

    
781
    if self.op.vg_name is None:
782
      if current_vg_name is None and lvm_is_enabled:
783
        raise errors.OpPrereqError("Please specify a volume group when"
784
                                   " enabling lvm-based disk-templates.")
785

    
786
    if self.op.vg_name is not None and not self.op.vg_name:
787
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
788
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
789
                                   " instances exist", errors.ECODE_INVAL)
790

    
791
    if (self.op.vg_name is not None and lvm_is_enabled) or \
792
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
793
      self._CheckVgNameOnNodes(node_uuids)
794

    
795
  def _CheckVgNameOnNodes(self, node_uuids):
796
    """Check the status of the volume group on each node.
797

798
    """
799
    vglist = self.rpc.call_vg_list(node_uuids)
800
    for node_uuid in node_uuids:
801
      msg = vglist[node_uuid].fail_msg
802
      if msg:
803
        # ignoring down node
804
        self.LogWarning("Error while gathering data on node %s"
805
                        " (ignoring node): %s",
806
                        self.cfg.GetNodeName(node_uuid), msg)
807
        continue
808
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
809
                                            self.op.vg_name,
810
                                            constants.MIN_VG_SIZE)
811
      if vgstatus:
812
        raise errors.OpPrereqError("Error on node '%s': %s" %
813
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
814
                                   errors.ECODE_ENVIRON)
815

    
816
  @staticmethod
817
  def _GetDiskTemplateSetsInner(op_enabled_disk_templates,
818
                                old_enabled_disk_templates):
819
    """Computes three sets of disk templates.
820

821
    @see: C{_GetDiskTemplateSets} for more details.
822

823
    """
824
    enabled_disk_templates = None
825
    new_enabled_disk_templates = []
826
    disabled_disk_templates = []
827
    if op_enabled_disk_templates:
828
      enabled_disk_templates = op_enabled_disk_templates
829
      new_enabled_disk_templates = \
830
        list(set(enabled_disk_templates)
831
             - set(old_enabled_disk_templates))
832
      disabled_disk_templates = \
833
        list(set(old_enabled_disk_templates)
834
             - set(enabled_disk_templates))
835
    else:
836
      enabled_disk_templates = old_enabled_disk_templates
837
    return (enabled_disk_templates, new_enabled_disk_templates,
838
            disabled_disk_templates)
839

    
840
  def _GetDiskTemplateSets(self, cluster):
841
    """Computes three sets of disk templates.
842

843
    The three sets are:
844
      - disk templates that will be enabled after this operation (no matter if
845
        they were enabled before or not)
846
      - disk templates that get enabled by this operation (thus haven't been
847
        enabled before.)
848
      - disk templates that get disabled by this operation
849

850
    """
851
    return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates,
852
                                          cluster.enabled_disk_templates)
853

    
854
  def _CheckIpolicy(self, cluster, enabled_disk_templates):
855
    """Checks the ipolicy.
856

857
    @type cluster: C{objects.Cluster}
858
    @param cluster: the cluster's configuration
859
    @type enabled_disk_templates: list of string
860
    @param enabled_disk_templates: list of (possibly newly) enabled disk
861
      templates
862

863
    """
864
    # FIXME: write unit tests for this
865
    if self.op.ipolicy:
866
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
867
                                           group_policy=False)
868

    
869
      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
870
                                  enabled_disk_templates)
871

    
872
      all_instances = self.cfg.GetAllInstancesInfo().values()
873
      violations = set()
874
      for group in self.cfg.GetAllNodeGroupsInfo().values():
875
        instances = frozenset([inst for inst in all_instances
876
                               if compat.any(nuuid in group.members
877
                                             for nuuid in inst.all_nodes)])
878
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
879
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
880
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
881
                                           self.cfg)
882
        if new:
883
          violations.update(new)
884

    
885
      if violations:
886
        self.LogWarning("After the ipolicy change the following instances"
887
                        " violate them: %s",
888
                        utils.CommaJoin(utils.NiceSort(violations)))
889
    else:
890
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
891
                                  enabled_disk_templates)
892

    
893
  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
894
    """Checks whether the set DRBD helper actually exists on the nodes.
895

896
    @type drbd_helper: string
897
    @param drbd_helper: path of the drbd usermode helper binary
898
    @type node_uuids: list of strings
899
    @param node_uuids: list of node UUIDs to check for the helper
900

901
    """
902
    # checks given drbd helper on all nodes
903
    helpers = self.rpc.call_drbd_helper(node_uuids)
904
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
905
      if ninfo.offline:
906
        self.LogInfo("Not checking drbd helper on offline node %s",
907
                     ninfo.name)
908
        continue
909
      msg = helpers[ninfo.uuid].fail_msg
910
      if msg:
911
        raise errors.OpPrereqError("Error checking drbd helper on node"
912
                                   " '%s': %s" % (ninfo.name, msg),
913
                                   errors.ECODE_ENVIRON)
914
      node_helper = helpers[ninfo.uuid].payload
915
      if node_helper != drbd_helper:
916
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
917
                                   (ninfo.name, node_helper),
918
                                   errors.ECODE_ENVIRON)
919

    
920
  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
921
    """Check the DRBD usermode helper.
922

923
    @type node_uuids: list of strings
924
    @param node_uuids: a list of nodes' UUIDs
925
    @type drbd_enabled: boolean
926
    @param drbd_enabled: whether DRBD will be enabled after this operation
927
      (no matter if it was disabled before or not)
928
    @type drbd_gets_enabled: boolen
929
    @param drbd_gets_enabled: true if DRBD was disabled before this
930
      operation, but will be enabled afterwards
931

932
    """
933
    if self.op.drbd_helper == '':
934
      if drbd_enabled:
935
        raise errors.OpPrereqError("Cannot disable drbd helper while"
936
                                   " DRBD is enabled.")
937
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
938
        raise errors.OpPrereqError("Cannot disable drbd helper while"
939
                                   " drbd-based instances exist",
940
                                   errors.ECODE_INVAL)
941

    
942
    else:
943
      if self.op.drbd_helper is not None and drbd_enabled:
944
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
945
      else:
946
        if drbd_gets_enabled:
947
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
948
          if current_drbd_helper is not None:
949
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
950
          else:
951
            raise errors.OpPrereqError("Cannot enable DRBD without a"
952
                                       " DRBD usermode helper set.")
953

    
954
  def _CheckInstancesOfDisabledDiskTemplates(
955
      self, disabled_disk_templates):
956
    """Check whether we try to disable a disk template that is in use.
957

958
    @type disabled_disk_templates: list of string
959
    @param disabled_disk_templates: list of disk templates that are going to
960
      be disabled by this operation
961

962
    """
963
    for disk_template in disabled_disk_templates:
964
      if self.cfg.HasAnyDiskOfType(disk_template):
965
        raise errors.OpPrereqError(
966
            "Cannot disable disk template '%s', because there is at least one"
967
            " instance using it." % disk_template)
968

    
969
  def CheckPrereq(self):
970
    """Check prerequisites.
971

972
    This checks whether the given params don't conflict and
973
    if the given volume group is valid.
974

975
    """
976
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
977
    self.cluster = cluster = self.cfg.GetClusterInfo()
978

    
979
    vm_capable_node_uuids = [node.uuid
980
                             for node in self.cfg.GetAllNodesInfo().values()
981
                             if node.uuid in node_uuids and node.vm_capable]
982

    
983
    (enabled_disk_templates, new_enabled_disk_templates,
984
      disabled_disk_templates) = self._GetDiskTemplateSets(cluster)
985
    self._CheckInstancesOfDisabledDiskTemplates(disabled_disk_templates)
986

    
987
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
988
                      new_enabled_disk_templates)
989

    
990
    if self.op.file_storage_dir is not None:
991
      CheckFileStoragePathVsEnabledDiskTemplates(
992
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
993

    
994
    if self.op.shared_file_storage_dir is not None:
995
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
996
          self.LogWarning, self.op.shared_file_storage_dir,
997
          enabled_disk_templates)
998

    
999
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
1000
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
1001
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)
1002

    
1003
    # validate params changes
1004
    if self.op.beparams:
1005
      objects.UpgradeBeParams(self.op.beparams)
1006
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1007
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
1008

    
1009
    if self.op.ndparams:
1010
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
1011
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
1012

    
1013
      # TODO: we need a more general way to handle resetting
1014
      # cluster-level parameters to default values
1015
      if self.new_ndparams["oob_program"] == "":
1016
        self.new_ndparams["oob_program"] = \
1017
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
1018

    
1019
    if self.op.hv_state:
1020
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
1021
                                           self.cluster.hv_state_static)
1022
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
1023
                               for hv, values in new_hv_state.items())
1024

    
1025
    if self.op.disk_state:
1026
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
1027
                                               self.cluster.disk_state_static)
1028
      self.new_disk_state = \
1029
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
1030
                            for name, values in svalues.items()))
1031
             for storage, svalues in new_disk_state.items())
1032

    
1033
    self._CheckIpolicy(cluster, enabled_disk_templates)
1034

    
1035
    if self.op.nicparams:
1036
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1037
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
1038
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
1039
      nic_errors = []
1040

    
1041
      # check all instances for consistency
1042
      for instance in self.cfg.GetAllInstancesInfo().values():
1043
        for nic_idx, nic in enumerate(instance.nics):
1044
          params_copy = copy.deepcopy(nic.nicparams)
1045
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
1046

    
1047
          # check parameter syntax
1048
          try:
1049
            objects.NIC.CheckParameterSyntax(params_filled)
1050
          except errors.ConfigurationError, err:
1051
            nic_errors.append("Instance %s, nic/%d: %s" %
1052
                              (instance.name, nic_idx, err))
1053

    
1054
          # if we're moving instances to routed, check that they have an ip
1055
          target_mode = params_filled[constants.NIC_MODE]
1056
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
1057
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
1058
                              " address" % (instance.name, nic_idx))
1059
      if nic_errors:
1060
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
1061
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
1062

    
1063
    # hypervisor list/parameters
1064
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
1065
    if self.op.hvparams:
1066
      for hv_name, hv_dict in self.op.hvparams.items():
1067
        if hv_name not in self.new_hvparams:
1068
          self.new_hvparams[hv_name] = hv_dict
1069
        else:
1070
          self.new_hvparams[hv_name].update(hv_dict)
1071

    
1072
    # disk template parameters
1073
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
1074
    if self.op.diskparams:
1075
      for dt_name, dt_params in self.op.diskparams.items():
1076
        if dt_name not in self.new_diskparams:
1077
          self.new_diskparams[dt_name] = dt_params
1078
        else:
1079
          self.new_diskparams[dt_name].update(dt_params)
1080
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)
1081

    
1082
    # os hypervisor parameters
1083
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
1084
    if self.op.os_hvp:
1085
      for os_name, hvs in self.op.os_hvp.items():
1086
        if os_name not in self.new_os_hvp:
1087
          self.new_os_hvp[os_name] = hvs
1088
        else:
1089
          for hv_name, hv_dict in hvs.items():
1090
            if hv_dict is None:
1091
              # Delete if it exists
1092
              self.new_os_hvp[os_name].pop(hv_name, None)
1093
            elif hv_name not in self.new_os_hvp[os_name]:
1094
              self.new_os_hvp[os_name][hv_name] = hv_dict
1095
            else:
1096
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
1097

    
1098
    # os parameters
1099
    self.new_osp = objects.FillDict(cluster.osparams, {})
1100
    if self.op.osparams:
1101
      for os_name, osp in self.op.osparams.items():
1102
        if os_name not in self.new_osp:
1103
          self.new_osp[os_name] = {}
1104

    
1105
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
1106
                                                 use_none=True)
1107

    
1108
        if not self.new_osp[os_name]:
1109
          # we removed all parameters
1110
          del self.new_osp[os_name]
1111
        else:
1112
          # check the parameter validity (remote check)
1113
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
1114
                        os_name, self.new_osp[os_name])
1115

    
1116
    # changes to the hypervisor list
1117
    if self.op.enabled_hypervisors is not None:
1118
      self.hv_list = self.op.enabled_hypervisors
1119
      for hv in self.hv_list:
1120
        # if the hypervisor doesn't already exist in the cluster
1121
        # hvparams, we initialize it to empty, and then (in both
1122
        # cases) we make sure to fill the defaults, as we might not
1123
        # have a complete defaults list if the hypervisor wasn't
1124
        # enabled before
1125
        if hv not in new_hvp:
1126
          new_hvp[hv] = {}
1127
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1128
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1129
    else:
1130
      self.hv_list = cluster.enabled_hypervisors
1131

    
1132
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1133
      # either the enabled list has changed, or the parameters have, validate
1134
      for hv_name, hv_params in self.new_hvparams.items():
1135
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1136
            (self.op.enabled_hypervisors and
1137
             hv_name in self.op.enabled_hypervisors)):
1138
          # either this is a new hypervisor, or its parameters have changed
1139
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1140
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1141
          hv_class.CheckParameterSyntax(hv_params)
1142
          CheckHVParams(self, node_uuids, hv_name, hv_params)
1143

    
1144
    self._CheckDiskTemplateConsistency()
1145

    
1146
    if self.op.os_hvp:
1147
      # no need to check any newly-enabled hypervisors, since the
1148
      # defaults have already been checked in the above code-block
1149
      for os_name, os_hvp in self.new_os_hvp.items():
1150
        for hv_name, hv_params in os_hvp.items():
1151
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1152
          # we need to fill in the new os_hvp on top of the actual hv_p
1153
          cluster_defaults = self.new_hvparams.get(hv_name, {})
1154
          new_osp = objects.FillDict(cluster_defaults, hv_params)
1155
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1156
          hv_class.CheckParameterSyntax(new_osp)
1157
          CheckHVParams(self, node_uuids, hv_name, new_osp)
1158

    
1159
    if self.op.default_iallocator:
1160
      alloc_script = utils.FindFile(self.op.default_iallocator,
1161
                                    constants.IALLOCATOR_SEARCH_PATH,
1162
                                    os.path.isfile)
1163
      if alloc_script is None:
1164
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1165
                                   " specified" % self.op.default_iallocator,
1166
                                   errors.ECODE_INVAL)
1167

    
1168
  def _CheckDiskTemplateConsistency(self):
1169
    """Check whether the disk templates that are going to be disabled
1170
       are still in use by some instances.
1171

1172
    """
1173
    if self.op.enabled_disk_templates:
1174
      cluster = self.cfg.GetClusterInfo()
1175
      instances = self.cfg.GetAllInstancesInfo()
1176

    
1177
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1178
        - set(self.op.enabled_disk_templates)
1179
      for instance in instances.itervalues():
1180
        if instance.disk_template in disk_templates_to_remove:
1181
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
1182
                                     " because instance '%s' is using it." %
1183
                                     (instance.disk_template, instance.name))
1184

    
1185
  def _SetVgName(self, feedback_fn):
1186
    """Determines and sets the new volume group name.
1187

1188
    """
1189
    if self.op.vg_name is not None:
1190
      new_volume = self.op.vg_name
1191
      if not new_volume:
1192
        new_volume = None
1193
      if new_volume != self.cfg.GetVGName():
1194
        self.cfg.SetVGName(new_volume)
1195
      else:
1196
        feedback_fn("Cluster LVM configuration already in desired"
1197
                    " state, not changing")
1198

    
1199
  def _SetFileStorageDir(self, feedback_fn):
1200
    """Set the file storage directory.
1201

1202
    """
1203
    if self.op.file_storage_dir is not None:
1204
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
1205
        feedback_fn("Global file storage dir already set to value '%s'"
1206
                    % self.cluster.file_storage_dir)
1207
      else:
1208
        self.cluster.file_storage_dir = self.op.file_storage_dir
1209

    
1210
  def _SetDrbdHelper(self, feedback_fn):
1211
    """Set the DRBD usermode helper.
1212

1213
    """
1214
    if self.op.drbd_helper is not None:
1215
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1216
        feedback_fn("Note that you specified a drbd user helper, but did not"
1217
                    " enable the drbd disk template.")
1218
      new_helper = self.op.drbd_helper
1219
      if not new_helper:
1220
        new_helper = None
1221
      if new_helper != self.cfg.GetDRBDHelper():
1222
        self.cfg.SetDRBDHelper(new_helper)
1223
      else:
1224
        feedback_fn("Cluster DRBD helper already in desired state,"
1225
                    " not changing")
1226

    
1227
  def Exec(self, feedback_fn):
1228
    """Change the parameters of the cluster.
1229

1230
    """
1231
    if self.op.enabled_disk_templates:
1232
      self.cluster.enabled_disk_templates = \
1233
        list(self.op.enabled_disk_templates)
1234

    
1235
    self._SetVgName(feedback_fn)
1236
    self._SetFileStorageDir(feedback_fn)
1237
    self._SetDrbdHelper(feedback_fn)
1238

    
1239
    if self.op.hvparams:
1240
      self.cluster.hvparams = self.new_hvparams
1241
    if self.op.os_hvp:
1242
      self.cluster.os_hvp = self.new_os_hvp
1243
    if self.op.enabled_hypervisors is not None:
1244
      self.cluster.hvparams = self.new_hvparams
1245
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1246
    if self.op.beparams:
1247
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1248
    if self.op.nicparams:
1249
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1250
    if self.op.ipolicy:
1251
      self.cluster.ipolicy = self.new_ipolicy
1252
    if self.op.osparams:
1253
      self.cluster.osparams = self.new_osp
1254
    if self.op.ndparams:
1255
      self.cluster.ndparams = self.new_ndparams
1256
    if self.op.diskparams:
1257
      self.cluster.diskparams = self.new_diskparams
1258
    if self.op.hv_state:
1259
      self.cluster.hv_state_static = self.new_hv_state
1260
    if self.op.disk_state:
1261
      self.cluster.disk_state_static = self.new_disk_state
1262

    
1263
    if self.op.candidate_pool_size is not None:
1264
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1265
      # we need to update the pool size here, otherwise the save will fail
1266
      AdjustCandidatePool(self, [])
1267

    
1268
    if self.op.maintain_node_health is not None:
1269
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1270
        feedback_fn("Note: CONFD was disabled at build time, node health"
1271
                    " maintenance is not useful (still enabling it)")
1272
      self.cluster.maintain_node_health = self.op.maintain_node_health
1273

    
1274
    if self.op.modify_etc_hosts is not None:
1275
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1276

    
1277
    if self.op.prealloc_wipe_disks is not None:
1278
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1279

    
1280
    if self.op.add_uids is not None:
1281
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1282

    
1283
    if self.op.remove_uids is not None:
1284
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1285

    
1286
    if self.op.uid_pool is not None:
1287
      self.cluster.uid_pool = self.op.uid_pool
1288

    
1289
    if self.op.default_iallocator is not None:
1290
      self.cluster.default_iallocator = self.op.default_iallocator
1291

    
1292
    if self.op.default_iallocator_params is not None:
1293
      self.cluster.default_iallocator_params = self.op.default_iallocator_params
1294

    
1295
    if self.op.reserved_lvs is not None:
1296
      self.cluster.reserved_lvs = self.op.reserved_lvs
1297

    
1298
    if self.op.use_external_mip_script is not None:
1299
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
1300

    
1301
    def helper_os(aname, mods, desc):
1302
      desc += " OS list"
1303
      lst = getattr(self.cluster, aname)
1304
      for key, val in mods:
1305
        if key == constants.DDM_ADD:
1306
          if val in lst:
1307
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1308
          else:
1309
            lst.append(val)
1310
        elif key == constants.DDM_REMOVE:
1311
          if val in lst:
1312
            lst.remove(val)
1313
          else:
1314
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1315
        else:
1316
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
1317

    
1318
    if self.op.hidden_os:
1319
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1320

    
1321
    if self.op.blacklisted_os:
1322
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1323

    
1324
    if self.op.master_netdev:
1325
      master_params = self.cfg.GetMasterNetworkParameters()
1326
      ems = self.cfg.GetUseExternalMipScript()
1327
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1328
                  self.cluster.master_netdev)
1329
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1330
                                                       master_params, ems)
1331
      if not self.op.force:
1332
        result.Raise("Could not disable the master ip")
1333
      else:
1334
        if result.fail_msg:
1335
          msg = ("Could not disable the master ip (continuing anyway): %s" %
1336
                 result.fail_msg)
1337
          feedback_fn(msg)
1338
      feedback_fn("Changing master_netdev from %s to %s" %
1339
                  (master_params.netdev, self.op.master_netdev))
1340
      self.cluster.master_netdev = self.op.master_netdev
1341

    
1342
    if self.op.master_netmask:
1343
      master_params = self.cfg.GetMasterNetworkParameters()
1344
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1345
      result = self.rpc.call_node_change_master_netmask(
1346
                 master_params.uuid, master_params.netmask,
1347
                 self.op.master_netmask, master_params.ip,
1348
                 master_params.netdev)
1349
      result.Warn("Could not change the master IP netmask", feedback_fn)
1350
      self.cluster.master_netmask = self.op.master_netmask
1351

    
1352
    self.cfg.Update(self.cluster, feedback_fn)
1353

    
1354
    if self.op.master_netdev:
1355
      master_params = self.cfg.GetMasterNetworkParameters()
1356
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
1357
                  self.op.master_netdev)
1358
      ems = self.cfg.GetUseExternalMipScript()
1359
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1360
                                                     master_params, ems)
1361
      result.Warn("Could not re-enable the master ip on the master,"
1362
                  " please restart manually", self.LogWarning)
1363

    
1364

    
1365
class LUClusterVerify(NoHooksLU):
1366
  """Submits all jobs necessary to verify the cluster.
1367

1368
  """
1369
  REQ_BGL = False
1370

    
1371
  def ExpandNames(self):
1372
    self.needed_locks = {}
1373

    
1374
  def Exec(self, feedback_fn):
1375
    jobs = []
1376

    
1377
    if self.op.group_name:
1378
      groups = [self.op.group_name]
1379
      depends_fn = lambda: None
1380
    else:
1381
      groups = self.cfg.GetNodeGroupList()
1382

    
1383
      # Verify global configuration
1384
      jobs.append([
1385
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1386
        ])
1387

    
1388
      # Always depend on global verification
1389
      depends_fn = lambda: [(-len(jobs), [])]
1390

    
1391
    jobs.extend(
1392
      [opcodes.OpClusterVerifyGroup(group_name=group,
1393
                                    ignore_errors=self.op.ignore_errors,
1394
                                    depends=depends_fn())]
1395
      for group in groups)
1396

    
1397
    # Fix up all parameters
1398
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1399
      op.debug_simulate_errors = self.op.debug_simulate_errors
1400
      op.verbose = self.op.verbose
1401
      op.error_codes = self.op.error_codes
1402
      try:
1403
        op.skip_checks = self.op.skip_checks
1404
      except AttributeError:
1405
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1406

    
1407
    return ResultWithJobs(jobs)
1408

    
1409

    
1410
class _VerifyErrors(object):
1411
  """Mix-in for cluster/group verify LUs.
1412

1413
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1414
  self.op and self._feedback_fn to be available.)
1415

1416
  """
1417

    
1418
  ETYPE_FIELD = "code"
1419
  ETYPE_ERROR = "ERROR"
1420
  ETYPE_WARNING = "WARNING"
1421

    
1422
  def _Error(self, ecode, item, msg, *args, **kwargs):
1423
    """Format an error message.
1424

1425
    Based on the opcode's error_codes parameter, either format a
1426
    parseable error code, or a simpler error string.
1427

1428
    This must be called only from Exec and functions called from Exec.
1429

1430
    """
1431
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1432
    itype, etxt, _ = ecode
1433
    # If the error code is in the list of ignored errors, demote the error to a
1434
    # warning
1435
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1436
      ltype = self.ETYPE_WARNING
1437
    # first complete the msg
1438
    if args:
1439
      msg = msg % args
1440
    # then format the whole message
1441
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1442
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1443
    else:
1444
      if item:
1445
        item = " " + item
1446
      else:
1447
        item = ""
1448
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1449
    # and finally report it via the feedback_fn
1450
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1451
    # do not mark the operation as failed for WARN cases only
1452
    if ltype == self.ETYPE_ERROR:
1453
      self.bad = True
1454

    
1455
  def _ErrorIf(self, cond, *args, **kwargs):
1456
    """Log an error message if the passed condition is True.
1457

1458
    """
1459
    if (bool(cond)
1460
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1461
      self._Error(*args, **kwargs)
1462

    
1463

    
1464
def _VerifyCertificate(filename):
1465
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1466

1467
  @type filename: string
1468
  @param filename: Path to PEM file
1469

1470
  """
1471
  try:
1472
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1473
                                           utils.ReadFile(filename))
1474
  except Exception, err: # pylint: disable=W0703
1475
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1476
            "Failed to load X509 certificate %s: %s" % (filename, err))
1477

    
1478
  (errcode, msg) = \
1479
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1480
                                constants.SSL_CERT_EXPIRATION_ERROR)
1481

    
1482
  if msg:
1483
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1484
  else:
1485
    fnamemsg = None
1486

    
1487
  if errcode is None:
1488
    return (None, fnamemsg)
1489
  elif errcode == utils.CERT_WARNING:
1490
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1491
  elif errcode == utils.CERT_ERROR:
1492
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1493

    
1494
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1495

    
1496

    
1497
def _GetAllHypervisorParameters(cluster, instances):
1498
  """Compute the set of all hypervisor parameters.
1499

1500
  @type cluster: L{objects.Cluster}
1501
  @param cluster: the cluster object
1502
  @param instances: list of L{objects.Instance}
1503
  @param instances: additional instances from which to obtain parameters
1504
  @rtype: list of (origin, hypervisor, parameters)
1505
  @return: a list with all parameters found, indicating the hypervisor they
1506
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1507

1508
  """
1509
  hvp_data = []
1510

    
1511
  for hv_name in cluster.enabled_hypervisors:
1512
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1513

    
1514
  for os_name, os_hvp in cluster.os_hvp.items():
1515
    for hv_name, hv_params in os_hvp.items():
1516
      if hv_params:
1517
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1518
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1519

    
1520
  # TODO: collapse identical parameter values in a single one
1521
  for instance in instances:
1522
    if instance.hvparams:
1523
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1524
                       cluster.FillHV(instance)))
1525

    
1526
  return hvp_data
1527

    
1528

    
1529
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1530
  """Verifies the cluster config.
1531

1532
  """
1533
  REQ_BGL = False
1534

    
1535
  def _VerifyHVP(self, hvp_data):
1536
    """Verifies locally the syntax of the hypervisor parameters.
1537

1538
    """
1539
    for item, hv_name, hv_params in hvp_data:
1540
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1541
             (item, hv_name))
1542
      try:
1543
        hv_class = hypervisor.GetHypervisorClass(hv_name)
1544
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1545
        hv_class.CheckParameterSyntax(hv_params)
1546
      except errors.GenericError, err:
1547
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1548

    
1549
  def ExpandNames(self):
1550
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1551
    self.share_locks = ShareAll()
1552

    
1553
  def CheckPrereq(self):
1554
    """Check prerequisites.
1555

1556
    """
1557
    # Retrieve all information
1558
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1559
    self.all_node_info = self.cfg.GetAllNodesInfo()
1560
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1561

    
1562
  def Exec(self, feedback_fn):
1563
    """Verify integrity of cluster, performing various test on nodes.
1564

1565
    """
1566
    self.bad = False
1567
    self._feedback_fn = feedback_fn
1568

    
1569
    feedback_fn("* Verifying cluster config")
1570

    
1571
    for msg in self.cfg.VerifyConfig():
1572
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1573

    
1574
    feedback_fn("* Verifying cluster certificate files")
1575

    
1576
    for cert_filename in pathutils.ALL_CERT_FILES:
1577
      (errcode, msg) = _VerifyCertificate(cert_filename)
1578
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1579

    
1580
    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
1581
                                    pathutils.NODED_CERT_FILE),
1582
                  constants.CV_ECLUSTERCERT,
1583
                  None,
1584
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
1585
                    constants.LUXID_USER + " user")
1586

    
1587
    feedback_fn("* Verifying hypervisor parameters")
1588

    
1589
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1590
                                                self.all_inst_info.values()))
1591

    
1592
    feedback_fn("* Verifying all nodes belong to an existing group")
1593

    
1594
    # We do this verification here because, should this bogus circumstance
1595
    # occur, it would never be caught by VerifyGroup, which only acts on
1596
    # nodes/instances reachable from existing node groups.
1597

    
1598
    dangling_nodes = set(node for node in self.all_node_info.values()
1599
                         if node.group not in self.all_group_info)
1600

    
1601
    dangling_instances = {}
1602
    no_node_instances = []
1603

    
1604
    for inst in self.all_inst_info.values():
1605
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
1606
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
1607
      elif inst.primary_node not in self.all_node_info:
1608
        no_node_instances.append(inst)
1609

    
1610
    pretty_dangling = [
1611
        "%s (%s)" %
1612
        (node.name,
1613
         utils.CommaJoin(inst.name for
1614
                         inst in dangling_instances.get(node.uuid, [])))
1615
        for node in dangling_nodes]
1616

    
1617
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1618
                  None,
1619
                  "the following nodes (and their instances) belong to a non"
1620
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1621

    
1622
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1623
                  None,
1624
                  "the following instances have a non-existing primary-node:"
1625
                  " %s", utils.CommaJoin(inst.name for
1626
                                         inst in no_node_instances))
1627

    
1628
    return not self.bad
1629

    
1630

    
1631
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1632
  """Verifies the status of a node group.
1633

1634
  """
1635
  HPATH = "cluster-verify"
1636
  HTYPE = constants.HTYPE_CLUSTER
1637
  REQ_BGL = False
1638

    
1639
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1640

    
1641
  class NodeImage(object):
1642
    """A class representing the logical and physical status of a node.
1643

1644
    @type uuid: string
1645
    @ivar uuid: the node UUID to which this object refers
1646
    @ivar volumes: a structure as returned from
1647
        L{ganeti.backend.GetVolumeList} (runtime)
1648
    @ivar instances: a list of running instances (runtime)
1649
    @ivar pinst: list of configured primary instances (config)
1650
    @ivar sinst: list of configured secondary instances (config)
1651
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1652
        instances for which this node is secondary (config)
1653
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1654
    @ivar dfree: free disk, as reported by the node (runtime)
1655
    @ivar offline: the offline status (config)
1656
    @type rpc_fail: boolean
1657
    @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1658
        not whether the individual keys were correct) (runtime)
1659
    @type lvm_fail: boolean
1660
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1661
    @type hyp_fail: boolean
1662
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1663
    @type ghost: boolean
1664
    @ivar ghost: whether this is a known node or not (config)
1665
    @type os_fail: boolean
1666
    @ivar os_fail: whether the RPC call didn't return valid OS data
1667
    @type oslist: list
1668
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1669
    @type vm_capable: boolean
1670
    @ivar vm_capable: whether the node can host instances
1671
    @type pv_min: float
1672
    @ivar pv_min: size in MiB of the smallest PVs
1673
    @type pv_max: float
1674
    @ivar pv_max: size in MiB of the biggest PVs
1675

1676
    """
1677
    def __init__(self, offline=False, uuid=None, vm_capable=True):
1678
      self.uuid = uuid
1679
      self.volumes = {}
1680
      self.instances = []
1681
      self.pinst = []
1682
      self.sinst = []
1683
      self.sbp = {}
1684
      self.mfree = 0
1685
      self.dfree = 0
1686
      self.offline = offline
1687
      self.vm_capable = vm_capable
1688
      self.rpc_fail = False
1689
      self.lvm_fail = False
1690
      self.hyp_fail = False
1691
      self.ghost = False
1692
      self.os_fail = False
1693
      self.oslist = {}
1694
      self.pv_min = None
1695
      self.pv_max = None
1696

    
1697
  def ExpandNames(self):
1698
    # This raises errors.OpPrereqError on its own:
1699
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1700

    
1701
    # Get instances in node group; this is unsafe and needs verification later
1702
    inst_uuids = \
1703
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1704

    
1705
    self.needed_locks = {
1706
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1707
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1708
      locking.LEVEL_NODE: [],
1709

    
1710
      # This opcode is run by watcher every five minutes and acquires all nodes
1711
      # for a group. It doesn't run for a long time, so it's better to acquire
1712
      # the node allocation lock as well.
1713
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1714
      }
1715

    
1716
    self.share_locks = ShareAll()
1717

    
1718
  def DeclareLocks(self, level):
1719
    if level == locking.LEVEL_NODE:
1720
      # Get members of node group; this is unsafe and needs verification later
1721
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1722

    
1723
      # In Exec(), we warn about mirrored instances that have primary and
1724
      # secondary living in separate node groups. To fully verify that
1725
      # volumes for these instances are healthy, we will need to do an
1726
      # extra call to their secondaries. We ensure here those nodes will
1727
      # be locked.
1728
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1729
        # Important: access only the instances whose lock is owned
1730
        instance = self.cfg.GetInstanceInfoByName(inst_name)
1731
        if instance.disk_template in constants.DTS_INT_MIRROR:
1732
          nodes.update(instance.secondary_nodes)
1733

    
1734
      self.needed_locks[locking.LEVEL_NODE] = nodes
1735

    
1736
  def CheckPrereq(self):
1737
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1738
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1739

    
1740
    group_node_uuids = set(self.group_info.members)
1741
    group_inst_uuids = \
1742
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1743

    
1744
    unlocked_node_uuids = \
1745
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1746

    
1747
    unlocked_inst_uuids = \
1748
        group_inst_uuids.difference(
1749
          [self.cfg.GetInstanceInfoByName(name).uuid
1750
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1751

    
1752
    if unlocked_node_uuids:
1753
      raise errors.OpPrereqError(
1754
        "Missing lock for nodes: %s" %
1755
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1756
        errors.ECODE_STATE)
1757

    
1758
    if unlocked_inst_uuids:
1759
      raise errors.OpPrereqError(
1760
        "Missing lock for instances: %s" %
1761
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1762
        errors.ECODE_STATE)
1763

    
1764
    self.all_node_info = self.cfg.GetAllNodesInfo()
1765
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1766

    
1767
    self.my_node_uuids = group_node_uuids
1768
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1769
                             for node_uuid in group_node_uuids)
1770

    
1771
    self.my_inst_uuids = group_inst_uuids
1772
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1773
                             for inst_uuid in group_inst_uuids)
1774

    
1775
    # We detect here the nodes that will need the extra RPC calls for verifying
1776
    # split LV volumes; they should be locked.
1777
    extra_lv_nodes = set()
1778

    
1779
    for inst in self.my_inst_info.values():
1780
      if inst.disk_template in constants.DTS_INT_MIRROR:
1781
        for nuuid in inst.all_nodes:
1782
          if self.all_node_info[nuuid].group != self.group_uuid:
1783
            extra_lv_nodes.add(nuuid)
1784

    
1785
    unlocked_lv_nodes = \
1786
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1787

    
1788
    if unlocked_lv_nodes:
1789
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1790
                                 utils.CommaJoin(unlocked_lv_nodes),
1791
                                 errors.ECODE_STATE)
1792
    self.extra_lv_nodes = list(extra_lv_nodes)
1793

    
1794
  def _VerifyNode(self, ninfo, nresult):
1795
    """Perform some basic validation on data returned from a node.
1796

1797
      - check the result data structure is well formed and has all the
1798
        mandatory fields
1799
      - check ganeti version
1800

1801
    @type ninfo: L{objects.Node}
1802
    @param ninfo: the node to check
1803
    @param nresult: the results from the node
1804
    @rtype: boolean
1805
    @return: whether overall this call was successful (and we can expect
1806
         reasonable values in the respose)
1807

1808
    """
1809
    # main result, nresult should be a non-empty dict
1810
    test = not nresult or not isinstance(nresult, dict)
1811
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1812
                  "unable to verify node: no data returned")
1813
    if test:
1814
      return False
1815

    
1816
    # compares ganeti version
1817
    local_version = constants.PROTOCOL_VERSION
1818
    remote_version = nresult.get("version", None)
1819
    test = not (remote_version and
1820
                isinstance(remote_version, (list, tuple)) and
1821
                len(remote_version) == 2)
1822
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1823
                  "connection to node returned invalid data")
1824
    if test:
1825
      return False
1826

    
1827
    test = local_version != remote_version[0]
1828
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1829
                  "incompatible protocol versions: master %s,"
1830
                  " node %s", local_version, remote_version[0])
1831
    if test:
1832
      return False
1833

    
1834
    # node seems compatible, we can actually try to look into its results
1835

    
1836
    # full package version
1837
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1838
                  constants.CV_ENODEVERSION, ninfo.name,
1839
                  "software version mismatch: master %s, node %s",
1840
                  constants.RELEASE_VERSION, remote_version[1],
1841
                  code=self.ETYPE_WARNING)
1842

    
1843
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1844
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1845
      for hv_name, hv_result in hyp_result.iteritems():
1846
        test = hv_result is not None
1847
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1848
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1849

    
1850
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1851
    if ninfo.vm_capable and isinstance(hvp_result, list):
1852
      for item, hv_name, hv_result in hvp_result:
1853
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1854
                      "hypervisor %s parameter verify failure (source %s): %s",
1855
                      hv_name, item, hv_result)
1856

    
1857
    test = nresult.get(constants.NV_NODESETUP,
1858
                       ["Missing NODESETUP results"])
1859
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1860
                  "node setup error: %s", "; ".join(test))
1861

    
1862
    return True
1863

    
1864
  def _VerifyNodeTime(self, ninfo, nresult,
1865
                      nvinfo_starttime, nvinfo_endtime):
1866
    """Check the node time.
1867

1868
    @type ninfo: L{objects.Node}
1869
    @param ninfo: the node to check
1870
    @param nresult: the remote results for the node
1871
    @param nvinfo_starttime: the start time of the RPC call
1872
    @param nvinfo_endtime: the end time of the RPC call
1873

1874
    """
1875
    ntime = nresult.get(constants.NV_TIME, None)
1876
    try:
1877
      ntime_merged = utils.MergeTime(ntime)
1878
    except (ValueError, TypeError):
1879
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1880
                    "Node returned invalid time")
1881
      return
1882

    
1883
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1884
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1885
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1886
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1887
    else:
1888
      ntime_diff = None
1889

    
1890
    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1891
                  "Node time diverges by at least %s from master node time",
1892
                  ntime_diff)
1893

    
1894
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1895
    """Check the node LVM results and update info for cross-node checks.
1896

1897
    @type ninfo: L{objects.Node}
1898
    @param ninfo: the node to check
1899
    @param nresult: the remote results for the node
1900
    @param vg_name: the configured VG name
1901
    @type nimg: L{NodeImage}
1902
    @param nimg: node image
1903

1904
    """
1905
    if vg_name is None:
1906
      return
1907

    
1908
    # checks vg existence and size > 20G
1909
    vglist = nresult.get(constants.NV_VGLIST, None)
1910
    test = not vglist
1911
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1912
                  "unable to check volume groups")
1913
    if not test:
1914
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1915
                                            constants.MIN_VG_SIZE)
1916
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1917

    
1918
    # Check PVs
1919
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1920
    for em in errmsgs:
1921
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
1922
    if pvminmax is not None:
1923
      (nimg.pv_min, nimg.pv_max) = pvminmax
1924

    
1925
  def _VerifyGroupDRBDVersion(self, node_verify_infos):
1926
    """Check cross-node DRBD version consistency.
1927

1928
    @type node_verify_infos: dict
1929
    @param node_verify_infos: infos about nodes as returned from the
1930
      node_verify call.
1931

1932
    """
1933
    node_versions = {}
1934
    for node_uuid, ndata in node_verify_infos.items():
1935
      nresult = ndata.payload
1936
      if nresult:
1937
        version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1938
        node_versions[node_uuid] = version
1939

    
1940
    if len(set(node_versions.values())) > 1:
1941
      for node_uuid, version in sorted(node_versions.items()):
1942
        msg = "DRBD version mismatch: %s" % version
1943
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1944
                    code=self.ETYPE_WARNING)
1945

    
1946
  def _VerifyGroupLVM(self, node_image, vg_name):
1947
    """Check cross-node consistency in LVM.
1948

1949
    @type node_image: dict
1950
    @param node_image: info about nodes, mapping from node to names to
1951
      L{NodeImage} objects
1952
    @param vg_name: the configured VG name
1953

1954
    """
1955
    if vg_name is None:
1956
      return
1957

    
1958
    # Only exclusive storage needs this kind of checks
1959
    if not self._exclusive_storage:
1960
      return
1961

    
1962
    # exclusive_storage wants all PVs to have the same size (approximately),
1963
    # if the smallest and the biggest ones are okay, everything is fine.
1964
    # pv_min is None iff pv_max is None
1965
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1966
    if not vals:
1967
      return
1968
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1969
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1970
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1971
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1972
                  "PV sizes differ too much in the group; smallest (%s MB) is"
1973
                  " on %s, biggest (%s MB) is on %s",
1974
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
1975
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))
1976

    
1977
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1978
    """Check the node bridges.
1979

1980
    @type ninfo: L{objects.Node}
1981
    @param ninfo: the node to check
1982
    @param nresult: the remote results for the node
1983
    @param bridges: the expected list of bridges
1984

1985
    """
1986
    if not bridges:
1987
      return
1988

    
1989
    missing = nresult.get(constants.NV_BRIDGES, None)
1990
    test = not isinstance(missing, list)
1991
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1992
                  "did not return valid bridge information")
1993
    if not test:
1994
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1995
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1996

    
1997
  def _VerifyNodeUserScripts(self, ninfo, nresult):
1998
    """Check the results of user scripts presence and executability on the node
1999

2000
    @type ninfo: L{objects.Node}
2001
    @param ninfo: the node to check
2002
    @param nresult: the remote results for the node
2003

2004
    """
2005
    test = not constants.NV_USERSCRIPTS in nresult
2006
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
2007
                  "did not return user scripts information")
2008

    
2009
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
2010
    if not test:
2011
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
2012
                    "user scripts not present or not executable: %s" %
2013
                    utils.CommaJoin(sorted(broken_scripts)))
2014

    
2015
  def _VerifyNodeNetwork(self, ninfo, nresult):
2016
    """Check the node network connectivity results.
2017

2018
    @type ninfo: L{objects.Node}
2019
    @param ninfo: the node to check
2020
    @param nresult: the remote results for the node
2021

2022
    """
2023
    test = constants.NV_NODELIST not in nresult
2024
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
2025
                  "node hasn't returned node ssh connectivity data")
2026
    if not test:
2027
      if nresult[constants.NV_NODELIST]:
2028
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
2029
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
2030
                        "ssh communication with node '%s': %s", a_node, a_msg)
2031

    
2032
    test = constants.NV_NODENETTEST not in nresult
2033
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2034
                  "node hasn't returned node tcp connectivity data")
2035
    if not test:
2036
      if nresult[constants.NV_NODENETTEST]:
2037
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
2038
        for anode in nlist:
2039
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
2040
                        "tcp communication with node '%s': %s",
2041
                        anode, nresult[constants.NV_NODENETTEST][anode])
2042

    
2043
    test = constants.NV_MASTERIP not in nresult
2044
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2045
                  "node hasn't returned node master IP reachability data")
2046
    if not test:
2047
      if not nresult[constants.NV_MASTERIP]:
2048
        if ninfo.uuid == self.master_node:
2049
          msg = "the master node cannot reach the master IP (not configured?)"
2050
        else:
2051
          msg = "cannot reach the master IP"
2052
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
2053

    
2054
  def _VerifyInstance(self, instance, node_image, diskstatus):
2055
    """Verify an instance.
2056

2057
    This function checks to see if the required block devices are
2058
    available on the instance's node, and that the nodes are in the correct
2059
    state.
2060

2061
    """
2062
    pnode_uuid = instance.primary_node
2063
    pnode_img = node_image[pnode_uuid]
2064
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2065

    
2066
    node_vol_should = {}
2067
    instance.MapLVsByNode(node_vol_should)
2068

    
2069
    cluster = self.cfg.GetClusterInfo()
2070
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2071
                                                            self.group_info)
2072
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
2073
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
2074
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)
2075

    
2076
    for node_uuid in node_vol_should:
2077
      n_img = node_image[node_uuid]
2078
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2079
        # ignore missing volumes on offline or broken nodes
2080
        continue
2081
      for volume in node_vol_should[node_uuid]:
2082
        test = volume not in n_img.volumes
2083
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
2084
                      "volume %s missing on node %s", volume,
2085
                      self.cfg.GetNodeName(node_uuid))
2086

    
2087
    if instance.admin_state == constants.ADMINST_UP:
2088
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
2089
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
2090
                    "instance not running on its primary node %s",
2091
                     self.cfg.GetNodeName(pnode_uuid))
2092
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
2093
                    instance.name, "instance is marked as running and lives on"
2094
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))
2095

    
2096
    diskdata = [(nname, success, status, idx)
2097
                for (nname, disks) in diskstatus.items()
2098
                for idx, (success, status) in enumerate(disks)]
2099

    
2100
    for nname, success, bdev_status, idx in diskdata:
2101
      # the 'ghost node' construction in Exec() ensures that we have a
2102
      # node here
2103
      snode = node_image[nname]
2104
      bad_snode = snode.ghost or snode.offline
2105
      self._ErrorIf(instance.disks_active and
2106
                    not success and not bad_snode,
2107
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
2108
                    "couldn't retrieve status for disk/%s on %s: %s",
2109
                    idx, self.cfg.GetNodeName(nname), bdev_status)
2110

    
2111
      if instance.disks_active and success and \
2112
         (bdev_status.is_degraded or
2113
          bdev_status.ldisk_status != constants.LDS_OKAY):
2114
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2115
        if bdev_status.is_degraded:
2116
          msg += " is degraded"
2117
        if bdev_status.ldisk_status != constants.LDS_OKAY:
2118
          msg += "; state is '%s'" % \
2119
                 constants.LDS_NAMES[bdev_status.ldisk_status]
2120

    
2121
        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2122

    
2123
    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2124
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2125
                  "instance %s, connection to primary node failed",
2126
                  instance.name)
2127

    
2128
    self._ErrorIf(len(instance.secondary_nodes) > 1,
2129
                  constants.CV_EINSTANCELAYOUT, instance.name,
2130
                  "instance has multiple secondary nodes: %s",
2131
                  utils.CommaJoin(instance.secondary_nodes),
2132
                  code=self.ETYPE_WARNING)
2133

    
2134
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2135
    if any(es_flags.values()):
2136
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2137
        # Disk template not compatible with exclusive_storage: no instance
2138
        # node should have the flag set
2139
        es_nodes = [n
2140
                    for (n, es) in es_flags.items()
2141
                    if es]
2142
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2143
                    "instance has template %s, which is not supported on nodes"
2144
                    " that have exclusive storage set: %s",
2145
                    instance.disk_template,
2146
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2147
      for (idx, disk) in enumerate(instance.disks):
2148
        self._ErrorIf(disk.spindles is None,
2149
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2150
                      "number of spindles not configured for disk %s while"
2151
                      " exclusive storage is enabled, try running"
2152
                      " gnt-cluster repair-disk-sizes", idx)
2153

    
2154
    if instance.disk_template in constants.DTS_INT_MIRROR:
2155
      instance_nodes = utils.NiceSort(instance.all_nodes)
2156
      instance_groups = {}
2157

    
2158
      for node_uuid in instance_nodes:
2159
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
2160
                                   []).append(node_uuid)
2161

    
2162
      pretty_list = [
2163
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2164
                           groupinfo[group].name)
2165
        # Sort so that we always list the primary node first.
2166
        for group, nodes in sorted(instance_groups.items(),
2167
                                   key=lambda (_, nodes): pnode_uuid in nodes,
2168
                                   reverse=True)]
2169

    
2170
      self._ErrorIf(len(instance_groups) > 1,
2171
                    constants.CV_EINSTANCESPLITGROUPS,
2172
                    instance.name, "instance has primary and secondary nodes in"
2173
                    " different groups: %s", utils.CommaJoin(pretty_list),
2174
                    code=self.ETYPE_WARNING)
2175

    
2176
    inst_nodes_offline = []
2177
    for snode in instance.secondary_nodes:
2178
      s_img = node_image[snode]
2179
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2180
                    self.cfg.GetNodeName(snode),
2181
                    "instance %s, connection to secondary node failed",
2182
                    instance.name)
2183

    
2184
      if s_img.offline:
2185
        inst_nodes_offline.append(snode)
2186

    
2187
    # warn that the instance lives on offline nodes
2188
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2189
                  instance.name, "instance has offline secondary node(s) %s",
2190
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2191
    # ... or ghost/non-vm_capable nodes
2192
    for node_uuid in instance.all_nodes:
2193
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2194
                    instance.name, "instance lives on ghost node %s",
2195
                    self.cfg.GetNodeName(node_uuid))
2196
      self._ErrorIf(not node_image[node_uuid].vm_capable,
2197
                    constants.CV_EINSTANCEBADNODE, instance.name,
2198
                    "instance lives on non-vm_capable node %s",
2199
                    self.cfg.GetNodeName(node_uuid))
2200

    
2201
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2202
    """Verify if there are any unknown volumes in the cluster.
2203

2204
    The .os, .swap and backup volumes are ignored. All other volumes are
2205
    reported as unknown.
2206

2207
    @type reserved: L{ganeti.utils.FieldSet}
2208
    @param reserved: a FieldSet of reserved volume names
2209

2210
    """
2211
    for node_uuid, n_img in node_image.items():
2212
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2213
          self.all_node_info[node_uuid].group != self.group_uuid):
2214
        # skip non-healthy nodes
2215
        continue
2216
      for volume in n_img.volumes:
2217
        test = ((node_uuid not in node_vol_should or
2218
                volume not in node_vol_should[node_uuid]) and
2219
                not reserved.Matches(volume))
2220
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2221
                      self.cfg.GetNodeName(node_uuid),
2222
                      "volume %s is unknown", volume,
2223
                      code=_VerifyErrors.ETYPE_WARNING)
2224

    
2225
  def _VerifyNPlusOneMemory(self, node_image, all_insts):
2226
    """Verify N+1 Memory Resilience.
2227

2228
    Check that if one single node dies we can still start all the
2229
    instances it was primary for.
2230

2231
    """
2232
    cluster_info = self.cfg.GetClusterInfo()
2233
    for node_uuid, n_img in node_image.items():
2234
      # This code checks that every node which is now listed as
2235
      # secondary has enough memory to host all instances it is
2236
      # supposed to should a single other node in the cluster fail.
2237
      # FIXME: not ready for failover to an arbitrary node
2238
      # FIXME: does not support file-backed instances
2239
      # WARNING: we currently take into account down instances as well
2240
      # as up ones, considering that even if they're down someone
2241
      # might want to start them even in the event of a node failure.
2242
      if n_img.offline or \
2243
         self.all_node_info[node_uuid].group != self.group_uuid:
2244
        # we're skipping nodes marked offline and nodes in other groups from
2245
        # the N+1 warning, since most likely we don't have good memory
2246
        # information from them; we already list instances living on such
2247
        # nodes, and that's enough warning
2248
        continue
2249
      #TODO(dynmem): also consider ballooning out other instances
2250
      for prinode, inst_uuids in n_img.sbp.items():
2251
        needed_mem = 0
2252
        for inst_uuid in inst_uuids:
2253
          bep = cluster_info.FillBE(all_insts[inst_uuid])
2254
          if bep[constants.BE_AUTO_BALANCE]:
2255
            needed_mem += bep[constants.BE_MINMEM]
2256
        test = n_img.mfree < needed_mem
2257
        self._ErrorIf(test, constants.CV_ENODEN1,
2258
                      self.cfg.GetNodeName(node_uuid),
2259
                      "not enough memory to accomodate instance failovers"
2260
                      " should node %s fail (%dMiB needed, %dMiB available)",
2261
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2262

    
2263
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2264
                   (files_all, files_opt, files_mc, files_vm)):
2265
    """Verifies file checksums collected from all nodes.
2266

2267
    @param nodes: List of L{objects.Node} objects
2268
    @param master_node_uuid: UUID of master node
2269
    @param all_nvinfo: RPC results
2270

2271
    """
2272
    # Define functions determining which nodes to consider for a file
2273
    files2nodefn = [
2274
      (files_all, None),
2275
      (files_mc, lambda node: (node.master_candidate or
2276
                               node.uuid == master_node_uuid)),
2277
      (files_vm, lambda node: node.vm_capable),
2278
      ]
2279

    
2280
    # Build mapping from filename to list of nodes which should have the file
2281
    nodefiles = {}
2282
    for (files, fn) in files2nodefn:
2283
      if fn is None:
2284
        filenodes = nodes
2285
      else:
2286
        filenodes = filter(fn, nodes)
2287
      nodefiles.update((filename,
2288
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
2289
                       for filename in files)
2290

    
2291
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2292

    
2293
    fileinfo = dict((filename, {}) for filename in nodefiles)
2294
    ignore_nodes = set()
2295

    
2296
    for node in nodes:
2297
      if node.offline:
2298
        ignore_nodes.add(node.uuid)
2299
        continue
2300

    
2301
      nresult = all_nvinfo[node.uuid]
2302

    
2303
      if nresult.fail_msg or not nresult.payload:
2304
        node_files = None
2305
      else:
2306
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2307
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2308
                          for (key, value) in fingerprints.items())
2309
        del fingerprints
2310

    
2311
      test = not (node_files and isinstance(node_files, dict))
2312
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2313
                    "Node did not return file checksum data")
2314
      if test:
2315
        ignore_nodes.add(node.uuid)
2316
        continue
2317

    
2318
      # Build per-checksum mapping from filename to nodes having it
2319
      for (filename, checksum) in node_files.items():
2320
        assert filename in nodefiles
2321
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2322

    
2323
    for (filename, checksums) in fileinfo.items():
2324
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2325

    
2326
      # Nodes having the file
2327
      with_file = frozenset(node_uuid
2328
                            for node_uuids in fileinfo[filename].values()
2329
                            for node_uuid in node_uuids) - ignore_nodes
2330

    
2331
      expected_nodes = nodefiles[filename] - ignore_nodes
2332

    
2333
      # Nodes missing file
2334
      missing_file = expected_nodes - with_file
2335

    
2336
      if filename in files_opt:
2337
        # All or no nodes
2338
        self._ErrorIf(missing_file and missing_file != expected_nodes,
2339
                      constants.CV_ECLUSTERFILECHECK, None,
2340
                      "File %s is optional, but it must exist on all or no"
2341
                      " nodes (not found on %s)",
2342
                      filename,
2343
                      utils.CommaJoin(
2344
                        utils.NiceSort(
2345
                          map(self.cfg.GetNodeName, missing_file))))
2346
      else:
2347
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2348
                      "File %s is missing from node(s) %s", filename,
2349
                      utils.CommaJoin(
2350
                        utils.NiceSort(
2351
                          map(self.cfg.GetNodeName, missing_file))))
2352

    
2353
        # Warn if a node has a file it shouldn't
2354
        unexpected = with_file - expected_nodes
2355
        self._ErrorIf(unexpected,
2356
                      constants.CV_ECLUSTERFILECHECK, None,
2357
                      "File %s should not exist on node(s) %s",
2358
                      filename, utils.CommaJoin(
2359
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2360

    
2361
      # See if there are multiple versions of the file
2362
      test = len(checksums) > 1
2363
      if test:
2364
        variants = ["variant %s on %s" %
2365
                    (idx + 1,
2366
                     utils.CommaJoin(utils.NiceSort(
2367
                       map(self.cfg.GetNodeName, node_uuids))))
2368
                    for (idx, (checksum, node_uuids)) in
2369
                      enumerate(sorted(checksums.items()))]
2370
      else:
2371
        variants = []
2372

    
2373
      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2374
                    "File %s found with %s different checksums (%s)",
2375
                    filename, len(checksums), "; ".join(variants))
2376

    
2377
  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
2378
    """Verify the drbd helper.
2379

2380
    """
2381
    if drbd_helper:
2382
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2383
      test = (helper_result is None)
2384
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2385
                    "no drbd usermode helper returned")
2386
      if helper_result:
2387
        status, payload = helper_result
2388
        test = not status
2389
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2390
                      "drbd usermode helper check unsuccessful: %s", payload)
2391
        test = status and (payload != drbd_helper)
2392
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2393
                      "wrong drbd usermode helper: %s", payload)
2394

    
2395
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2396
                      drbd_map):
2397
    """Verifies and the node DRBD status.
2398

2399
    @type ninfo: L{objects.Node}
2400
    @param ninfo: the node to check
2401
    @param nresult: the remote results for the node
2402
    @param instanceinfo: the dict of instances
2403
    @param drbd_helper: the configured DRBD usermode helper
2404
    @param drbd_map: the DRBD map as returned by
2405
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2406

2407
    """
2408
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)
2409

    
2410
    # compute the DRBD minors
2411
    node_drbd = {}
2412
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2413
      test = inst_uuid not in instanceinfo
2414
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2415
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
2416
        # ghost instance should not be running, but otherwise we
2417
        # don't give double warnings (both ghost instance and
2418
        # unallocated minor in use)
2419
      if test:
2420
        node_drbd[minor] = (inst_uuid, False)
2421
      else:
2422
        instance = instanceinfo[inst_uuid]
2423
        node_drbd[minor] = (inst_uuid, instance.disks_active)
2424

    
2425
    # and now check them
2426
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2427
    test = not isinstance(used_minors, (tuple, list))
2428
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2429
                  "cannot parse drbd status file: %s", str(used_minors))
2430
    if test:
2431
      # we cannot check drbd status
2432
      return
2433

    
2434
    for minor, (inst_uuid, must_exist) in node_drbd.items():
2435
      test = minor not in used_minors and must_exist
2436
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2437
                    "drbd minor %d of instance %s is not active", minor,
2438
                    self.cfg.GetInstanceName(inst_uuid))
2439
    for minor in used_minors:
2440
      test = minor not in node_drbd
2441
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2442
                    "unallocated drbd minor %d is in use", minor)
2443

    
2444
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2445
    """Builds the node OS structures.
2446

2447
    @type ninfo: L{objects.Node}
2448
    @param ninfo: the node to check
2449
    @param nresult: the remote results for the node
2450
    @param nimg: the node image object
2451

2452
    """
2453
    remote_os = nresult.get(constants.NV_OSLIST, None)
2454
    test = (not isinstance(remote_os, list) or
2455
            not compat.all(isinstance(v, list) and len(v) == 7
2456
                           for v in remote_os))
2457

    
2458
    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2459
                  "node hasn't returned valid OS data")
2460

    
2461
    nimg.os_fail = test
2462

    
2463
    if test:
2464
      return
2465

    
2466
    os_dict = {}
2467

    
2468
    for (name, os_path, status, diagnose,
2469
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2470

    
2471
      if name not in os_dict:
2472
        os_dict[name] = []
2473

    
2474
      # parameters is a list of lists instead of list of tuples due to
2475
      # JSON lacking a real tuple type, fix it:
2476
      parameters = [tuple(v) for v in parameters]
2477
      os_dict[name].append((os_path, status, diagnose,
2478
                            set(variants), set(parameters), set(api_ver)))
2479

    
2480
    nimg.oslist = os_dict
2481

    
2482
  def _VerifyNodeOS(self, ninfo, nimg, base):
2483
    """Verifies the node OS list.
2484

2485
    @type ninfo: L{objects.Node}
2486
    @param ninfo: the node to check
2487
    @param nimg: the node image object
2488
    @param base: the 'template' node we match against (e.g. from the master)
2489

2490
    """
2491
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2492

    
2493
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2494
    for os_name, os_data in nimg.oslist.items():
2495
      assert os_data, "Empty OS status for OS %s?!" % os_name
2496
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2497
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2498
                    "Invalid OS %s (located at %s): %s",
2499
                    os_name, f_path, f_diag)
2500
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2501
                    "OS '%s' has multiple entries"
2502
                    " (first one shadows the rest): %s",
2503
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
2504
      # comparisons with the 'base' image
2505
      test = os_name not in base.oslist
2506
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2507
                    "Extra OS %s not present on reference node (%s)",
2508
                    os_name, self.cfg.GetNodeName(base.uuid))
2509
      if test:
2510
        continue
2511
      assert base.oslist[os_name], "Base node has empty OS status?"
2512
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2513
      if not b_status:
2514
        # base OS is invalid, skipping
2515
        continue
2516
      for kind, a, b in [("API version", f_api, b_api),
2517
                         ("variants list", f_var, b_var),
2518
                         ("parameters", beautify_params(f_param),
2519
                          beautify_params(b_param))]:
2520
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2521
                      "OS %s for %s differs from reference node %s:"
2522
                      " [%s] vs. [%s]", kind, os_name,
2523
                      self.cfg.GetNodeName(base.uuid),
2524
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2525

    
2526
    # check any missing OSes
2527
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2528
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2529
                  "OSes present on reference node %s"
2530
                  " but missing on this node: %s",
2531
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2532

    
2533
  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2534
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2535

2536
    @type ninfo: L{objects.Node}
2537
    @param ninfo: the node to check
2538
    @param nresult: the remote results for the node
2539
    @type is_master: bool
2540
    @param is_master: Whether node is the master node
2541

2542
    """
2543
    cluster = self.cfg.GetClusterInfo()
2544
    if (is_master and
2545
        (cluster.IsFileStorageEnabled() or
2546
         cluster.IsSharedFileStorageEnabled())):
2547
      try:
2548
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2549
      except KeyError:
2550
        # This should never happen
2551
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2552
                      "Node did not return forbidden file storage paths")
2553
      else:
2554
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2555
                      "Found forbidden file storage paths: %s",
2556
                      utils.CommaJoin(fspaths))
2557
    else:
2558
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2559
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2560
                    "Node should not have returned forbidden file storage"
2561
                    " paths")
2562

    
2563
  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2564
                          verify_key, error_key):
2565
    """Verifies (file) storage paths.
2566

2567
    @type ninfo: L{objects.Node}
2568
    @param ninfo: the node to check
2569
    @param nresult: the remote results for the node
2570
    @type file_disk_template: string
2571
    @param file_disk_template: file-based disk template, whose directory
2572
        is supposed to be verified
2573
    @type verify_key: string
2574
    @param verify_key: key for the verification map of this file
2575
        verification step
2576
    @param error_key: error key to be added to the verification results
2577
        in case something goes wrong in this verification step
2578

2579
    """
2580
    assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
2581
              constants.ST_FILE, constants.ST_SHARED_FILE
2582
           ))
2583

    
2584
    cluster = self.cfg.GetClusterInfo()
2585
    if cluster.IsDiskTemplateEnabled(file_disk_template):
2586
      self._ErrorIf(
2587
          verify_key in nresult,
2588
          error_key, ninfo.name,
2589
          "The configured %s storage path is unusable: %s" %
2590
          (file_disk_template, nresult.get(verify_key)))
2591

    
2592
  def _VerifyFileStoragePaths(self, ninfo, nresult):
2593
    """Verifies (file) storage paths.
2594

2595
    @see: C{_VerifyStoragePaths}
2596

2597
    """
2598
    self._VerifyStoragePaths(
2599
        ninfo, nresult, constants.DT_FILE,
2600
        constants.NV_FILE_STORAGE_PATH,
2601
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2602

    
2603
  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2604
    """Verifies (file) storage paths.
2605

2606
    @see: C{_VerifyStoragePaths}
2607

2608
    """
2609
    self._VerifyStoragePaths(
2610
        ninfo, nresult, constants.DT_SHARED_FILE,
2611
        constants.NV_SHARED_FILE_STORAGE_PATH,
2612
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2613

    
2614
  def _VerifyOob(self, ninfo, nresult):
2615
    """Verifies out of band functionality of a node.
2616

2617
    @type ninfo: L{objects.Node}
2618
    @param ninfo: the node to check
2619
    @param nresult: the remote results for the node
2620

2621
    """
2622
    # We just have to verify the paths on master and/or master candidates
2623
    # as the oob helper is invoked on the master
2624
    if ((ninfo.master_candidate or ninfo.master_capable) and
2625
        constants.NV_OOB_PATHS in nresult):
2626
      for path_result in nresult[constants.NV_OOB_PATHS]:
2627
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2628
                      ninfo.name, path_result)
2629

    
2630
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2631
    """Verifies and updates the node volume data.
2632

2633
    This function will update a L{NodeImage}'s internal structures
2634
    with data from the remote call.
2635

2636
    @type ninfo: L{objects.Node}
2637
    @param ninfo: the node to check
2638
    @param nresult: the remote results for the node
2639
    @param nimg: the node image object
2640
    @param vg_name: the configured VG name
2641

2642
    """
2643
    nimg.lvm_fail = True
2644
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2645
    if vg_name is None:
2646
      pass
2647
    elif isinstance(lvdata, basestring):
2648
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2649
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
2650
    elif not isinstance(lvdata, dict):
2651
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2652
                    "rpc call to node failed (lvlist)")
2653
    else:
2654
      nimg.volumes = lvdata
2655
      nimg.lvm_fail = False
2656

    
2657
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2658
    """Verifies and updates the node instance list.
2659

2660
    If the listing was successful, then updates this node's instance
2661
    list. Otherwise, it marks the RPC call as failed for the instance
2662
    list key.
2663

2664
    @type ninfo: L{objects.Node}
2665
    @param ninfo: the node to check
2666
    @param nresult: the remote results for the node
2667
    @param nimg: the node image object
2668

2669
    """
2670
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2671
    test = not isinstance(idata, list)
2672
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2673
                  "rpc call to node failed (instancelist): %s",
2674
                  utils.SafeEncode(str(idata)))
2675
    if test:
2676
      nimg.hyp_fail = True
2677
    else:
2678
      nimg.instances = [inst.uuid for (_, inst) in
2679
                        self.cfg.GetMultiInstanceInfoByName(idata)]
2680

    
2681
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2682
    """Verifies and computes a node information map
2683

2684
    @type ninfo: L{objects.Node}
2685
    @param ninfo: the node to check
2686
    @param nresult: the remote results for the node
2687
    @param nimg: the node image object
2688
    @param vg_name: the configured VG name
2689

2690
    """
2691
    # try to read free memory (from the hypervisor)
2692
    hv_info = nresult.get(constants.NV_HVINFO, None)
2693
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2694
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2695
                  "rpc call to node failed (hvinfo)")
2696
    if not test:
2697
      try:
2698
        nimg.mfree = int(hv_info["memory_free"])
2699
      except (ValueError, TypeError):
2700
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2701
                      "node returned invalid nodeinfo, check hypervisor")
2702

    
2703
    # FIXME: devise a free space model for file based instances as well
2704
    if vg_name is not None:
2705
      test = (constants.NV_VGLIST not in nresult or
2706
              vg_name not in nresult[constants.NV_VGLIST])
2707
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2708
                    "node didn't return data for the volume group '%s'"
2709
                    " - it is either missing or broken", vg_name)
2710
      if not test:
2711
        try:
2712
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2713
        except (ValueError, TypeError):
2714
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2715
                        "node returned invalid LVM info, check LVM status")
2716

    
2717
  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2718
    """Gets per-disk status information for all instances.
2719

2720
    @type node_uuids: list of strings
2721
    @param node_uuids: Node UUIDs
2722
    @type node_image: dict of (UUID, L{objects.Node})
2723
    @param node_image: Node objects
2724
    @type instanceinfo: dict of (UUID, L{objects.Instance})
2725
    @param instanceinfo: Instance objects
2726
    @rtype: {instance: {node: [(succes, payload)]}}
2727
    @return: a dictionary of per-instance dictionaries with nodes as
2728
        keys and disk information as values; the disk information is a
2729
        list of tuples (success, payload)
2730

2731
    """
2732
    node_disks = {}
2733
    node_disks_dev_inst_only = {}
2734
    diskless_instances = set()
2735
    diskless = constants.DT_DISKLESS
2736

    
2737
    for nuuid in node_uuids:
2738
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
2739
                                             node_image[nuuid].sinst))
2740
      diskless_instances.update(uuid for uuid in node_inst_uuids
2741
                                if instanceinfo[uuid].disk_template == diskless)
2742
      disks = [(inst_uuid, disk)
2743
               for inst_uuid in node_inst_uuids
2744
               for disk in instanceinfo[inst_uuid].disks]
2745

    
2746
      if not disks:
2747
        # No need to collect data
2748
        continue
2749

    
2750
      node_disks[nuuid] = disks
2751

    
2752
      # _AnnotateDiskParams makes already copies of the disks
2753
      dev_inst_only = []
2754
      for (inst_uuid, dev) in disks:
2755
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
2756
                                          self.cfg)
2757
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))
2758

    
2759
      node_disks_dev_inst_only[nuuid] = dev_inst_only
2760

    
2761
    assert len(node_disks) == len(node_disks_dev_inst_only)
2762

    
2763
    # Collect data from all nodes with disks
2764
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
2765
               node_disks.keys(), node_disks_dev_inst_only)
2766

    
2767
    assert len(result) == len(node_disks)
2768

    
2769
    instdisk = {}
2770

    
2771
    for (nuuid, nres) in result.items():
2772
      node = self.cfg.GetNodeInfo(nuuid)
2773
      disks = node_disks[node.uuid]
2774

    
2775
      if nres.offline:
2776
        # No data from this node
2777
        data = len(disks) * [(False, "node offline")]
2778
      else:
2779
        msg = nres.fail_msg
2780
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
2781
                      "while getting disk information: %s", msg)
2782
        if msg:
2783
          # No data from this node
2784
          data = len(disks) * [(False, msg)]
2785
        else:
2786
          data = []
2787
          for idx, i in enumerate(nres.payload):
2788
            if isinstance(i, (tuple, list)) and len(i) == 2:
2789
              data.append(i)
2790
            else:
2791
              logging.warning("Invalid result from node %s, entry %d: %s",
2792
                              node.name, idx, i)
2793
              data.append((False, "Invalid result from the remote node"))
2794

    
2795
      for ((inst_uuid, _), status) in zip(disks, data):
2796
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
2797
          .append(status)
2798

    
2799
    # Add empty entries for diskless instances.
2800
    for inst_uuid in diskless_instances:
2801
      assert inst_uuid not in instdisk
2802
      instdisk[inst_uuid] = {}
2803

    
2804
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2805
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
2806
                      compat.all(isinstance(s, (tuple, list)) and
2807
                                 len(s) == 2 for s in statuses)
2808
                      for inst, nuuids in instdisk.items()
2809
                      for nuuid, statuses in nuuids.items())
2810
    if __debug__:
2811
      instdisk_keys = set(instdisk)
2812
      instanceinfo_keys = set(instanceinfo)
2813
      assert instdisk_keys == instanceinfo_keys, \
2814
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2815
         (instdisk_keys, instanceinfo_keys))
2816

    
2817
    return instdisk
2818

    
2819
  @staticmethod
2820
  def _SshNodeSelector(group_uuid, all_nodes):
2821
    """Create endless iterators for all potential SSH check hosts.
2822

2823
    """
2824
    nodes = [node for node in all_nodes
2825
             if (node.group != group_uuid and
2826
                 not node.offline)]
2827
    keyfunc = operator.attrgetter("group")
2828

    
2829
    return map(itertools.cycle,
2830
               [sorted(map(operator.attrgetter("name"), names))
2831
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2832
                                                  keyfunc)])
2833

    
2834
  @classmethod
2835
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2836
    """Choose which nodes should talk to which other nodes.
2837

2838
    We will make nodes contact all nodes in their group, and one node from
2839
    every other group.
2840

2841
    @warning: This algorithm has a known issue if one node group is much
2842
      smaller than others (e.g. just one node). In such a case all other
2843
      nodes will talk to the single node.
2844

2845
    """
2846
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2847
    sel = cls._SshNodeSelector(group_uuid, all_nodes)
2848

    
2849
    return (online_nodes,
2850
            dict((name, sorted([i.next() for i in sel]))
2851
                 for name in online_nodes))
2852

    
2853
  def BuildHooksEnv(self):
2854
    """Build hooks env.
2855

2856
    Cluster-Verify hooks just ran in the post phase and their failure makes
2857
    the output be logged in the verify output and the verification to fail.
2858

2859
    """
2860
    env = {
2861
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2862
      }
2863

    
2864
    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2865
               for node in self.my_node_info.values())
2866

    
2867
    return env
2868

    
2869
  def BuildHooksNodes(self):
2870
    """Build hooks nodes.
2871

2872
    """
2873
    return ([], list(self.my_node_info.keys()))
2874

    
2875
  def Exec(self, feedback_fn):
2876
    """Verify integrity of the node group, performing various test on nodes.
2877

2878
    """
2879
    # This method has too many local variables. pylint: disable=R0914
2880
    feedback_fn("* Verifying group '%s'" % self.group_info.name)
2881

    
2882
    if not self.my_node_uuids:
2883
      # empty node group
2884
      feedback_fn("* Empty node group, skipping verification")
2885
      return True
2886

    
2887
    self.bad = False
2888
    verbose = self.op.verbose
2889
    self._feedback_fn = feedback_fn
2890

    
2891
    vg_name = self.cfg.GetVGName()
2892
    drbd_helper = self.cfg.GetDRBDHelper()
2893
    cluster = self.cfg.GetClusterInfo()
2894
    hypervisors = cluster.enabled_hypervisors
2895
    node_data_list = self.my_node_info.values()
2896

    
2897
    i_non_redundant = [] # Non redundant instances
2898
    i_non_a_balanced = [] # Non auto-balanced instances
2899
    i_offline = 0 # Count of offline instances
2900
    n_offline = 0 # Count of offline nodes
2901
    n_drained = 0 # Count of nodes being drained
2902
    node_vol_should = {}
2903

    
2904
    # FIXME: verify OS list
2905

    
2906
    # File verification
2907
    filemap = ComputeAncillaryFiles(cluster, False)
2908

    
2909
    # do local checksums
2910
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
2911
    master_ip = self.cfg.GetMasterIP()
2912

    
2913
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))
2914

    
2915
    user_scripts = []
2916
    if self.cfg.GetUseExternalMipScript():
2917
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2918

    
2919
    node_verify_param = {
2920
      constants.NV_FILELIST:
2921
        map(vcluster.MakeVirtualPath,
2922
            utils.UniqueSequence(filename
2923
                                 for files in filemap
2924
                                 for filename in files)),
2925
      constants.NV_NODELIST:
2926
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2927
                                  self.all_node_info.values()),
2928
      constants.NV_HYPERVISOR: hypervisors,
2929
      constants.NV_HVPARAMS:
2930
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2931
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2932
                                 for node in node_data_list
2933
                                 if not node.offline],
2934
      constants.NV_INSTANCELIST: hypervisors,
2935
      constants.NV_VERSION: None,
2936
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2937
      constants.NV_NODESETUP: None,
2938
      constants.NV_TIME: None,
2939
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
2940
      constants.NV_OSLIST: None,
2941
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2942
      constants.NV_USERSCRIPTS: user_scripts,
2943
      }
2944

    
2945
    if vg_name is not None:
2946
      node_verify_param[constants.NV_VGLIST] = None
2947
      node_verify_param[constants.NV_LVLIST] = vg_name
2948
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2949

    
2950
    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
2951
      if drbd_helper:
2952
        node_verify_param[constants.NV_DRBDVERSION] = None
2953
        node_verify_param[constants.NV_DRBDLIST] = None
2954
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2955

    
2956
    if cluster.IsFileStorageEnabled() or \
2957
        cluster.IsSharedFileStorageEnabled():
2958
      # Load file storage paths only from master node
2959
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
2960
        self.cfg.GetMasterNodeName()
2961
      if cluster.IsFileStorageEnabled():
2962
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
2963
          cluster.file_storage_dir
2964

    
2965
    # bridge checks
2966
    # FIXME: this needs to be changed per node-group, not cluster-wide
2967
    bridges = set()
2968
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2969
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2970
      bridges.add(default_nicpp[constants.NIC_LINK])
2971
    for inst_uuid in self.my_inst_info.values():
2972
      for nic in inst_uuid.nics:
2973
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
2974
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2975
          bridges.add(full_nic[constants.NIC_LINK])
2976

    
2977
    if bridges:
2978
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
2979

    
2980
    # Build our expected cluster state
2981
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
2982
                                                 uuid=node.uuid,
2983
                                                 vm_capable=node.vm_capable))
2984
                      for node in node_data_list)
2985

    
2986
    # Gather OOB paths
2987
    oob_paths = []
2988
    for node in self.all_node_info.values():
2989
      path = SupportsOob(self.cfg, node)
2990
      if path and path not in oob_paths:
2991
        oob_paths.append(path)
2992

    
2993
    if oob_paths:
2994
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2995

    
2996
    for inst_uuid in self.my_inst_uuids:
2997
      instance = self.my_inst_info[inst_uuid]
2998
      if instance.admin_state == constants.ADMINST_OFFLINE:
2999
        i_offline += 1
3000

    
3001
      for nuuid in instance.all_nodes:
3002
        if nuuid not in node_image:
3003
          gnode = self.NodeImage(uuid=nuuid)
3004
          gnode.ghost = (nuuid not in self.all_node_info)
3005
          node_image[nuuid] = gnode
3006

    
3007
      instance.MapLVsByNode(node_vol_should)
3008

    
3009
      pnode = instance.primary_node
3010
      node_image[pnode].pinst.append(instance.uuid)
3011

    
3012
      for snode in instance.secondary_nodes:
3013
        nimg = node_image[snode]
3014
        nimg.sinst.append(instance.uuid)
3015
        if pnode not in nimg.sbp:
3016
          nimg.sbp[pnode] = []
3017
        nimg.sbp[pnode].append(instance.uuid)
3018

    
3019
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
3020
                                               self.my_node_info.keys())
3021
    # The value of exclusive_storage should be the same across the group, so if
3022
    # it's True for at least a node, we act as if it were set for all the nodes
3023
    self._exclusive_storage = compat.any(es_flags.values())
3024
    if self._exclusive_storage:
3025
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True
3026

    
3027
    node_group_uuids = dict(map(lambda n: (n.name, n.group),
3028
                                self.cfg.GetAllNodesInfo().values()))
3029
    groups_config = self.cfg.GetAllNodeGroupsInfoDict()
3030

    
3031
    # At this point, we have the in-memory data structures complete,
3032
    # except for the runtime information, which we'll gather next
3033

    
3034
    # Due to the way our RPC system works, exact response times cannot be
3035
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
3036
    # time before and after executing the request, we can at least have a time
3037
    # window.
3038
    nvinfo_starttime = time.time()
3039
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
3040
                                           node_verify_param,
3041
                                           self.cfg.GetClusterName(),
3042
                                           self.cfg.GetClusterInfo().hvparams,
3043
                                           node_group_uuids,
3044
                                           groups_config)
3045
    nvinfo_endtime = time.time()
3046

    
3047
    if self.extra_lv_nodes and vg_name is not None:
3048
      extra_lv_nvinfo = \
3049
          self.rpc.call_node_verify(self.extra_lv_nodes,
3050
                                    {constants.NV_LVLIST: vg_name},
3051
                                    self.cfg.GetClusterName(),
3052
                                    self.cfg.GetClusterInfo().hvparams,
3053
                                    node_group_uuids,
3054
                                    groups_config)
3055
    else:
3056
      extra_lv_nvinfo = {}
3057

    
3058
    all_drbd_map = self.cfg.ComputeDRBDMap()
3059

    
3060
    feedback_fn("* Gathering disk information (%s nodes)" %
3061
                len(self.my_node_uuids))
3062
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
3063
                                     self.my_inst_info)
3064

    
3065
    feedback_fn("* Verifying configuration file consistency")
3066

    
3067
    # If not all nodes are being checked, we need to make sure the master node
3068
    # and a non-checked vm_capable node are in the list.
3069
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
3070
    if absent_node_uuids:
3071
      vf_nvinfo = all_nvinfo.copy()
3072
      vf_node_info = list(self.my_node_info.values())
3073
      additional_node_uuids = []
3074
      if master_node_uuid not in self.my_node_info:
3075
        additional_node_uuids.append(master_node_uuid)
3076
        vf_node_info.append(self.all_node_info[master_node_uuid])
3077
      # Add the first vm_capable node we find which is not included,
3078
      # excluding the master node (which we already have)
3079
      for node_uuid in absent_node_uuids:
3080
        nodeinfo = self.all_node_info[node_uuid]
3081
        if (nodeinfo.vm_capable and not nodeinfo.offline and
3082
            node_uuid != master_node_uuid):
3083
          additional_node_uuids.append(node_uuid)
3084
          vf_node_info.append(self.all_node_info[node_uuid])
3085
          break
3086
      key = constants.NV_FILELIST
3087
      vf_nvinfo.update(self.rpc.call_node_verify(
3088
         additional_node_uuids, {key: node_verify_param[key]},
3089
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams,
3090
         node_group_uuids,
3091
         groups_config))
3092
    else:
3093
      vf_nvinfo = all_nvinfo
3094
      vf_node_info = self.my_node_info.values()
3095

    
3096
    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)
3097

    
3098
    feedback_fn("* Verifying node status")
3099

    
3100
    refos_img = None
3101

    
3102
    for node_i in node_data_list:
3103
      nimg = node_image[node_i.uuid]
3104

    
3105
      if node_i.offline:
3106
        if verbose:
3107
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
3108
        n_offline += 1
3109
        continue
3110

    
3111
      if node_i.uuid == master_node_uuid:
3112
        ntype = "master"
3113
      elif node_i.master_candidate:
3114
        ntype = "master candidate"
3115
      elif node_i.drained:
3116
        ntype = "drained"
3117
        n_drained += 1
3118
      else:
3119
        ntype = "regular"
3120
      if verbose:
3121
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))
3122

    
3123
      msg = all_nvinfo[node_i.uuid].fail_msg
3124
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
3125
                    "while contacting node: %s", msg)
3126
      if msg:
3127
        nimg.rpc_fail = True
3128
        continue
3129

    
3130
      nresult = all_nvinfo[node_i.uuid].payload
3131

    
3132
      nimg.call_ok = self._VerifyNode(node_i, nresult)
3133
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
3134
      self._VerifyNodeNetwork(node_i, nresult)
3135
      self._VerifyNodeUserScripts(node_i, nresult)
3136
      self._VerifyOob(node_i, nresult)
3137
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
3138
                                           node_i.uuid == master_node_uuid)
3139
      self._VerifyFileStoragePaths(node_i, nresult)
3140
      self._VerifySharedFileStoragePaths(node_i, nresult)
3141

    
3142
      if nimg.vm_capable:
3143
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
3144
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
3145
                             all_drbd_map)
3146

    
3147
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
3148
        self._UpdateNodeInstances(node_i, nresult, nimg)
3149
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
3150
        self._UpdateNodeOS(node_i, nresult, nimg)
3151

    
3152
        if not nimg.os_fail:
3153
          if refos_img is None:
3154
            refos_img = nimg
3155
          self._VerifyNodeOS(node_i, nimg, refos_img)
3156
        self._VerifyNodeBridges(node_i, nresult, bridges)
3157

    
3158
        # Check whether all running instances are primary for the node. (This
3159
        # can no longer be done from _VerifyInstance below, since some of the
3160
        # wrong instances could be from other node groups.)
3161
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)
3162

    
3163
        for inst_uuid in non_primary_inst_uuids:
3164
          test = inst_uuid in self.all_inst_info
3165
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
3166
                        self.cfg.GetInstanceName(inst_uuid),
3167
                        "instance should not run on node %s", node_i.name)
3168
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
3169
                        "node is running unknown instance %s", inst_uuid)
3170

    
3171
    self._VerifyGroupDRBDVersion(all_nvinfo)
3172
    self._VerifyGroupLVM(node_image, vg_name)
3173

    
3174
    for node_uuid, result in extra_lv_nvinfo.items():
3175
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
3176
                              node_image[node_uuid], vg_name)
3177

    
3178
    feedback_fn("* Verifying instance status")
3179
    for inst_uuid in self.my_inst_uuids:
3180
      instance = self.my_inst_info[inst_uuid]
3181
      if verbose:
3182
        feedback_fn("* Verifying instance %s" % instance.name)
3183
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])
3184

    
3185
      # If the instance is non-redundant we cannot survive losing its primary
3186
      # node, so we are not N+1 compliant.
3187
      if instance.disk_template not in constants.DTS_MIRRORED:
3188
        i_non_redundant.append(instance)
3189

    
3190
      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
3191
        i_non_a_balanced.append(instance)
3192

    
3193
    feedback_fn("* Verifying orphan volumes")
3194
    reserved = utils.FieldSet(*cluster.reserved_lvs)
3195

    
3196
    # We will get spurious "unknown volume" warnings if any node of this group
3197
    # is secondary for an instance whose primary is in another group. To avoid
3198
    # them, we find these instances and add their volumes to node_vol_should.
3199
    for instance in self.all_inst_info.values():
3200
      for secondary in instance.secondary_nodes:
3201
        if (secondary in self.my_node_info
3202
            and instance.name not in self.my_inst_info):
3203
          instance.MapLVsByNode(node_vol_should)
3204
          break
3205

    
3206
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
3207

    
3208
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
3209
      feedback_fn("* Verifying N+1 Memory redundancy")
3210
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
3211

    
3212
    feedback_fn("* Other Notes")
3213
    if i_non_redundant:
3214
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
3215
                  % len(i_non_redundant))
3216

    
3217
    if i_non_a_balanced:
3218
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
3219
                  % len(i_non_a_balanced))
3220

    
3221
    if i_offline:
3222
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
3223

    
3224
    if n_offline:
3225
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
3226

    
3227
    if n_drained:
3228
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
3229

    
3230
    return not self.bad
3231

    
3232
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
3233
    """Analyze the post-hooks' result
3234

3235
    This method analyses the hook result, handles it, and sends some
3236
    nicely-formatted feedback back to the user.
3237

3238
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
3239
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
3240
    @param hooks_results: the results of the multi-node hooks rpc call
3241
    @param feedback_fn: function used send feedback back to the caller
3242
    @param lu_result: previous Exec result
3243
    @return: the new Exec result, based on the previous result
3244
        and hook results
3245

3246
    """
3247
    # We only really run POST phase hooks, only for non-empty groups,
3248
    # and are only interested in their results
3249
    if not self.my_node_uuids:
3250
      # empty node group
3251
      pass
3252
    elif phase == constants.HOOKS_PHASE_POST:
3253
      # Used to change hooks' output to proper indentation
3254
      feedback_fn("* Hooks Results")
3255
      assert hooks_results, "invalid result from hooks"
3256

    
3257
      for node_name in hooks_results:
3258
        res = hooks_results[node_name]
3259
        msg = res.fail_msg
3260
        test = msg and not res.offline
3261
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3262
                      "Communication failure in hooks execution: %s", msg)
3263
        if res.offline or msg:
3264
          # No need to investigate payload if node is offline or gave
3265
          # an error.
3266
          continue
3267
        for script, hkr, output in res.payload:
3268
          test = hkr == constants.HKR_FAIL
3269
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3270
                        "Script %s failed, output:", script)
3271
          if test:
3272
            output = self._HOOKS_INDENT_RE.sub("      ", output)
3273
            feedback_fn("%s" % output)
3274
            lu_result = False
3275

    
3276
    return lu_result
3277

    
3278

    
3279
class LUClusterVerifyDisks(NoHooksLU):
3280
  """Verifies the cluster disks status.
3281

3282
  """
3283
  REQ_BGL = False
3284

    
3285
  def ExpandNames(self):
3286
    self.share_locks = ShareAll()
3287
    self.needed_locks = {
3288
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
3289
      }
3290

    
3291
  def Exec(self, feedback_fn):
3292
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
3293

    
3294
    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
3295
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
3296
                           for group in group_names])