#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
  CheckDiskAccessModeConsistency

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def CheckArguments(self):
    self.master_uuid = self.cfg.GetMasterNode()
    self.master_ndparams = self.cfg.GetNdParams(self.cfg.GetMasterNodeInfo())

    # TODO: When Issue 584 is solved, and None is properly parsed when used
    # as a default value, ndparams.get(.., None) can be changed to
    # ndparams[..] to access the values directly

    # OpenvSwitch: Warn user if link is missing
    if (self.master_ndparams[constants.ND_OVS] and not
        self.master_ndparams.get(constants.ND_OVS_LINK, None)):
      self.LogInfo("No physical interface for OpenvSwitch was given."
                   " OpenvSwitch will not have an outside connection. This"
                   " might not be what you want.")

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Create and configure Open vSwitch

    """
    if self.master_ndparams[constants.ND_OVS]:
      result = self.rpc.call_node_configure_ovs(
                 self.master_uuid,
                 self.master_ndparams[constants.ND_OVS_NAME],
                 self.master_ndparams.get(constants.ND_OVS_LINK, None))
      result.Raise("Could not successfully configure Open vSwitch")
    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "vcs_version": constants.VCS_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "default_iallocator_params": cluster.default_iallocator_params,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.DT_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      if not dskl:
        # no disks on the node
        continue

      newl = [([v[2].Copy()], v[0]) for v in dskl]
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
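        # the node reports the size in bytes; shift by 20 bits to get MiB,
        # the unit used for disk sizes in the cluster configuration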
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)


def CheckFileBasedStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
    file_disk_template):
  """Checks whether the given file-based storage directory is acceptable.

  Note: This function is public, because it is also used in bootstrap.py.

  @type logging_warn_fn: function
  @param logging_warn_fn: function which accepts a string and logs it
  @type file_storage_dir: string
  @param file_storage_dir: the directory to be used for file-based instances
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of enabled disk templates
  @type file_disk_template: string
  @param file_disk_template: the file-based disk template for which the
      path should be checked

  """
  assert (file_disk_template in
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_dir is not None:
    if file_storage_dir == "":
      if file_storage_enabled:
        raise errors.OpPrereqError(
            "Unsetting the '%s' storage directory while having '%s' storage"
            " enabled is not permitted." %
            (file_disk_template, file_disk_template))
    else:
      if not file_storage_enabled:
        logging_warn_fn(
            "Specified a %s storage directory, although %s storage is not"
            " enabled." % (file_disk_template, file_disk_template))
  else:
    raise errors.ProgrammerError("Received %s storage dir with value"
                                 " 'None'." % file_disk_template)


def CheckFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_FILE)


def CheckSharedFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given shared file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_SHARED_FILE)


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
        CheckDiskAccessModeValidity(self.op.diskparams)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" % err,
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def _CheckVgName(self, node_uuids, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the vg name on all nodes and in case it gets
       unset whether there are instances still using it.

    """
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
                                            new_enabled_disk_templates)
    current_vg_name = self.cfg.GetVGName()

    if self.op.vg_name == '':
      if lvm_is_enabled:
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                   " disk templates are or get enabled.")

    if self.op.vg_name is None:
      if current_vg_name is None and lvm_is_enabled:
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and lvm_is_enabled) or \
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
      self._CheckVgNameOnNodes(node_uuids)

  def _CheckVgNameOnNodes(self, node_uuids):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_uuids)
    for node_uuid in node_uuids:
      msg = vglist[node_uuid].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
                                   errors.ECODE_ENVIRON)

  @staticmethod
  def _GetDiskTemplateSetsInner(op_enabled_disk_templates,
                                old_enabled_disk_templates):
    """Computes three sets of disk templates.

    @see: C{_GetDiskTemplateSets} for more details.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    disabled_disk_templates = []
    if op_enabled_disk_templates:
      enabled_disk_templates = op_enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(old_enabled_disk_templates))
      disabled_disk_templates = \
        list(set(old_enabled_disk_templates)
             - set(enabled_disk_templates))
    else:
      enabled_disk_templates = old_enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates,
            disabled_disk_templates)

  def _GetDiskTemplateSets(self, cluster):
    """Computes three sets of disk templates.

    The three sets are:
      - disk templates that will be enabled after this operation (no matter if
        they were enabled before or not)
      - disk templates that get enabled by this operation (thus haven't been
        enabled before.)
      - disk templates that get disabled by this operation

    """
    return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates,
                                          cluster.enabled_disk_templates)

  def _CheckIpolicy(self, cluster, enabled_disk_templates):
    """Checks the ipolicy.

    @type cluster: C{objects.Cluster}
    @param cluster: the cluster's configuration
    @type enabled_disk_templates: list of string
    @param enabled_disk_templates: list of (possibly newly) enabled disk
      templates

    """
    # FIXME: write unit tests for this
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
                                  enabled_disk_templates)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(nuuid in group.members
                                             for nuuid in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
                                           self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))
    else:
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
                                  enabled_disk_templates)

  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
    """Checks whether the set DRBD helper actually exists on the nodes.

    @type drbd_helper: string
    @param drbd_helper: path of the drbd usermode helper binary
    @type node_uuids: list of strings
    @param node_uuids: list of node UUIDs to check for the helper

    """
    # checks given drbd helper on all nodes
    helpers = self.rpc.call_drbd_helper(node_uuids)
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
      if ninfo.offline:
        self.LogInfo("Not checking drbd helper on offline node %s",
                     ninfo.name)
        continue
      msg = helpers[ninfo.uuid].fail_msg
      if msg:
        raise errors.OpPrereqError("Error checking drbd helper on node"
                                   " '%s': %s" % (ninfo.name, msg),
                                   errors.ECODE_ENVIRON)
      node_helper = helpers[ninfo.uuid].payload
      if node_helper != drbd_helper:
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                   (ninfo.name, node_helper),
                                   errors.ECODE_ENVIRON)

  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
    """Check the DRBD usermode helper.

    @type node_uuids: list of strings
    @param node_uuids: a list of nodes' UUIDs
    @type drbd_enabled: boolean
    @param drbd_enabled: whether DRBD will be enabled after this operation
      (no matter if it was disabled before or not)
    @type drbd_gets_enabled: boolean
    @param drbd_gets_enabled: true if DRBD was disabled before this
      operation, but will be enabled afterwards

    """
    if self.op.drbd_helper == '':
      if drbd_enabled:
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " DRBD is enabled.")
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    else:
      if self.op.drbd_helper is not None and drbd_enabled:
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
      else:
        if drbd_gets_enabled:
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
          if current_drbd_helper is not None:
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
          else:
            raise errors.OpPrereqError("Cannot enable DRBD without a"
                                       " DRBD usermode helper set.")

  def _CheckInstancesOfDisabledDiskTemplates(
      self, disabled_disk_templates):
    """Check whether we try to disable a disk template that is in use.

    @type disabled_disk_templates: list of string
    @param disabled_disk_templates: list of disk templates that are going to
      be disabled by this operation

    """
    for disk_template in disabled_disk_templates:
      if self.cfg.HasAnyDiskOfType(disk_template):
        raise errors.OpPrereqError(
            "Cannot disable disk template '%s', because there is at least one"
            " instance using it." % disk_template)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    self.cluster = cluster = self.cfg.GetClusterInfo()

    vm_capable_node_uuids = [node.uuid
                             for node in self.cfg.GetAllNodesInfo().values()
                             if node.uuid in node_uuids and node.vm_capable]

    (enabled_disk_templates, new_enabled_disk_templates,
      disabled_disk_templates) = self._GetDiskTemplateSets(cluster)
    self._CheckInstancesOfDisabledDiskTemplates(disabled_disk_templates)

    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
                      new_enabled_disk_templates)

    if self.op.file_storage_dir is not None:
      CheckFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)

    if self.op.shared_file_storage_dir is not None:
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.shared_file_storage_dir,
          enabled_disk_templates)

    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)

    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    self._CheckIpolicy(cluster, enabled_disk_templates)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
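    # new_hvp aliases self.new_hvparams, so the per-hypervisor updates made
    # through it below are reflected in the attribute as well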
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_uuids, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_uuids, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def _SetVgName(self, feedback_fn):
    """Determines and sets the new volume group name.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
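      # an empty string in the opcode means the volume group should be unset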
1190
      if not new_volume:
1191
        new_volume = None
1192
      if new_volume != self.cfg.GetVGName():
1193
        self.cfg.SetVGName(new_volume)
1194
      else:
1195
        feedback_fn("Cluster LVM configuration already in desired"
1196
                    " state, not changing")
1197

    
1198
  def _SetFileStorageDir(self, feedback_fn):
1199
    """Set the file storage directory.
1200

1201
    """
1202
    if self.op.file_storage_dir is not None:
1203
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
1204
        feedback_fn("Global file storage dir already set to value '%s'"
1205
                    % self.cluster.file_storage_dir)
1206
      else:
1207
        self.cluster.file_storage_dir = self.op.file_storage_dir
1208

    
1209
  def _SetDrbdHelper(self, feedback_fn):
1210
    """Set the DRBD usermode helper.
1211

1212
    """
1213
    if self.op.drbd_helper is not None:
1214
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1215
        feedback_fn("Note that you specified a drbd user helper, but did not"
1216
                    " enable the drbd disk template.")
1217
      new_helper = self.op.drbd_helper
1218
      if not new_helper:
1219
        new_helper = None
1220
      if new_helper != self.cfg.GetDRBDHelper():
1221
        self.cfg.SetDRBDHelper(new_helper)
1222
      else:
1223
        feedback_fn("Cluster DRBD helper already in desired state,"
1224
                    " not changing")
1225

    
1226
  def Exec(self, feedback_fn):
1227
    """Change the parameters of the cluster.
1228

1229
    """
1230
    if self.op.enabled_disk_templates:
1231
      self.cluster.enabled_disk_templates = \
1232
        list(self.op.enabled_disk_templates)
1233

    
1234
    self._SetVgName(feedback_fn)
1235
    self._SetFileStorageDir(feedback_fn)
1236
    self._SetDrbdHelper(feedback_fn)
1237

    
1238
    if self.op.hvparams:
1239
      self.cluster.hvparams = self.new_hvparams
1240
    if self.op.os_hvp:
1241
      self.cluster.os_hvp = self.new_os_hvp
1242
    if self.op.enabled_hypervisors is not None:
1243
      self.cluster.hvparams = self.new_hvparams
1244
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1245
    if self.op.beparams:
1246
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1247
    if self.op.nicparams:
1248
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1249
    if self.op.ipolicy:
1250
      self.cluster.ipolicy = self.new_ipolicy
1251
    if self.op.osparams:
1252
      self.cluster.osparams = self.new_osp
1253
    if self.op.ndparams:
1254
      self.cluster.ndparams = self.new_ndparams
1255
    if self.op.diskparams:
1256
      self.cluster.diskparams = self.new_diskparams
1257
    if self.op.hv_state:
1258
      self.cluster.hv_state_static = self.new_hv_state
1259
    if self.op.disk_state:
1260
      self.cluster.disk_state_static = self.new_disk_state
1261

    
1262
    if self.op.candidate_pool_size is not None:
1263
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1264
      # we need to update the pool size here, otherwise the save will fail
1265
      AdjustCandidatePool(self, [])
1266

    
1267
    if self.op.maintain_node_health is not None:
1268
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1269
        feedback_fn("Note: CONFD was disabled at build time, node health"
1270
                    " maintenance is not useful (still enabling it)")
1271
      self.cluster.maintain_node_health = self.op.maintain_node_health
1272

    
1273
    if self.op.modify_etc_hosts is not None:
1274
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1275

    
1276
    if self.op.prealloc_wipe_disks is not None:
1277
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1278

    
1279
    if self.op.add_uids is not None:
1280
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1281

    
1282
    if self.op.remove_uids is not None:
1283
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1284

    
1285
    if self.op.uid_pool is not None:
1286
      self.cluster.uid_pool = self.op.uid_pool
1287

    
1288
    if self.op.default_iallocator is not None:
1289
      self.cluster.default_iallocator = self.op.default_iallocator
1290

    
1291
    if self.op.default_iallocator_params is not None:
1292
      self.cluster.default_iallocator_params = self.op.default_iallocator_params
1293

    
1294
    if self.op.reserved_lvs is not None:
1295
      self.cluster.reserved_lvs = self.op.reserved_lvs
1296

    
1297
    if self.op.use_external_mip_script is not None:
1298
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
1299

    
1300
    def helper_os(aname, mods, desc):
1301
      desc += " OS list"
1302
      lst = getattr(self.cluster, aname)
1303
      for key, val in mods:
1304
        if key == constants.DDM_ADD:
1305
          if val in lst:
1306
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1307
          else:
1308
            lst.append(val)
1309
        elif key == constants.DDM_REMOVE:
1310
          if val in lst:
1311
            lst.remove(val)
1312
          else:
1313
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1314
        else:
1315
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
1316

    
1317
    if self.op.hidden_os:
1318
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1319

    
1320
    if self.op.blacklisted_os:
1321
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1322

    
1323
    if self.op.master_netdev:
1324
      master_params = self.cfg.GetMasterNetworkParameters()
1325
      ems = self.cfg.GetUseExternalMipScript()
1326
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1327
                  self.cluster.master_netdev)
1328
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1329
                                                       master_params, ems)
1330
      if not self.op.force:
1331
        result.Raise("Could not disable the master ip")
1332
      else:
1333
        if result.fail_msg:
1334
          msg = ("Could not disable the master ip (continuing anyway): %s" %
1335
                 result.fail_msg)
1336
          feedback_fn(msg)
1337
      feedback_fn("Changing master_netdev from %s to %s" %
1338
                  (master_params.netdev, self.op.master_netdev))
1339
      self.cluster.master_netdev = self.op.master_netdev
1340

    
1341
    if self.op.master_netmask:
1342
      master_params = self.cfg.GetMasterNetworkParameters()
1343
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1344
      result = self.rpc.call_node_change_master_netmask(
1345
                 master_params.uuid, master_params.netmask,
1346
                 self.op.master_netmask, master_params.ip,
1347
                 master_params.netdev)
1348
      result.Warn("Could not change the master IP netmask", feedback_fn)
1349
      self.cluster.master_netmask = self.op.master_netmask
1350

    
1351
    self.cfg.Update(self.cluster, feedback_fn)
1352

    
1353
    if self.op.master_netdev:
1354
      master_params = self.cfg.GetMasterNetworkParameters()
1355
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
1356
                  self.op.master_netdev)
1357
      ems = self.cfg.GetUseExternalMipScript()
1358
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1359
                                                     master_params, ems)
1360
      result.Warn("Could not re-enable the master ip on the master,"
1361
                  " please restart manually", self.LogWarning)
1362
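  # Editor's note (not part of the original source): the master_netdev change
  # is deliberately split around cfg.Update() above: the master IP is shut
  # down on the old netdev first, the new netdev name is persisted, and only
  # then is the IP re-activated, so an interruption in between leaves the IP
  # down rather than configured on a stale device.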

    
1363

    
1364
class LUClusterVerify(NoHooksLU):
1365
  """Submits all jobs necessary to verify the cluster.
1366

1367
  """
1368
  REQ_BGL = False
1369

    
1370
  def ExpandNames(self):
1371
    self.needed_locks = {}
1372

    
1373
  def Exec(self, feedback_fn):
1374
    jobs = []
1375

    
1376
    if self.op.group_name:
1377
      groups = [self.op.group_name]
1378
      depends_fn = lambda: None
1379
    else:
1380
      groups = self.cfg.GetNodeGroupList()
1381

    
1382
      # Verify global configuration
1383
      jobs.append([
1384
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1385
        ])
1386

    
1387
      # Always depend on global verification
1388
      depends_fn = lambda: [(-len(jobs), [])]
1389

    
1390
    jobs.extend(
1391
      [opcodes.OpClusterVerifyGroup(group_name=group,
1392
                                    ignore_errors=self.op.ignore_errors,
1393
                                    depends=depends_fn())]
1394
      for group in groups)
1395

    
1396
    # Fix up all parameters
1397
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1398
      op.debug_simulate_errors = self.op.debug_simulate_errors
1399
      op.verbose = self.op.verbose
1400
      op.error_codes = self.op.error_codes
1401
      try:
1402
        op.skip_checks = self.op.skip_checks
1403
      except AttributeError:
1404
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1405

    
1406
    return ResultWithJobs(jobs)
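  # Editor's note (illustrative sketch, not part of the original source): for
  # a whole-cluster verify over two groups the submitted jobs look roughly
  # like
  #
  #   jobs = [
  #     [OpClusterVerifyConfig(...)],                                  # job 0
  #     [OpClusterVerifyGroup(group_name="g1", depends=[(-1, [])], ...)],
  #     [OpClusterVerifyGroup(group_name="g2", depends=[(-2, [])], ...)],
  #   ]
  #
  # The negative offsets are relative dependencies that both resolve to the
  # config-verification job, since depends_fn() is evaluated while "jobs"
  # grows during the extend(). Group names "g1"/"g2" are hypothetical.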
1407

    
1408

    
1409
class _VerifyErrors(object):
1410
  """Mix-in for cluster/group verify LUs.
1411

1412
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1413
  self.op and self._feedback_fn to be available.)
1414

1415
  """
1416

    
1417
  ETYPE_FIELD = "code"
1418
  ETYPE_ERROR = "ERROR"
1419
  ETYPE_WARNING = "WARNING"
1420

    
1421
  def _Error(self, ecode, item, msg, *args, **kwargs):
1422
    """Format an error message.
1423

1424
    Based on the opcode's error_codes parameter, either format a
1425
    parseable error code, or a simpler error string.
1426

1427
    This must be called only from Exec and functions called from Exec.
1428

1429
    """
1430
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1431
    itype, etxt, _ = ecode
1432
    # If the error code is in the list of ignored errors, demote the error to a
1433
    # warning
1434
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1435
      ltype = self.ETYPE_WARNING
1436
    # first complete the msg
1437
    if args:
1438
      msg = msg % args
1439
    # then format the whole message
1440
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1441
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1442
    else:
1443
      if item:
1444
        item = " " + item
1445
      else:
1446
        item = ""
1447
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1448
    # and finally report it via the feedback_fn
1449
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1450
    # do not mark the operation as failed for WARN cases only
1451
    if ltype == self.ETYPE_ERROR:
1452
      self.bad = True
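  # Editor's note (illustrative, not part of the original source): with
  # error_codes set the reported line is machine-parseable, e.g.
  #   - ERROR:ENODENET:node:node1.example.com:missing bridges: xen-br0
  # while the default form is the human-oriented
  #   - ERROR: node node1.example.com: missing bridges: xen-br0
  # (node name, error code and bridge are hypothetical examples).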
1453

    
1454
  def _ErrorIf(self, cond, *args, **kwargs):
1455
    """Log an error message if the passed condition is True.
1456

1457
    """
1458
    if (bool(cond)
1459
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1460
      self._Error(*args, **kwargs)
1461

    
1462

    
1463
def _VerifyCertificate(filename):
1464
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1465

1466
  @type filename: string
1467
  @param filename: Path to PEM file
1468

1469
  """
1470
  try:
1471
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1472
                                           utils.ReadFile(filename))
1473
  except Exception, err: # pylint: disable=W0703
1474
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1475
            "Failed to load X509 certificate %s: %s" % (filename, err))
1476

    
1477
  (errcode, msg) = \
1478
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1479
                                constants.SSL_CERT_EXPIRATION_ERROR)
1480

    
1481
  if msg:
1482
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1483
  else:
1484
    fnamemsg = None
1485

    
1486
  if errcode is None:
1487
    return (None, fnamemsg)
1488
  elif errcode == utils.CERT_WARNING:
1489
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1490
  elif errcode == utils.CERT_ERROR:
1491
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1492

    
1493
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
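# Editor's note (illustrative, not part of the original source):
# _VerifyCertificate returns (None, None) for a healthy certificate, a
# (LUClusterVerifyConfig.ETYPE_WARNING, "While verifying <path>: ...") pair
# when expiry falls within SSL_CERT_EXPIRATION_WARN, and ETYPE_ERROR either
# when the PEM file cannot be loaded or when expiry falls within
# SSL_CERT_EXPIRATION_ERROR.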
1494

    
1495

    
1496
def _GetAllHypervisorParameters(cluster, instances):
1497
  """Compute the set of all hypervisor parameters.
1498

1499
  @type cluster: L{objects.Cluster}
1500
  @param cluster: the cluster object
1501
  @type instances: list of L{objects.Instance}
1502
  @param instances: additional instances from which to obtain parameters
1503
  @rtype: list of (origin, hypervisor, parameters)
1504
  @return: a list with all parameters found, indicating the hypervisor they
1505
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1506

1507
  """
1508
  hvp_data = []
1509

    
1510
  for hv_name in cluster.enabled_hypervisors:
1511
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1512

    
1513
  for os_name, os_hvp in cluster.os_hvp.items():
1514
    for hv_name, hv_params in os_hvp.items():
1515
      if hv_params:
1516
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1517
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1518

    
1519
  # TODO: collapse identical parameter values in a single one
1520
  for instance in instances:
1521
    if instance.hvparams:
1522
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1523
                       cluster.FillHV(instance)))
1524

    
1525
  return hvp_data
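# Editor's note (illustrative, not part of the original source): each
# hvp_data entry is an (origin, hypervisor, parameters) triple, for example
#   ("cluster", "kvm", <cluster-level defaults>)
#   ("os debian-image", "kvm", <defaults overlaid with os_hvp>)
#   ("instance web1", "kvm", <cluster.FillHV(instance) result>)
# The OS and instance names shown here are hypothetical.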
1526

    
1527

    
1528
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1529
  """Verifies the cluster config.
1530

1531
  """
1532
  REQ_BGL = False
1533

    
1534
  def _VerifyHVP(self, hvp_data):
1535
    """Verifies locally the syntax of the hypervisor parameters.
1536

1537
    """
1538
    for item, hv_name, hv_params in hvp_data:
1539
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1540
             (hv_name, item))
1541
      try:
1542
        hv_class = hypervisor.GetHypervisorClass(hv_name)
1543
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1544
        hv_class.CheckParameterSyntax(hv_params)
1545
      except errors.GenericError, err:
1546
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1547

    
1548
  def ExpandNames(self):
1549
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1550
    self.share_locks = ShareAll()
1551

    
1552
  def CheckPrereq(self):
1553
    """Check prerequisites.
1554

1555
    """
1556
    # Retrieve all information
1557
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1558
    self.all_node_info = self.cfg.GetAllNodesInfo()
1559
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1560

    
1561
  def Exec(self, feedback_fn):
1562
    """Verify integrity of cluster, performing various tests on nodes.
1563

1564
    """
1565
    self.bad = False
1566
    self._feedback_fn = feedback_fn
1567

    
1568
    feedback_fn("* Verifying cluster config")
1569

    
1570
    for msg in self.cfg.VerifyConfig():
1571
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1572

    
1573
    feedback_fn("* Verifying cluster certificate files")
1574

    
1575
    for cert_filename in pathutils.ALL_CERT_FILES:
1576
      (errcode, msg) = _VerifyCertificate(cert_filename)
1577
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1578

    
1579
    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
1580
                                    pathutils.NODED_CERT_FILE),
1581
                  constants.CV_ECLUSTERCERT,
1582
                  None,
1583
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
1584
                    constants.LUXID_USER + " user")
1585

    
1586
    feedback_fn("* Verifying hypervisor parameters")
1587

    
1588
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1589
                                                self.all_inst_info.values()))
1590

    
1591
    feedback_fn("* Verifying all nodes belong to an existing group")
1592

    
1593
    # We do this verification here because, should this bogus circumstance
1594
    # occur, it would never be caught by VerifyGroup, which only acts on
1595
    # nodes/instances reachable from existing node groups.
1596

    
1597
    dangling_nodes = set(node for node in self.all_node_info.values()
1598
                         if node.group not in self.all_group_info)
1599

    
1600
    dangling_instances = {}
1601
    no_node_instances = []
1602

    
1603
    for inst in self.all_inst_info.values():
1604
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
1605
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
1606
      elif inst.primary_node not in self.all_node_info:
1607
        no_node_instances.append(inst)
1608

    
1609
    pretty_dangling = [
1610
        "%s (%s)" %
1611
        (node.name,
1612
         utils.CommaJoin(inst.name for
1613
                         inst in dangling_instances.get(node.uuid, [])))
1614
        for node in dangling_nodes]
1615

    
1616
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1617
                  None,
1618
                  "the following nodes (and their instances) belong to a"
1619
                  " non-existing group: %s", utils.CommaJoin(pretty_dangling))
1620

    
1621
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1622
                  None,
1623
                  "the following instances have a non-existing primary-node:"
1624
                  " %s", utils.CommaJoin(inst.name for
1625
                                         inst in no_node_instances))
1626

    
1627
    return not self.bad
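  # Editor's note (illustrative, not part of the original source): a dangling
  # node is reported together with the instances it is primary for, e.g.
  #   node3.example.com (inst1.example.com, inst2.example.com)
  # whereas instances whose primary node is entirely absent from the
  # configuration are reported separately via CV_ECLUSTERDANGLINGINST.
  # The names above are hypothetical.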
1628

    
1629

    
1630
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1631
  """Verifies the status of a node group.
1632

1633
  """
1634
  HPATH = "cluster-verify"
1635
  HTYPE = constants.HTYPE_CLUSTER
1636
  REQ_BGL = False
1637

    
1638
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1639

    
1640
  class NodeImage(object):
1641
    """A class representing the logical and physical status of a node.
1642

1643
    @type uuid: string
1644
    @ivar uuid: the node UUID to which this object refers
1645
    @ivar volumes: a structure as returned from
1646
        L{ganeti.backend.GetVolumeList} (runtime)
1647
    @ivar instances: a list of running instances (runtime)
1648
    @ivar pinst: list of configured primary instances (config)
1649
    @ivar sinst: list of configured secondary instances (config)
1650
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1651
        instances for which this node is secondary (config)
1652
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1653
    @ivar dfree: free disk, as reported by the node (runtime)
1654
    @ivar offline: the offline status (config)
1655
    @type rpc_fail: boolean
1656
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1657
        not whether the individual keys were correct) (runtime)
1658
    @type lvm_fail: boolean
1659
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1660
    @type hyp_fail: boolean
1661
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1662
    @type ghost: boolean
1663
    @ivar ghost: whether this is a known node or not (config)
1664
    @type os_fail: boolean
1665
    @ivar os_fail: whether the RPC call didn't return valid OS data
1666
    @type oslist: dict
1667
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1668
    @type vm_capable: boolean
1669
    @ivar vm_capable: whether the node can host instances
1670
    @type pv_min: float
1671
    @ivar pv_min: size in MiB of the smallest PVs
1672
    @type pv_max: float
1673
    @ivar pv_max: size in MiB of the biggest PVs
1674

1675
    """
1676
    def __init__(self, offline=False, uuid=None, vm_capable=True):
1677
      self.uuid = uuid
1678
      self.volumes = {}
1679
      self.instances = []
1680
      self.pinst = []
1681
      self.sinst = []
1682
      self.sbp = {}
1683
      self.mfree = 0
1684
      self.dfree = 0
1685
      self.offline = offline
1686
      self.vm_capable = vm_capable
1687
      self.rpc_fail = False
1688
      self.lvm_fail = False
1689
      self.hyp_fail = False
1690
      self.ghost = False
1691
      self.os_fail = False
1692
      self.oslist = {}
1693
      self.pv_min = None
1694
      self.pv_max = None
1695

    
1696
  def ExpandNames(self):
1697
    # This raises errors.OpPrereqError on its own:
1698
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1699

    
1700
    # Get instances in node group; this is unsafe and needs verification later
1701
    inst_uuids = \
1702
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1703

    
1704
    self.needed_locks = {
1705
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1706
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1707
      locking.LEVEL_NODE: [],
1708

    
1709
      # This opcode is run by watcher every five minutes and acquires all nodes
1710
      # for a group. It doesn't run for a long time, so it's better to acquire
1711
      # the node allocation lock as well.
1712
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1713
      }
1714

    
1715
    self.share_locks = ShareAll()
1716

    
1717
  def DeclareLocks(self, level):
1718
    if level == locking.LEVEL_NODE:
1719
      # Get members of node group; this is unsafe and needs verification later
1720
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1721

    
1722
      # In Exec(), we warn about mirrored instances that have primary and
1723
      # secondary living in separate node groups. To fully verify that
1724
      # volumes for these instances are healthy, we will need to do an
1725
      # extra call to their secondaries. We ensure here those nodes will
1726
      # be locked.
1727
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1728
        # Important: access only the instances whose lock is owned
1729
        instance = self.cfg.GetInstanceInfoByName(inst_name)
1730
        if instance.disk_template in constants.DTS_INT_MIRROR:
1731
          nodes.update(instance.secondary_nodes)
1732

    
1733
      self.needed_locks[locking.LEVEL_NODE] = nodes
1734

    
1735
  def CheckPrereq(self):
1736
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1737
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1738

    
1739
    group_node_uuids = set(self.group_info.members)
1740
    group_inst_uuids = \
1741
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1742

    
1743
    unlocked_node_uuids = \
1744
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1745

    
1746
    unlocked_inst_uuids = \
1747
        group_inst_uuids.difference(
1748
          [self.cfg.GetInstanceInfoByName(name).uuid
1749
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1750

    
1751
    if unlocked_node_uuids:
1752
      raise errors.OpPrereqError(
1753
        "Missing lock for nodes: %s" %
1754
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1755
        errors.ECODE_STATE)
1756

    
1757
    if unlocked_inst_uuids:
1758
      raise errors.OpPrereqError(
1759
        "Missing lock for instances: %s" %
1760
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1761
        errors.ECODE_STATE)
1762

    
1763
    self.all_node_info = self.cfg.GetAllNodesInfo()
1764
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1765

    
1766
    self.my_node_uuids = group_node_uuids
1767
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1768
                             for node_uuid in group_node_uuids)
1769

    
1770
    self.my_inst_uuids = group_inst_uuids
1771
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1772
                             for inst_uuid in group_inst_uuids)
1773

    
1774
    # We detect here the nodes that will need the extra RPC calls for verifying
1775
    # split LV volumes; they should be locked.
1776
    extra_lv_nodes = set()
1777

    
1778
    for inst in self.my_inst_info.values():
1779
      if inst.disk_template in constants.DTS_INT_MIRROR:
1780
        for nuuid in inst.all_nodes:
1781
          if self.all_node_info[nuuid].group != self.group_uuid:
1782
            extra_lv_nodes.add(nuuid)
1783

    
1784
    unlocked_lv_nodes = \
1785
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1786

    
1787
    if unlocked_lv_nodes:
1788
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1789
                                 utils.CommaJoin(unlocked_lv_nodes),
1790
                                 errors.ECODE_STATE)
1791
    self.extra_lv_nodes = list(extra_lv_nodes)
1792

    
1793
  def _VerifyNode(self, ninfo, nresult):
1794
    """Perform some basic validation on data returned from a node.
1795

1796
      - check the result data structure is well formed and has all the
1797
        mandatory fields
1798
      - check ganeti version
1799

1800
    @type ninfo: L{objects.Node}
1801
    @param ninfo: the node to check
1802
    @param nresult: the results from the node
1803
    @rtype: boolean
1804
    @return: whether overall this call was successful (and we can expect
1805
         reasonable values in the response)
1806

1807
    """
1808
    # main result, nresult should be a non-empty dict
1809
    test = not nresult or not isinstance(nresult, dict)
1810
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1811
                  "unable to verify node: no data returned")
1812
    if test:
1813
      return False
1814

    
1815
    # compares ganeti version
1816
    local_version = constants.PROTOCOL_VERSION
1817
    remote_version = nresult.get("version", None)
1818
    test = not (remote_version and
1819
                isinstance(remote_version, (list, tuple)) and
1820
                len(remote_version) == 2)
1821
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1822
                  "connection to node returned invalid data")
1823
    if test:
1824
      return False
1825

    
1826
    test = local_version != remote_version[0]
1827
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1828
                  "incompatible protocol versions: master %s,"
1829
                  " node %s", local_version, remote_version[0])
1830
    if test:
1831
      return False
1832

    
1833
    # node seems compatible, we can actually try to look into its results
1834

    
1835
    # full package version
1836
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1837
                  constants.CV_ENODEVERSION, ninfo.name,
1838
                  "software version mismatch: master %s, node %s",
1839
                  constants.RELEASE_VERSION, remote_version[1],
1840
                  code=self.ETYPE_WARNING)
1841

    
1842
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1843
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1844
      for hv_name, hv_result in hyp_result.iteritems():
1845
        test = hv_result is not None
1846
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1847
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1848

    
1849
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1850
    if ninfo.vm_capable and isinstance(hvp_result, list):
1851
      for item, hv_name, hv_result in hvp_result:
1852
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1853
                      "hypervisor %s parameter verify failure (source %s): %s",
1854
                      hv_name, item, hv_result)
1855

    
1856
    test = nresult.get(constants.NV_NODESETUP,
1857
                       ["Missing NODESETUP results"])
1858
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1859
                  "node setup error: %s", "; ".join(test))
1860

    
1861
    return True
1862

    
1863
  def _VerifyNodeTime(self, ninfo, nresult,
1864
                      nvinfo_starttime, nvinfo_endtime):
1865
    """Check the node time.
1866

1867
    @type ninfo: L{objects.Node}
1868
    @param ninfo: the node to check
1869
    @param nresult: the remote results for the node
1870
    @param nvinfo_starttime: the start time of the RPC call
1871
    @param nvinfo_endtime: the end time of the RPC call
1872

1873
    """
1874
    ntime = nresult.get(constants.NV_TIME, None)
1875
    try:
1876
      ntime_merged = utils.MergeTime(ntime)
1877
    except (ValueError, TypeError):
1878
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1879
                    "Node returned invalid time")
1880
      return
1881

    
1882
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1883
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1884
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1885
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1886
    else:
1887
      ntime_diff = None
1888

    
1889
    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1890
                  "Node time diverges by at least %s from master node time",
1891
                  ntime_diff)
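  # Editor's note (illustrative, not part of the original source): the node
  # clock is accepted if it lies within the window
  #   [nvinfo_starttime - NODE_MAX_CLOCK_SKEW,
  #    nvinfo_endtime + NODE_MAX_CLOCK_SKEW]
  # i.e. the RPC round-trip time is absorbed by comparing against the call's
  # start and end timestamps instead of a single reference point.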
1892

    
1893
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1894
    """Check the node LVM results and update info for cross-node checks.
1895

1896
    @type ninfo: L{objects.Node}
1897
    @param ninfo: the node to check
1898
    @param nresult: the remote results for the node
1899
    @param vg_name: the configured VG name
1900
    @type nimg: L{NodeImage}
1901
    @param nimg: node image
1902

1903
    """
1904
    if vg_name is None:
1905
      return
1906

    
1907
    # checks vg existence and size > 20G
1908
    vglist = nresult.get(constants.NV_VGLIST, None)
1909
    test = not vglist
1910
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1911
                  "unable to check volume groups")
1912
    if not test:
1913
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1914
                                            constants.MIN_VG_SIZE)
1915
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1916

    
1917
    # Check PVs
1918
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1919
    for em in errmsgs:
1920
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
1921
    if pvminmax is not None:
1922
      (nimg.pv_min, nimg.pv_max) = pvminmax
1923

    
1924
  def _VerifyGroupDRBDVersion(self, node_verify_infos):
1925
    """Check cross-node DRBD version consistency.
1926

1927
    @type node_verify_infos: dict
1928
    @param node_verify_infos: infos about nodes as returned from the
1929
      node_verify call.
1930

1931
    """
1932
    node_versions = {}
1933
    for node_uuid, ndata in node_verify_infos.items():
1934
      nresult = ndata.payload
1935
      if nresult:
1936
        version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1937
        node_versions[node_uuid] = version
1938

    
1939
    if len(set(node_versions.values())) > 1:
1940
      for node_uuid, version in sorted(node_versions.items()):
1941
        msg = "DRBD version mismatch: %s" % version
1942
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1943
                    code=self.ETYPE_WARNING)
1944

    
1945
  def _VerifyGroupLVM(self, node_image, vg_name):
1946
    """Check cross-node consistency in LVM.
1947

1948
    @type node_image: dict
1949
    @param node_image: info about nodes, mapping from node names to
1950
      L{NodeImage} objects
1951
    @param vg_name: the configured VG name
1952

1953
    """
1954
    if vg_name is None:
1955
      return
1956

    
1957
    # Only exclusive storage needs this kind of checks
1958
    if not self._exclusive_storage:
1959
      return
1960

    
1961
    # exclusive_storage wants all PVs to have the same size (approximately),
1962
    # if the smallest and the biggest ones are okay, everything is fine.
1963
    # pv_min is None iff pv_max is None
1964
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1965
    if not vals:
1966
      return
1967
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1968
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1969
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1970
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1971
                  "PV sizes differ too much in the group; smallest (%s MB) is"
1972
                  " on %s, biggest (%s MB) is on %s",
1973
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
1974
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))
1975

    
1976
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1977
    """Check the node bridges.
1978

1979
    @type ninfo: L{objects.Node}
1980
    @param ninfo: the node to check
1981
    @param nresult: the remote results for the node
1982
    @param bridges: the expected list of bridges
1983

1984
    """
1985
    if not bridges:
1986
      return
1987

    
1988
    missing = nresult.get(constants.NV_BRIDGES, None)
1989
    test = not isinstance(missing, list)
1990
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1991
                  "did not return valid bridge information")
1992
    if not test:
1993
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1994
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1995

    
1996
  def _VerifyNodeUserScripts(self, ninfo, nresult):
1997
    """Check the results of user script presence and executability on the node.
1998

1999
    @type ninfo: L{objects.Node}
2000
    @param ninfo: the node to check
2001
    @param nresult: the remote results for the node
2002

2003
    """
2004
    test = constants.NV_USERSCRIPTS not in nresult
2005
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
2006
                  "did not return user scripts information")
2007

    
2008
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
2009
    if not test:
2010
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
2011
                    "user scripts not present or not executable: %s" %
2012
                    utils.CommaJoin(sorted(broken_scripts)))
2013

    
2014
  def _VerifyNodeNetwork(self, ninfo, nresult):
2015
    """Check the node network connectivity results.
2016

2017
    @type ninfo: L{objects.Node}
2018
    @param ninfo: the node to check
2019
    @param nresult: the remote results for the node
2020

2021
    """
2022
    test = constants.NV_NODELIST not in nresult
2023
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
2024
                  "node hasn't returned node ssh connectivity data")
2025
    if not test:
2026
      if nresult[constants.NV_NODELIST]:
2027
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
2028
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
2029
                        "ssh communication with node '%s': %s", a_node, a_msg)
2030

    
2031
    test = constants.NV_NODENETTEST not in nresult
2032
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2033
                  "node hasn't returned node tcp connectivity data")
2034
    if not test:
2035
      if nresult[constants.NV_NODENETTEST]:
2036
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
2037
        for anode in nlist:
2038
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
2039
                        "tcp communication with node '%s': %s",
2040
                        anode, nresult[constants.NV_NODENETTEST][anode])
2041

    
2042
    test = constants.NV_MASTERIP not in nresult
2043
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2044
                  "node hasn't returned node master IP reachability data")
2045
    if not test:
2046
      if not nresult[constants.NV_MASTERIP]:
2047
        if ninfo.uuid == self.master_node:
2048
          msg = "the master node cannot reach the master IP (not configured?)"
2049
        else:
2050
          msg = "cannot reach the master IP"
2051
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
2052

    
2053
  def _VerifyInstance(self, instance, node_image, diskstatus):
2054
    """Verify an instance.
2055

2056
    This function checks to see if the required block devices are
2057
    available on the instance's node, and that the nodes are in the correct
2058
    state.
2059

2060
    """
2061
    pnode_uuid = instance.primary_node
2062
    pnode_img = node_image[pnode_uuid]
2063
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2064

    
2065
    node_vol_should = {}
2066
    instance.MapLVsByNode(node_vol_should)
2067

    
2068
    cluster = self.cfg.GetClusterInfo()
2069
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2070
                                                            self.group_info)
2071
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
2072
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
2073
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)
2074

    
2075
    for node_uuid in node_vol_should:
2076
      n_img = node_image[node_uuid]
2077
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2078
        # ignore missing volumes on offline or broken nodes
2079
        continue
2080
      for volume in node_vol_should[node_uuid]:
2081
        test = volume not in n_img.volumes
2082
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
2083
                      "volume %s missing on node %s", volume,
2084
                      self.cfg.GetNodeName(node_uuid))
2085

    
2086
    if instance.admin_state == constants.ADMINST_UP:
2087
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
2088
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
2089
                    "instance not running on its primary node %s",
2090
                     self.cfg.GetNodeName(pnode_uuid))
2091
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
2092
                    instance.name, "instance is marked as running and lives on"
2093
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))
2094

    
2095
    diskdata = [(nname, success, status, idx)
2096
                for (nname, disks) in diskstatus.items()
2097
                for idx, (success, status) in enumerate(disks)]
2098

    
2099
    for nname, success, bdev_status, idx in diskdata:
2100
      # the 'ghost node' construction in Exec() ensures that we have a
2101
      # node here
2102
      snode = node_image[nname]
2103
      bad_snode = snode.ghost or snode.offline
2104
      self._ErrorIf(instance.disks_active and
2105
                    not success and not bad_snode,
2106
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
2107
                    "couldn't retrieve status for disk/%s on %s: %s",
2108
                    idx, self.cfg.GetNodeName(nname), bdev_status)
2109

    
2110
      if instance.disks_active and success and \
2111
         (bdev_status.is_degraded or
2112
          bdev_status.ldisk_status != constants.LDS_OKAY):
2113
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2114
        if bdev_status.is_degraded:
2115
          msg += " is degraded"
2116
        if bdev_status.ldisk_status != constants.LDS_OKAY:
2117
          msg += "; state is '%s'" % \
2118
                 constants.LDS_NAMES[bdev_status.ldisk_status]
2119

    
2120
        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2121

    
2122
    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2123
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2124
                  "instance %s, connection to primary node failed",
2125
                  instance.name)
2126

    
2127
    self._ErrorIf(len(instance.secondary_nodes) > 1,
2128
                  constants.CV_EINSTANCELAYOUT, instance.name,
2129
                  "instance has multiple secondary nodes: %s",
2130
                  utils.CommaJoin(instance.secondary_nodes),
2131
                  code=self.ETYPE_WARNING)
2132

    
2133
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2134
    if any(es_flags.values()):
2135
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2136
        # Disk template not compatible with exclusive_storage: no instance
2137
        # node should have the flag set
2138
        es_nodes = [n
2139
                    for (n, es) in es_flags.items()
2140
                    if es]
2141
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2142
                    "instance has template %s, which is not supported on nodes"
2143
                    " that have exclusive storage set: %s",
2144
                    instance.disk_template,
2145
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2146
      for (idx, disk) in enumerate(instance.disks):
2147
        self._ErrorIf(disk.spindles is None,
2148
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2149
                      "number of spindles not configured for disk %s while"
2150
                      " exclusive storage is enabled, try running"
2151
                      " gnt-cluster repair-disk-sizes", idx)
2152

    
2153
    if instance.disk_template in constants.DTS_INT_MIRROR:
2154
      instance_nodes = utils.NiceSort(instance.all_nodes)
2155
      instance_groups = {}
2156

    
2157
      for node_uuid in instance_nodes:
2158
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
2159
                                   []).append(node_uuid)
2160

    
2161
      pretty_list = [
2162
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2163
                           groupinfo[group].name)
2164
        # Sort so that we always list the primary node first.
2165
        for group, nodes in sorted(instance_groups.items(),
2166
                                   key=lambda (_, nodes): pnode_uuid in nodes,
2167
                                   reverse=True)]
2168

    
2169
      self._ErrorIf(len(instance_groups) > 1,
2170
                    constants.CV_EINSTANCESPLITGROUPS,
2171
                    instance.name, "instance has primary and secondary nodes in"
2172
                    " different groups: %s", utils.CommaJoin(pretty_list),
2173
                    code=self.ETYPE_WARNING)
2174

    
2175
    inst_nodes_offline = []
2176
    for snode in instance.secondary_nodes:
2177
      s_img = node_image[snode]
2178
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2179
                    self.cfg.GetNodeName(snode),
2180
                    "instance %s, connection to secondary node failed",
2181
                    instance.name)
2182

    
2183
      if s_img.offline:
2184
        inst_nodes_offline.append(snode)
2185

    
2186
    # warn that the instance lives on offline nodes
2187
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2188
                  instance.name, "instance has offline secondary node(s) %s",
2189
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2190
    # ... or ghost/non-vm_capable nodes
2191
    for node_uuid in instance.all_nodes:
2192
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2193
                    instance.name, "instance lives on ghost node %s",
2194
                    self.cfg.GetNodeName(node_uuid))
2195
      self._ErrorIf(not node_image[node_uuid].vm_capable,
2196
                    constants.CV_EINSTANCEBADNODE, instance.name,
2197
                    "instance lives on non-vm_capable node %s",
2198
                    self.cfg.GetNodeName(node_uuid))
2199

    
2200
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2201
    """Verify if there are any unknown volumes in the cluster.
2202

2203
    The .os, .swap and backup volumes are ignored. All other volumes are
2204
    reported as unknown.
2205

2206
    @type reserved: L{ganeti.utils.FieldSet}
2207
    @param reserved: a FieldSet of reserved volume names
2208

2209
    """
2210
    for node_uuid, n_img in node_image.items():
2211
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2212
          self.all_node_info[node_uuid].group != self.group_uuid):
2213
        # skip non-healthy nodes
2214
        continue
2215
      for volume in n_img.volumes:
2216
        test = ((node_uuid not in node_vol_should or
2217
                volume not in node_vol_should[node_uuid]) and
2218
                not reserved.Matches(volume))
2219
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2220
                      self.cfg.GetNodeName(node_uuid),
2221
                      "volume %s is unknown", volume,
2222
                      code=_VerifyErrors.ETYPE_WARNING)
2223

    
2224
  def _VerifyNPlusOneMemory(self, node_image, all_insts):
2225
    """Verify N+1 Memory Resilience.
2226

2227
    Check that if one single node dies we can still start all the
2228
    instances it was primary for.
2229

2230
    """
2231
    cluster_info = self.cfg.GetClusterInfo()
2232
    for node_uuid, n_img in node_image.items():
2233
      # This code checks that every node which is now listed as
2234
      # secondary has enough memory to host all instances it is
2235
      # supposed to should a single other node in the cluster fail.
2236
      # FIXME: not ready for failover to an arbitrary node
2237
      # FIXME: does not support file-backed instances
2238
      # WARNING: we currently take into account down instances as well
2239
      # as up ones, considering that even if they're down someone
2240
      # might want to start them even in the event of a node failure.
2241
      if n_img.offline or \
2242
         self.all_node_info[node_uuid].group != self.group_uuid:
2243
        # we're skipping nodes marked offline and nodes in other groups from
2244
        # the N+1 warning, since most likely we don't have good memory
2245
        # information from them; we already list instances living on such
2246
        # nodes, and that's enough warning
2247
        continue
2248
      #TODO(dynmem): also consider ballooning out other instances
2249
      for prinode, inst_uuids in n_img.sbp.items():
2250
        needed_mem = 0
2251
        for inst_uuid in inst_uuids:
2252
          bep = cluster_info.FillBE(all_insts[inst_uuid])
2253
          if bep[constants.BE_AUTO_BALANCE]:
2254
            needed_mem += bep[constants.BE_MINMEM]
2255
        test = n_img.mfree < needed_mem
2256
        self._ErrorIf(test, constants.CV_ENODEN1,
2257
                      self.cfg.GetNodeName(node_uuid),
2258
                      "not enough memory to accommodate instance failovers"
2259
                      " should node %s fail (%dMiB needed, %dMiB available)",
2260
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
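  # Editor's note (illustrative, not part of the original source): for each
  # primary node "prinode", needed_mem sums BE_MINMEM over its auto-balanced
  # instances that have this node as secondary; e.g. two such instances with
  # minmem 1024 and 2048 MiB require mfree >= 3072 MiB here, otherwise
  # CV_ENODEN1 is reported. The figures are hypothetical.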
2261

    
2262
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2263
                   (files_all, files_opt, files_mc, files_vm)):
2264
    """Verifies file checksums collected from all nodes.
2265

2266
    @param nodes: List of L{objects.Node} objects
2267
    @param master_node_uuid: UUID of master node
2268
    @param all_nvinfo: RPC results
2269

2270
    """
2271
    # Define functions determining which nodes to consider for a file
2272
    files2nodefn = [
2273
      (files_all, None),
2274
      (files_mc, lambda node: (node.master_candidate or
2275
                               node.uuid == master_node_uuid)),
2276
      (files_vm, lambda node: node.vm_capable),
2277
      ]
2278

    
2279
    # Build mapping from filename to list of nodes which should have the file
2280
    nodefiles = {}
2281
    for (files, fn) in files2nodefn:
2282
      if fn is None:
2283
        filenodes = nodes
2284
      else:
2285
        filenodes = filter(fn, nodes)
2286
      nodefiles.update((filename,
2287
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
2288
                       for filename in files)
2289

    
2290
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2291

    
2292
    fileinfo = dict((filename, {}) for filename in nodefiles)
2293
    ignore_nodes = set()
2294

    
2295
    for node in nodes:
2296
      if node.offline:
2297
        ignore_nodes.add(node.uuid)
2298
        continue
2299

    
2300
      nresult = all_nvinfo[node.uuid]
2301

    
2302
      if nresult.fail_msg or not nresult.payload:
2303
        node_files = None
2304
      else:
2305
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2306
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2307
                          for (key, value) in fingerprints.items())
2308
        del fingerprints
2309

    
2310
      test = not (node_files and isinstance(node_files, dict))
2311
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2312
                    "Node did not return file checksum data")
2313
      if test:
2314
        ignore_nodes.add(node.uuid)
2315
        continue
2316

    
2317
      # Build per-checksum mapping from filename to nodes having it
2318
      for (filename, checksum) in node_files.items():
2319
        assert filename in nodefiles
2320
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2321

    
2322
    for (filename, checksums) in fileinfo.items():
2323
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2324

    
2325
      # Nodes having the file
2326
      with_file = frozenset(node_uuid
2327
                            for node_uuids in fileinfo[filename].values()
2328
                            for node_uuid in node_uuids) - ignore_nodes
2329

    
2330
      expected_nodes = nodefiles[filename] - ignore_nodes
2331

    
2332
      # Nodes missing file
2333
      missing_file = expected_nodes - with_file
2334

    
2335
      if filename in files_opt:
2336
        # All or no nodes
2337
        self._ErrorIf(missing_file and missing_file != expected_nodes,
2338
                      constants.CV_ECLUSTERFILECHECK, None,
2339
                      "File %s is optional, but it must exist on all or no"
2340
                      " nodes (not found on %s)",
2341
                      filename,
2342
                      utils.CommaJoin(
2343
                        utils.NiceSort(
2344
                          map(self.cfg.GetNodeName, missing_file))))
2345
      else:
2346
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2347
                      "File %s is missing from node(s) %s", filename,
2348
                      utils.CommaJoin(
2349
                        utils.NiceSort(
2350
                          map(self.cfg.GetNodeName, missing_file))))
2351

    
2352
        # Warn if a node has a file it shouldn't
2353
        unexpected = with_file - expected_nodes
2354
        self._ErrorIf(unexpected,
2355
                      constants.CV_ECLUSTERFILECHECK, None,
2356
                      "File %s should not exist on node(s) %s",
2357
                      filename, utils.CommaJoin(
2358
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2359

    
2360
      # See if there are multiple versions of the file
2361
      test = len(checksums) > 1
2362
      if test:
2363
        variants = ["variant %s on %s" %
2364
                    (idx + 1,
2365
                     utils.CommaJoin(utils.NiceSort(
2366
                       map(self.cfg.GetNodeName, node_uuids))))
2367
                    for (idx, (checksum, node_uuids)) in
2368
                      enumerate(sorted(checksums.items()))]
2369
      else:
2370
        variants = []
2371

    
2372
      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2373
                    "File %s found with %s different checksums (%s)",
2374
                    filename, len(checksums), "; ".join(variants))
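  # Editor's note (illustrative, not part of the original source): the file
  # check distinguishes four sets: files_all must be on every node, files_mc
  # only on the master and master candidates, files_vm only on vm_capable
  # nodes, and files_opt marks files that may be absent, but then must be
  # absent from every node expected to have them; any file seen with more
  # than one checksum is reported with a per-variant node list.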
2375

    
2376
  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
2377
    """Verify the drbd helper.
2378

2379
    """
2380
    if drbd_helper:
2381
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2382
      test = (helper_result is None)
2383
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2384
                    "no drbd usermode helper returned")
2385
      if helper_result:
2386
        status, payload = helper_result
2387
        test = not status
2388
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2389
                      "drbd usermode helper check unsuccessful: %s", payload)
2390
        test = status and (payload != drbd_helper)
2391
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2392
                      "wrong drbd usermode helper: %s", payload)
2393

    
2394
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2395
                      drbd_map):
2396
    """Verifies the node DRBD status.
2397

2398
    @type ninfo: L{objects.Node}
2399
    @param ninfo: the node to check
2400
    @param nresult: the remote results for the node
2401
    @param instanceinfo: the dict of instances
2402
    @param drbd_helper: the configured DRBD usermode helper
2403
    @param drbd_map: the DRBD map as returned by
2404
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2405

2406
    """
2407
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)
2408

    
2409
    # compute the DRBD minors
2410
    node_drbd = {}
2411
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2412
      test = inst_uuid not in instanceinfo
2413
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2414
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
2415
        # ghost instance should not be running, but otherwise we
2416
        # don't give double warnings (both ghost instance and
2417
        # unallocated minor in use)
2418
      if test:
2419
        node_drbd[minor] = (inst_uuid, False)
2420
      else:
2421
        instance = instanceinfo[inst_uuid]
2422
        node_drbd[minor] = (inst_uuid, instance.disks_active)
2423

    
2424
    # and now check them
2425
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2426
    test = not isinstance(used_minors, (tuple, list))
2427
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2428
                  "cannot parse drbd status file: %s", str(used_minors))
2429
    if test:
2430
      # we cannot check drbd status
2431
      return
2432

    
2433
    for minor, (inst_uuid, must_exist) in node_drbd.items():
2434
      test = minor not in used_minors and must_exist
2435
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2436
                    "drbd minor %d of instance %s is not active", minor,
2437
                    self.cfg.GetInstanceName(inst_uuid))
2438
    for minor in used_minors:
2439
      test = minor not in node_drbd
2440
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2441
                    "unallocated drbd minor %d is in use", minor)
2442
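  # Editor's note (illustrative, not part of the original source): drbd_map
  # has the shape {node_uuid: {minor: instance_uuid}}; node_drbd built above
  # maps each configured minor to (instance_uuid, must_exist), with
  # must_exist taken from the instance's disks_active flag, and the
  # node-reported used_minors list is then checked in both directions
  # (configured-but-inactive and in-use-but-unallocated).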

    
2443
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2444
    """Builds the node OS structures.
2445

2446
    @type ninfo: L{objects.Node}
2447
    @param ninfo: the node to check
2448
    @param nresult: the remote results for the node
2449
    @param nimg: the node image object
2450

2451
    """
2452
    remote_os = nresult.get(constants.NV_OSLIST, None)
2453
    test = (not isinstance(remote_os, list) or
2454
            not compat.all(isinstance(v, list) and len(v) == 7
2455
                           for v in remote_os))
2456

    
2457
    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2458
                  "node hasn't returned valid OS data")
2459

    
2460
    nimg.os_fail = test
2461

    
2462
    if test:
2463
      return
2464

    
2465
    os_dict = {}
2466

    
2467
    for (name, os_path, status, diagnose,
2468
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2469

    
2470
      if name not in os_dict:
2471
        os_dict[name] = []
2472

    
2473
      # parameters is a list of lists instead of list of tuples due to
2474
      # JSON lacking a real tuple type, fix it:
2475
      parameters = [tuple(v) for v in parameters]
2476
      os_dict[name].append((os_path, status, diagnose,
2477
                            set(variants), set(parameters), set(api_ver)))
2478

    
2479
    nimg.oslist = os_dict
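  # Editor's note (illustrative, not part of the original source):
  # nimg.oslist ends up as a dict mapping each OS name to a list of tuples
  #   (path, status, diagnose, set(variants), set(parameters), set(api_ver))
  # one per directory the OS was found in; _VerifyNodeOS later compares only
  # the first entry against the reference node.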
2480

    
2481
  def _VerifyNodeOS(self, ninfo, nimg, base):
2482
    """Verifies the node OS list.
2483

2484
    @type ninfo: L{objects.Node}
2485
    @param ninfo: the node to check
2486
    @param nimg: the node image object
2487
    @param base: the 'template' node we match against (e.g. from the master)
2488

2489
    """
2490
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2491

    
2492
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2493
    for os_name, os_data in nimg.oslist.items():
2494
      assert os_data, "Empty OS status for OS %s?!" % os_name
2495
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2496
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2497
                    "Invalid OS %s (located at %s): %s",
2498
                    os_name, f_path, f_diag)
2499
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2500
                    "OS '%s' has multiple entries"
2501
                    " (first one shadows the rest): %s",
2502
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
2503
      # comparisons with the 'base' image
2504
      test = os_name not in base.oslist
2505
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2506
                    "Extra OS %s not present on reference node (%s)",
2507
                    os_name, self.cfg.GetNodeName(base.uuid))
2508
      if test:
2509
        continue
2510
      assert base.oslist[os_name], "Base node has empty OS status?"
2511
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2512
      if not b_status:
2513
        # base OS is invalid, skipping
2514
        continue
2515
      for kind, a, b in [("API version", f_api, b_api),
2516
                         ("variants list", f_var, b_var),
2517
                         ("parameters", beautify_params(f_param),
2518
                          beautify_params(b_param))]:
2519
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2520
                      "OS %s for %s differs from reference node %s:"
2521
                      " [%s] vs. [%s]", kind, os_name,
2522
                      self.cfg.GetNodeName(base.uuid),
2523
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2524

    
2525
    # check any missing OSes
2526
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2527
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2528
                  "OSes present on reference node %s"
2529
                  " but missing on this node: %s",
2530
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2531

    
2532
  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2533
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2534

2535
    @type ninfo: L{objects.Node}
2536
    @param ninfo: the node to check
2537
    @param nresult: the remote results for the node
2538
    @type is_master: bool
2539
    @param is_master: Whether node is the master node
2540

2541
    """
2542
    cluster = self.cfg.GetClusterInfo()
2543
    if (is_master and
2544
        (cluster.IsFileStorageEnabled() or
2545
         cluster.IsSharedFileStorageEnabled())):
2546
      try:
2547
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2548
      except KeyError:
2549
        # This should never happen
2550
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2551
                      "Node did not return forbidden file storage paths")
2552
      else:
2553
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2554
                      "Found forbidden file storage paths: %s",
2555
                      utils.CommaJoin(fspaths))
2556
    else:
2557
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2558
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2559
                    "Node should not have returned forbidden file storage"
2560
                    " paths")
2561

    
2562
  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2563
                          verify_key, error_key):
2564
    """Verifies (file) storage paths.
2565

2566
    @type ninfo: L{objects.Node}
2567
    @param ninfo: the node to check
2568
    @param nresult: the remote results for the node
2569
    @type file_disk_template: string
2570
    @param file_disk_template: file-based disk template, whose directory
2571
        is supposed to be verified
2572
    @type verify_key: string
2573
    @param verify_key: key for the verification map of this file
2574
        verification step
2575
    @param error_key: error key to be added to the verification results
2576
        in case something goes wrong in this verification step
2577

2578
    """
2579
    assert (file_disk_template in
2580
            utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2581
    cluster = self.cfg.GetClusterInfo()
2582
    if cluster.IsDiskTemplateEnabled(file_disk_template):
2583
      self._ErrorIf(
2584
          verify_key in nresult,
2585
          error_key, ninfo.name,
2586
          "The configured %s storage path is unusable: %s" %
2587
          (file_disk_template, nresult.get(verify_key)))
2588

    
2589
  def _VerifyFileStoragePaths(self, ninfo, nresult):
2590
    """Verifies (file) storage paths.
2591

2592
    @see: C{_VerifyStoragePaths}
2593

2594
    """
2595
    self._VerifyStoragePaths(
2596
        ninfo, nresult, constants.DT_FILE,
2597
        constants.NV_FILE_STORAGE_PATH,
2598
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2599

    
2600
  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2601
    """Verifies (file) storage paths.
2602

2603
    @see: C{_VerifyStoragePaths}
2604

2605
    """
2606
    self._VerifyStoragePaths(
2607
        ninfo, nresult, constants.DT_SHARED_FILE,
2608
        constants.NV_SHARED_FILE_STORAGE_PATH,
2609
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2610

    
  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

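  # Illustrative note (an assumption, not taken verbatim from this module):
  # each entry of nresult[constants.NV_OOB_PATHS] is treated above as a
  # truthy error description for a problematic path (for example a message
  # such as "path is not executable"), while a falsy entry means the path
  # checked out fine, so only problems get reported.
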
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

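  # Illustrative note (an assumption, not taken verbatim from this module):
  # on success the NV_LVLIST payload stored in nimg.volumes is a dict keyed
  # by "<vg>/<lv>" names mapping to the LV's attributes, while a plain
  # string payload is interpreted above as an LVM error message from the
  # node.
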
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

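  # Illustrative note (an assumption, not taken verbatim from this module):
  # a healthy NV_INSTANCELIST payload is a list of instance names as seen by
  # the node's hypervisor(s), e.g. ["instance1.example.com"], which is
  # translated into configuration UUIDs via GetMultiInstanceInfoByName above.
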
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

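  # Illustrative note (an assumption, not taken verbatim from this module):
  # the two payloads consumed above are expected to look roughly like
  #   nresult[constants.NV_HVINFO] == {"memory_free": 4096, ...}
  #   nresult[constants.NV_VGLIST] == {"xenvg": 102400, ...}
  # i.e. free memory reported by the hypervisor and free space per volume
  # group, both of which are coerced to int for nimg.mfree and nimg.dfree.
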
  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{NodeImage})
    @param node_image: NodeImage objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_dev_inst_only = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      dev_inst_only = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))

      node_disks_dev_inst_only[nuuid] = dev_inst_only

    assert len(node_disks) == len(node_disks_dev_inst_only)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
               node_disks.keys(), node_disks_dev_inst_only)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

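  # Illustrative note (an assumption about concrete values, not taken from
  # this module): for a mirrored instance with one disk on two nodes, the
  # returned mapping looks roughly like
  #   instdisk = {
  #     "<inst-uuid>": {
  #       "<primary-node-uuid>": [(True, <mirror status payload>)],
  #       "<secondary-node-uuid>": [(True, <mirror status payload>)],
  #     },
  #   }
  # with diskless instances mapped to an empty dict.
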
  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

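  # Worked example (an illustration, not taken verbatim from this module):
  # with the verified group containing nodes A and B, and two other groups
  # containing {C, D} and {E}, _SelectSshCheckNodes would return
  #   (["A", "B"], {"A": ["C", "E"], "B": ["D", "E"]})
  # i.e. every online node of this group is checked, and each of them is
  # asked to reach one node from every other group (cycling through that
  # group's members), which is how a single-node group ends up being
  # contacted by everyone, as the warning above notes.
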
  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are only run in the post phase; a hook failure is
    logged in the verify output and makes the verification fail.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

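  # Illustrative note (an assumption about concrete values, not taken from
  # this module): for a cluster tagged "production" with a node "node1"
  # tagged "rack:r1", the hooks environment built above would contain
  #   {"CLUSTER_TAGS": "production", "NODE_TAGS_node1": "rack:r1"}
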
  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

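    # Descriptive note: node_verify_param is the request payload for the
    # node_verify RPC issued below; each NV_* key selects one check to run on
    # the nodes (file checksums, SSH reachability, hypervisor parameters,
    # time, master IP reachability, OS list, user scripts, ...), and the
    # following blocks add further keys only when the corresponding feature
    # (LVM, DRBD, file storage, bridged NICs, exclusive storage) is in use.
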
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
      if drbd_helper:
        node_verify_param[constants.NV_DRBDVERSION] = None
        node_verify_param[constants.NV_DRBDLIST] = None
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

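    # Descriptive note (the behaviour of the node-side check is an
    # assumption): NV_BRIDGES asks each node to confirm that every bridge
    # collected above (the default NIC link plus any bridged instance NIC
    # links) actually exists on that node, so missing bridges are reported
    # before they can break instance networking.
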
    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least one node, we act as if it were set for all nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    node_group_uuids = dict(map(lambda n: (n.name, n.group),
                                self.cfg.GetAllNodesInfo().values()))
    groups_config = self.cfg.GetAllNodeGroupsInfoDict()

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams,
                                           node_group_uuids,
                                           groups_config)
    nvinfo_endtime = time.time()

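    # Descriptive note: the [nvinfo_starttime, nvinfo_endtime] window recorded
    # around the RPC is what _VerifyNodeTime uses further down to decide
    # whether a node's reported clock is acceptably close to the master's
    # (an assumption about the exact tolerance: a small fixed clock-skew
    # allowance is applied on both sides of the window).
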
    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams,
                                    node_group_uuids,
                                    groups_config)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams,
         node_group_uuids,
         groups_config))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.name not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyzes the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks' status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])