Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / cluster.py @ 84ad6b78

History | View | Annotate | Download (120 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with the cluster."""
23

    
24
import OpenSSL
25

    
26
import copy
27
import itertools
28
import logging
29
import operator
30
import os
31
import re
32
import time
33

    
34
from ganeti import compat
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import hypervisor
38
from ganeti import locking
39
from ganeti import masterd
40
from ganeti import netutils
41
from ganeti import objects
42
from ganeti import opcodes
43
from ganeti import pathutils
44
from ganeti import query
45
from ganeti import rpc
46
from ganeti import runtime
47
from ganeti import ssh
48
from ganeti import uidpool
49
from ganeti import utils
50
from ganeti import vcluster
51

    
52
from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53
  ResultWithJobs
54
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
60
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
61
  CheckDiskAccessModeConsistency
62

    
63
import ganeti.masterd.instance
64

    
65

    
66
class LUClusterActivateMasterIp(NoHooksLU):
67
  """Activate the master IP on the master node.
68

69
  """
70
  def Exec(self, feedback_fn):
71
    """Activate the master IP.
72

73
    """
74
    master_params = self.cfg.GetMasterNetworkParameters()
75
    ems = self.cfg.GetUseExternalMipScript()
76
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
77
                                                   master_params, ems)
78
    result.Raise("Could not activate the master IP")
79

    
80

    
81
class LUClusterDeactivateMasterIp(NoHooksLU):
82
  """Deactivate the master IP on the master node.
83

84
  """
85
  def Exec(self, feedback_fn):
86
    """Deactivate the master IP.
87

88
    """
89
    master_params = self.cfg.GetMasterNetworkParameters()
90
    ems = self.cfg.GetUseExternalMipScript()
91
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
92
                                                     master_params, ems)
93
    result.Raise("Could not deactivate the master IP")
94

    
95

    
96
class LUClusterConfigQuery(NoHooksLU):
97
  """Return configuration values.
98

99
  """
100
  REQ_BGL = False
101

    
102
  def CheckArguments(self):
103
    self.cq = ClusterQuery(None, self.op.output_fields, False)
104

    
105
  def ExpandNames(self):
106
    self.cq.ExpandNames(self)
107

    
108
  def DeclareLocks(self, level):
109
    self.cq.DeclareLocks(self, level)
110

    
111
  def Exec(self, feedback_fn):
112
    result = self.cq.OldStyleQuery(self)
113

    
114
    assert len(result) == 1
115

    
116
    return result[0]
117

    
118

    
119
class LUClusterDestroy(LogicalUnit):
120
  """Logical unit for destroying the cluster.
121

122
  """
123
  HPATH = "cluster-destroy"
124
  HTYPE = constants.HTYPE_CLUSTER
125

    
126
  def BuildHooksEnv(self):
127
    """Build hooks env.
128

129
    """
130
    return {
131
      "OP_TARGET": self.cfg.GetClusterName(),
132
      }
133

    
134
  def BuildHooksNodes(self):
135
    """Build hooks nodes.
136

137
    """
138
    return ([], [])
139

    
140
  def CheckPrereq(self):
141
    """Check prerequisites.
142

143
    This checks whether the cluster is empty.
144

145
    Any errors are signaled by raising errors.OpPrereqError.
146

147
    """
148
    master = self.cfg.GetMasterNode()
149

    
150
    nodelist = self.cfg.GetNodeList()
151
    if len(nodelist) != 1 or nodelist[0] != master:
152
      raise errors.OpPrereqError("There are still %d node(s) in"
153
                                 " this cluster." % (len(nodelist) - 1),
154
                                 errors.ECODE_INVAL)
155
    instancelist = self.cfg.GetInstanceList()
156
    if instancelist:
157
      raise errors.OpPrereqError("There are still %d instance(s) in"
158
                                 " this cluster." % len(instancelist),
159
                                 errors.ECODE_INVAL)
160

    
161
  def Exec(self, feedback_fn):
162
    """Destroys the cluster.
163

164
    """
165
    master_params = self.cfg.GetMasterNetworkParameters()
166

    
167
    # Run post hooks on master node before it's removed
168
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
169

    
170
    ems = self.cfg.GetUseExternalMipScript()
171
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
172
                                                     master_params, ems)
173
    result.Warn("Error disabling the master IP address", self.LogWarning)
174
    return master_params.uuid
175

    
176

    
177
class LUClusterPostInit(LogicalUnit):
178
  """Logical unit for running hooks after cluster initialization.
179

180
  """
181
  HPATH = "cluster-init"
182
  HTYPE = constants.HTYPE_CLUSTER
183

    
184
  def BuildHooksEnv(self):
185
    """Build hooks env.
186

187
    """
188
    return {
189
      "OP_TARGET": self.cfg.GetClusterName(),
190
      }
191

    
192
  def BuildHooksNodes(self):
193
    """Build hooks nodes.
194

195
    """
196
    return ([], [self.cfg.GetMasterNode()])
197

    
198
  def Exec(self, feedback_fn):
199
    """Nothing to do.
200

201
    """
202
    return True
203

    
204

    
205
class ClusterQuery(QueryBase):
206
  FIELDS = query.CLUSTER_FIELDS
207

    
208
  #: Do not sort (there is only one item)
209
  SORT_FIELD = None
210

    
211
  def ExpandNames(self, lu):
212
    lu.needed_locks = {}
213

    
214
    # The following variables interact with _QueryBase._GetNames
215
    self.wanted = locking.ALL_SET
216
    self.do_locking = self.use_locking
217

    
218
    if self.do_locking:
219
      raise errors.OpPrereqError("Can not use locking for cluster queries",
220
                                 errors.ECODE_INVAL)
221

    
222
  def DeclareLocks(self, lu, level):
223
    pass
224

    
225
  def _GetQueryData(self, lu):
226
    """Computes the list of nodes and their attributes.
227

228
    """
229
    # Locking is not used
230
    assert not (compat.any(lu.glm.is_owned(level)
231
                           for level in locking.LEVELS
232
                           if level != locking.LEVEL_CLUSTER) or
233
                self.do_locking or self.use_locking)
234

    
235
    if query.CQ_CONFIG in self.requested_data:
236
      cluster = lu.cfg.GetClusterInfo()
237
      nodes = lu.cfg.GetAllNodesInfo()
238
    else:
239
      cluster = NotImplemented
240
      nodes = NotImplemented
241

    
242
    if query.CQ_QUEUE_DRAINED in self.requested_data:
243
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
244
    else:
245
      drain_flag = NotImplemented
246

    
247
    if query.CQ_WATCHER_PAUSE in self.requested_data:
248
      master_node_uuid = lu.cfg.GetMasterNode()
249

    
250
      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
251
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
252
                   lu.cfg.GetMasterNodeName())
253

    
254
      watcher_pause = result.payload
255
    else:
256
      watcher_pause = NotImplemented
257

    
258
    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
259

    
260

    
261
class LUClusterQuery(NoHooksLU):
262
  """Query cluster configuration.
263

264
  """
265
  REQ_BGL = False
266

    
267
  def ExpandNames(self):
268
    self.needed_locks = {}
269

    
270
  def Exec(self, feedback_fn):
271
    """Return cluster config.
272

273
    """
274
    cluster = self.cfg.GetClusterInfo()
275
    os_hvp = {}
276

    
277
    # Filter just for enabled hypervisors
278
    for os_name, hv_dict in cluster.os_hvp.items():
279
      os_hvp[os_name] = {}
280
      for hv_name, hv_params in hv_dict.items():
281
        if hv_name in cluster.enabled_hypervisors:
282
          os_hvp[os_name][hv_name] = hv_params
283

    
284
    # Convert ip_family to ip_version
285
    primary_ip_version = constants.IP4_VERSION
286
    if cluster.primary_ip_family == netutils.IP6Address.family:
287
      primary_ip_version = constants.IP6_VERSION
288

    
289
    result = {
290
      "software_version": constants.RELEASE_VERSION,
291
      "protocol_version": constants.PROTOCOL_VERSION,
292
      "config_version": constants.CONFIG_VERSION,
293
      "os_api_version": max(constants.OS_API_VERSIONS),
294
      "export_version": constants.EXPORT_VERSION,
295
      "vcs_version": constants.VCS_VERSION,
296
      "architecture": runtime.GetArchInfo(),
297
      "name": cluster.cluster_name,
298
      "master": self.cfg.GetMasterNodeName(),
299
      "default_hypervisor": cluster.primary_hypervisor,
300
      "enabled_hypervisors": cluster.enabled_hypervisors,
301
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
302
                        for hypervisor_name in cluster.enabled_hypervisors]),
303
      "os_hvp": os_hvp,
304
      "beparams": cluster.beparams,
305
      "osparams": cluster.osparams,
306
      "ipolicy": cluster.ipolicy,
307
      "nicparams": cluster.nicparams,
308
      "ndparams": cluster.ndparams,
309
      "diskparams": cluster.diskparams,
310
      "candidate_pool_size": cluster.candidate_pool_size,
311
      "master_netdev": cluster.master_netdev,
312
      "master_netmask": cluster.master_netmask,
313
      "use_external_mip_script": cluster.use_external_mip_script,
314
      "volume_group_name": cluster.volume_group_name,
315
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
316
      "file_storage_dir": cluster.file_storage_dir,
317
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
318
      "maintain_node_health": cluster.maintain_node_health,
319
      "ctime": cluster.ctime,
320
      "mtime": cluster.mtime,
321
      "uuid": cluster.uuid,
322
      "tags": list(cluster.GetTags()),
323
      "uid_pool": cluster.uid_pool,
324
      "default_iallocator": cluster.default_iallocator,
325
      "reserved_lvs": cluster.reserved_lvs,
326
      "primary_ip_version": primary_ip_version,
327
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
328
      "hidden_os": cluster.hidden_os,
329
      "blacklisted_os": cluster.blacklisted_os,
330
      "enabled_disk_templates": cluster.enabled_disk_templates,
331
      }
332

    
333
    return result
334

    
335

    
336
class LUClusterRedistConf(NoHooksLU):
337
  """Force the redistribution of cluster configuration.
338

339
  This is a very simple LU.
340

341
  """
342
  REQ_BGL = False
343

    
344
  def ExpandNames(self):
345
    self.needed_locks = {
346
      locking.LEVEL_NODE: locking.ALL_SET,
347
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
348
    }
349
    self.share_locks = ShareAll()
350

    
351
  def Exec(self, feedback_fn):
352
    """Redistribute the configuration.
353

354
    """
355
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
356
    RedistributeAncillaryFiles(self)
357

    
358

    
359
class LUClusterRename(LogicalUnit):
360
  """Rename the cluster.
361

362
  """
363
  HPATH = "cluster-rename"
364
  HTYPE = constants.HTYPE_CLUSTER
365

    
366
  def BuildHooksEnv(self):
367
    """Build hooks env.
368

369
    """
370
    return {
371
      "OP_TARGET": self.cfg.GetClusterName(),
372
      "NEW_NAME": self.op.name,
373
      }
374

    
375
  def BuildHooksNodes(self):
376
    """Build hooks nodes.
377

378
    """
379
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
380

    
381
  def CheckPrereq(self):
382
    """Verify that the passed name is a valid one.
383

384
    """
385
    hostname = netutils.GetHostname(name=self.op.name,
386
                                    family=self.cfg.GetPrimaryIPFamily())
387

    
388
    new_name = hostname.name
389
    self.ip = new_ip = hostname.ip
390
    old_name = self.cfg.GetClusterName()
391
    old_ip = self.cfg.GetMasterIP()
392
    if new_name == old_name and new_ip == old_ip:
393
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
394
                                 " cluster has changed",
395
                                 errors.ECODE_INVAL)
396
    if new_ip != old_ip:
397
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
398
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
399
                                   " reachable on the network" %
400
                                   new_ip, errors.ECODE_NOTUNIQUE)
401

    
402
    self.op.name = new_name
403

    
404
  def Exec(self, feedback_fn):
405
    """Rename the cluster.
406

407
    """
408
    clustername = self.op.name
409
    new_ip = self.ip
410

    
411
    # shutdown the master IP
412
    master_params = self.cfg.GetMasterNetworkParameters()
413
    ems = self.cfg.GetUseExternalMipScript()
414
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
415
                                                     master_params, ems)
416
    result.Raise("Could not disable the master role")
417

    
418
    try:
419
      cluster = self.cfg.GetClusterInfo()
420
      cluster.cluster_name = clustername
421
      cluster.master_ip = new_ip
422
      self.cfg.Update(cluster, feedback_fn)
423

    
424
      # update the known hosts file
425
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
426
      node_list = self.cfg.GetOnlineNodeList()
427
      try:
428
        node_list.remove(master_params.uuid)
429
      except ValueError:
430
        pass
431
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
432
    finally:
433
      master_params.ip = new_ip
434
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
435
                                                     master_params, ems)
436
      result.Warn("Could not re-enable the master role on the master,"
437
                  " please restart manually", self.LogWarning)
438

    
439
    return clustername
440

    
441

    
442
class LUClusterRepairDiskSizes(NoHooksLU):
443
  """Verifies the cluster disks sizes.
444

445
  """
446
  REQ_BGL = False
447

    
448
  def ExpandNames(self):
449
    if self.op.instances:
450
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
451
      # Not getting the node allocation lock as only a specific set of
452
      # instances (and their nodes) is going to be acquired
453
      self.needed_locks = {
454
        locking.LEVEL_NODE_RES: [],
455
        locking.LEVEL_INSTANCE: self.wanted_names,
456
        }
457
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
458
    else:
459
      self.wanted_names = None
460
      self.needed_locks = {
461
        locking.LEVEL_NODE_RES: locking.ALL_SET,
462
        locking.LEVEL_INSTANCE: locking.ALL_SET,
463

    
464
        # This opcode is acquires the node locks for all instances
465
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
466
        }
467

    
468
    self.share_locks = {
469
      locking.LEVEL_NODE_RES: 1,
470
      locking.LEVEL_INSTANCE: 0,
471
      locking.LEVEL_NODE_ALLOC: 1,
472
      }
473

    
474
  def DeclareLocks(self, level):
475
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
476
      self._LockInstancesNodes(primary_only=True, level=level)
477

    
478
  def CheckPrereq(self):
479
    """Check prerequisites.
480

481
    This only checks the optional instance list against the existing names.
482

483
    """
484
    if self.wanted_names is None:
485
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
486

    
487
    self.wanted_instances = \
488
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
489

    
490
  def _EnsureChildSizes(self, disk):
491
    """Ensure children of the disk have the needed disk size.
492

493
    This is valid mainly for DRBD8 and fixes an issue where the
494
    children have smaller disk size.
495

496
    @param disk: an L{ganeti.objects.Disk} object
497

498
    """
499
    if disk.dev_type == constants.DT_DRBD8:
500
      assert disk.children, "Empty children for DRBD8?"
501
      fchild = disk.children[0]
502
      mismatch = fchild.size < disk.size
503
      if mismatch:
504
        self.LogInfo("Child disk has size %d, parent %d, fixing",
505
                     fchild.size, disk.size)
506
        fchild.size = disk.size
507

    
508
      # and we recurse on this child only, not on the metadev
509
      return self._EnsureChildSizes(fchild) or mismatch
510
    else:
511
      return False
512

    
513
  def Exec(self, feedback_fn):
514
    """Verify the size of cluster disks.
515

516
    """
517
    # TODO: check child disks too
518
    # TODO: check differences in size between primary/secondary nodes
519
    per_node_disks = {}
520
    for instance in self.wanted_instances:
521
      pnode = instance.primary_node
522
      if pnode not in per_node_disks:
523
        per_node_disks[pnode] = []
524
      for idx, disk in enumerate(instance.disks):
525
        per_node_disks[pnode].append((instance, idx, disk))
526

    
527
    assert not (frozenset(per_node_disks.keys()) -
528
                self.owned_locks(locking.LEVEL_NODE_RES)), \
529
      "Not owning correct locks"
530
    assert not self.owned_locks(locking.LEVEL_NODE)
531

    
532
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
533
                                               per_node_disks.keys())
534

    
535
    changed = []
536
    for node_uuid, dskl in per_node_disks.items():
537
      if not dskl:
538
        # no disks on the node
539
        continue
540

    
541
      newl = [([v[2].Copy()], v[0]) for v in dskl]
542
      node_name = self.cfg.GetNodeName(node_uuid)
543
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
544
      if result.fail_msg:
545
        self.LogWarning("Failure in blockdev_getdimensions call to node"
546
                        " %s, ignoring", node_name)
547
        continue
548
      if len(result.payload) != len(dskl):
549
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
550
                        " result.payload=%s", node_name, len(dskl),
551
                        result.payload)
552
        self.LogWarning("Invalid result from node %s, ignoring node results",
553
                        node_name)
554
        continue
555
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
556
        if dimensions is None:
557
          self.LogWarning("Disk %d of instance %s did not return size"
558
                          " information, ignoring", idx, instance.name)
559
          continue
560
        if not isinstance(dimensions, (tuple, list)):
561
          self.LogWarning("Disk %d of instance %s did not return valid"
562
                          " dimension information, ignoring", idx,
563
                          instance.name)
564
          continue
565
        (size, spindles) = dimensions
566
        if not isinstance(size, (int, long)):
567
          self.LogWarning("Disk %d of instance %s did not return valid"
568
                          " size information, ignoring", idx, instance.name)
569
          continue
570
        size = size >> 20
571
        if size != disk.size:
572
          self.LogInfo("Disk %d of instance %s has mismatched size,"
573
                       " correcting: recorded %d, actual %d", idx,
574
                       instance.name, disk.size, size)
575
          disk.size = size
576
          self.cfg.Update(instance, feedback_fn)
577
          changed.append((instance.name, idx, "size", size))
578
        if es_flags[node_uuid]:
579
          if spindles is None:
580
            self.LogWarning("Disk %d of instance %s did not return valid"
581
                            " spindles information, ignoring", idx,
582
                            instance.name)
583
          elif disk.spindles is None or disk.spindles != spindles:
584
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
585
                         " correcting: recorded %s, actual %s",
586
                         idx, instance.name, disk.spindles, spindles)
587
            disk.spindles = spindles
588
            self.cfg.Update(instance, feedback_fn)
589
            changed.append((instance.name, idx, "spindles", disk.spindles))
590
        if self._EnsureChildSizes(disk):
591
          self.cfg.Update(instance, feedback_fn)
592
          changed.append((instance.name, idx, "size", disk.size))
593
    return changed
594

    
595

    
596
def _ValidateNetmask(cfg, netmask):
597
  """Checks if a netmask is valid.
598

599
  @type cfg: L{config.ConfigWriter}
600
  @param cfg: The cluster configuration
601
  @type netmask: int
602
  @param netmask: the netmask to be verified
603
  @raise errors.OpPrereqError: if the validation fails
604

605
  """
606
  ip_family = cfg.GetPrimaryIPFamily()
607
  try:
608
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
609
  except errors.ProgrammerError:
610
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
611
                               ip_family, errors.ECODE_INVAL)
612
  if not ipcls.ValidateNetmask(netmask):
613
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
614
                               (netmask), errors.ECODE_INVAL)
615

    
616

    
617
def CheckFileBasedStoragePathVsEnabledDiskTemplates(
618
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
619
    file_disk_template):
620
  """Checks whether the given file-based storage directory is acceptable.
621

622
  Note: This function is public, because it is also used in bootstrap.py.
623

624
  @type logging_warn_fn: function
625
  @param logging_warn_fn: function which accepts a string and logs it
626
  @type file_storage_dir: string
627
  @param file_storage_dir: the directory to be used for file-based instances
628
  @type enabled_disk_templates: list of string
629
  @param enabled_disk_templates: the list of enabled disk templates
630
  @type file_disk_template: string
631
  @param file_disk_template: the file-based disk template for which the
632
      path should be checked
633

634
  """
635
  assert (file_disk_template in
636
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
637
  file_storage_enabled = file_disk_template in enabled_disk_templates
638
  if file_storage_dir is not None:
639
    if file_storage_dir == "":
640
      if file_storage_enabled:
641
        raise errors.OpPrereqError(
642
            "Unsetting the '%s' storage directory while having '%s' storage"
643
            " enabled is not permitted." %
644
            (file_disk_template, file_disk_template))
645
    else:
646
      if not file_storage_enabled:
647
        logging_warn_fn(
648
            "Specified a %s storage directory, although %s storage is not"
649
            " enabled." % (file_disk_template, file_disk_template))
650
  else:
651
    raise errors.ProgrammerError("Received %s storage dir with value"
652
                                 " 'None'." % file_disk_template)
653

    
654

    
655
def CheckFileStoragePathVsEnabledDiskTemplates(
656
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
657
  """Checks whether the given file storage directory is acceptable.
658

659
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
660

661
  """
662
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
663
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
664
      constants.DT_FILE)
665

    
666

    
667
def CheckSharedFileStoragePathVsEnabledDiskTemplates(
668
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
669
  """Checks whether the given shared file storage directory is acceptable.
670

671
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
672

673
  """
674
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
675
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
676
      constants.DT_SHARED_FILE)
677

    
678

    
679
class LUClusterSetParams(LogicalUnit):
680
  """Change the parameters of the cluster.
681

682
  """
683
  HPATH = "cluster-modify"
684
  HTYPE = constants.HTYPE_CLUSTER
685
  REQ_BGL = False
686

    
687
  def CheckArguments(self):
688
    """Check parameters
689

690
    """
691
    if self.op.uid_pool:
692
      uidpool.CheckUidPool(self.op.uid_pool)
693

    
694
    if self.op.add_uids:
695
      uidpool.CheckUidPool(self.op.add_uids)
696

    
697
    if self.op.remove_uids:
698
      uidpool.CheckUidPool(self.op.remove_uids)
699

    
700
    if self.op.master_netmask is not None:
701
      _ValidateNetmask(self.cfg, self.op.master_netmask)
702

    
703
    if self.op.diskparams:
704
      for dt_params in self.op.diskparams.values():
705
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
706
      try:
707
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
708
        CheckDiskAccessModeValidity(self.op.diskparams)
709
      except errors.OpPrereqError, err:
710
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
711
                                   errors.ECODE_INVAL)
712

    
713
  def ExpandNames(self):
714
    # FIXME: in the future maybe other cluster params won't require checking on
715
    # all nodes to be modified.
716
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
717
    # resource locks the right thing, shouldn't it be the BGL instead?
718
    self.needed_locks = {
719
      locking.LEVEL_NODE: locking.ALL_SET,
720
      locking.LEVEL_INSTANCE: locking.ALL_SET,
721
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
722
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
723
    }
724
    self.share_locks = ShareAll()
725

    
726
  def BuildHooksEnv(self):
727
    """Build hooks env.
728

729
    """
730
    return {
731
      "OP_TARGET": self.cfg.GetClusterName(),
732
      "NEW_VG_NAME": self.op.vg_name,
733
      }
734

    
735
  def BuildHooksNodes(self):
736
    """Build hooks nodes.
737

738
    """
739
    mn = self.cfg.GetMasterNode()
740
    return ([mn], [mn])
741

    
742
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
743
                   new_enabled_disk_templates):
744
    """Check the consistency of the vg name on all nodes and in case it gets
745
       unset whether there are instances still using it.
746

747
    """
748
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
749
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
750
                                            new_enabled_disk_templates)
751
    current_vg_name = self.cfg.GetVGName()
752

    
753
    if self.op.vg_name == '':
754
      if lvm_is_enabled:
755
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
756
                                   " disk templates are or get enabled.")
757

    
758
    if self.op.vg_name is None:
759
      if current_vg_name is None and lvm_is_enabled:
760
        raise errors.OpPrereqError("Please specify a volume group when"
761
                                   " enabling lvm-based disk-templates.")
762

    
763
    if self.op.vg_name is not None and not self.op.vg_name:
764
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
765
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
766
                                   " instances exist", errors.ECODE_INVAL)
767

    
768
    if (self.op.vg_name is not None and lvm_is_enabled) or \
769
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
770
      self._CheckVgNameOnNodes(node_uuids)
771

    
772
  def _CheckVgNameOnNodes(self, node_uuids):
773
    """Check the status of the volume group on each node.
774

775
    """
776
    vglist = self.rpc.call_vg_list(node_uuids)
777
    for node_uuid in node_uuids:
778
      msg = vglist[node_uuid].fail_msg
779
      if msg:
780
        # ignoring down node
781
        self.LogWarning("Error while gathering data on node %s"
782
                        " (ignoring node): %s",
783
                        self.cfg.GetNodeName(node_uuid), msg)
784
        continue
785
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
786
                                            self.op.vg_name,
787
                                            constants.MIN_VG_SIZE)
788
      if vgstatus:
789
        raise errors.OpPrereqError("Error on node '%s': %s" %
790
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
791
                                   errors.ECODE_ENVIRON)
792

    
793
  @staticmethod
794
  def _GetEnabledDiskTemplatesInner(op_enabled_disk_templates,
795
                                    old_enabled_disk_templates):
796
    """Determines the enabled disk templates and the subset of disk templates
797
       that are newly enabled by this operation.
798

799
    """
800
    enabled_disk_templates = None
801
    new_enabled_disk_templates = []
802
    if op_enabled_disk_templates:
803
      enabled_disk_templates = op_enabled_disk_templates
804
      new_enabled_disk_templates = \
805
        list(set(enabled_disk_templates)
806
             - set(old_enabled_disk_templates))
807
    else:
808
      enabled_disk_templates = old_enabled_disk_templates
809
    return (enabled_disk_templates, new_enabled_disk_templates)
810

    
811
  def _GetEnabledDiskTemplates(self, cluster):
812
    """Determines the enabled disk templates and the subset of disk templates
813
       that are newly enabled by this operation.
814

815
    """
816
    return self._GetEnabledDiskTemplatesInner(self.op.enabled_disk_templates,
817
                                              cluster.enabled_disk_templates)
818

    
819
  def _CheckIpolicy(self, cluster, enabled_disk_templates):
820
    """Checks the ipolicy.
821

822
    @type cluster: C{objects.Cluster}
823
    @param cluster: the cluster's configuration
824
    @type enabled_disk_templates: list of string
825
    @param enabled_disk_templates: list of (possibly newly) enabled disk
826
      templates
827

828
    """
829
    # FIXME: write unit tests for this
830
    if self.op.ipolicy:
831
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
832
                                           group_policy=False)
833

    
834
      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
835
                                  enabled_disk_templates)
836

    
837
      all_instances = self.cfg.GetAllInstancesInfo().values()
838
      violations = set()
839
      for group in self.cfg.GetAllNodeGroupsInfo().values():
840
        instances = frozenset([inst for inst in all_instances
841
                               if compat.any(nuuid in group.members
842
                                             for nuuid in inst.all_nodes)])
843
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
844
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
845
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
846
                                           self.cfg)
847
        if new:
848
          violations.update(new)
849

    
850
      if violations:
851
        self.LogWarning("After the ipolicy change the following instances"
852
                        " violate them: %s",
853
                        utils.CommaJoin(utils.NiceSort(violations)))
854
    else:
855
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
856
                                  enabled_disk_templates)
857

    
858
  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
859
    """Checks whether the set DRBD helper actually exists on the nodes.
860

861
    @type drbd_helper: string
862
    @param drbd_helper: path of the drbd usermode helper binary
863
    @type node_uuids: list of strings
864
    @param node_uuids: list of node UUIDs to check for the helper
865

866
    """
867
    # checks given drbd helper on all nodes
868
    helpers = self.rpc.call_drbd_helper(node_uuids)
869
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
870
      if ninfo.offline:
871
        self.LogInfo("Not checking drbd helper on offline node %s",
872
                     ninfo.name)
873
        continue
874
      msg = helpers[ninfo.uuid].fail_msg
875
      if msg:
876
        raise errors.OpPrereqError("Error checking drbd helper on node"
877
                                   " '%s': %s" % (ninfo.name, msg),
878
                                   errors.ECODE_ENVIRON)
879
      node_helper = helpers[ninfo.uuid].payload
880
      if node_helper != drbd_helper:
881
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
882
                                   (ninfo.name, node_helper),
883
                                   errors.ECODE_ENVIRON)
884

    
885
  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
886
    """Check the DRBD usermode helper.
887

888
    @type node_uuids: list of strings
889
    @param node_uuids: a list of nodes' UUIDs
890
    @type drbd_enabled: boolean
891
    @param drbd_enabled: whether DRBD will be enabled after this operation
892
      (no matter if it was disabled before or not)
893
    @type drbd_gets_enabled: boolen
894
    @param drbd_gets_enabled: true if DRBD was disabled before this
895
      operation, but will be enabled afterwards
896

897
    """
898
    if self.op.drbd_helper == '':
899
      if drbd_enabled:
900
        raise errors.OpPrereqError("Cannot disable drbd helper while"
901
                                   " DRBD is enabled.")
902
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
903
        raise errors.OpPrereqError("Cannot disable drbd helper while"
904
                                   " drbd-based instances exist",
905
                                   errors.ECODE_INVAL)
906

    
907
    else:
908
      if self.op.drbd_helper is not None and drbd_enabled:
909
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
910
      else:
911
        if drbd_gets_enabled:
912
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
913
          if current_drbd_helper is not None:
914
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
915
          else:
916
            raise errors.OpPrereqError("Cannot enable DRBD without a"
917
                                       " DRBD usermode helper set.")
918

    
919
  def CheckPrereq(self):
920
    """Check prerequisites.
921

922
    This checks whether the given params don't conflict and
923
    if the given volume group is valid.
924

925
    """
926
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
927
    self.cluster = cluster = self.cfg.GetClusterInfo()
928

    
929
    vm_capable_node_uuids = [node.uuid
930
                             for node in self.cfg.GetAllNodesInfo().values()
931
                             if node.uuid in node_uuids and node.vm_capable]
932

    
933
    (enabled_disk_templates, new_enabled_disk_templates) = \
934
      self._GetEnabledDiskTemplates(cluster)
935

    
936
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
937
                      new_enabled_disk_templates)
938

    
939
    if self.op.file_storage_dir is not None:
940
      CheckFileStoragePathVsEnabledDiskTemplates(
941
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
942

    
943
    if self.op.shared_file_storage_dir is not None:
944
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
945
          self.LogWarning, self.op.shared_file_storage_dir,
946
          enabled_disk_templates)
947

    
948
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
949
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
950
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)
951

    
952
    # validate params changes
953
    if self.op.beparams:
954
      objects.UpgradeBeParams(self.op.beparams)
955
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
956
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
957

    
958
    if self.op.ndparams:
959
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
960
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
961

    
962
      # TODO: we need a more general way to handle resetting
963
      # cluster-level parameters to default values
964
      if self.new_ndparams["oob_program"] == "":
965
        self.new_ndparams["oob_program"] = \
966
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
967

    
968
    if self.op.hv_state:
969
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
970
                                           self.cluster.hv_state_static)
971
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
972
                               for hv, values in new_hv_state.items())
973

    
974
    if self.op.disk_state:
975
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
976
                                               self.cluster.disk_state_static)
977
      self.new_disk_state = \
978
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
979
                            for name, values in svalues.items()))
980
             for storage, svalues in new_disk_state.items())
981

    
982
    self._CheckIpolicy(cluster, enabled_disk_templates)
983

    
984
    if self.op.nicparams:
985
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
986
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
987
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
988
      nic_errors = []
989

    
990
      # check all instances for consistency
991
      for instance in self.cfg.GetAllInstancesInfo().values():
992
        for nic_idx, nic in enumerate(instance.nics):
993
          params_copy = copy.deepcopy(nic.nicparams)
994
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
995

    
996
          # check parameter syntax
997
          try:
998
            objects.NIC.CheckParameterSyntax(params_filled)
999
          except errors.ConfigurationError, err:
1000
            nic_errors.append("Instance %s, nic/%d: %s" %
1001
                              (instance.name, nic_idx, err))
1002

    
1003
          # if we're moving instances to routed, check that they have an ip
1004
          target_mode = params_filled[constants.NIC_MODE]
1005
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
1006
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
1007
                              " address" % (instance.name, nic_idx))
1008
      if nic_errors:
1009
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
1010
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
1011

    
1012
    # hypervisor list/parameters
1013
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
1014
    if self.op.hvparams:
1015
      for hv_name, hv_dict in self.op.hvparams.items():
1016
        if hv_name not in self.new_hvparams:
1017
          self.new_hvparams[hv_name] = hv_dict
1018
        else:
1019
          self.new_hvparams[hv_name].update(hv_dict)
1020

    
1021
    # disk template parameters
1022
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
1023
    if self.op.diskparams:
1024
      for dt_name, dt_params in self.op.diskparams.items():
1025
        if dt_name not in self.new_diskparams:
1026
          self.new_diskparams[dt_name] = dt_params
1027
        else:
1028
          self.new_diskparams[dt_name].update(dt_params)
1029
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)
1030

    
1031
    # os hypervisor parameters
1032
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
1033
    if self.op.os_hvp:
1034
      for os_name, hvs in self.op.os_hvp.items():
1035
        if os_name not in self.new_os_hvp:
1036
          self.new_os_hvp[os_name] = hvs
1037
        else:
1038
          for hv_name, hv_dict in hvs.items():
1039
            if hv_dict is None:
1040
              # Delete if it exists
1041
              self.new_os_hvp[os_name].pop(hv_name, None)
1042
            elif hv_name not in self.new_os_hvp[os_name]:
1043
              self.new_os_hvp[os_name][hv_name] = hv_dict
1044
            else:
1045
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
1046

    
1047
    # os parameters
1048
    self.new_osp = objects.FillDict(cluster.osparams, {})
1049
    if self.op.osparams:
1050
      for os_name, osp in self.op.osparams.items():
1051
        if os_name not in self.new_osp:
1052
          self.new_osp[os_name] = {}
1053

    
1054
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
1055
                                                 use_none=True)
1056

    
1057
        if not self.new_osp[os_name]:
1058
          # we removed all parameters
1059
          del self.new_osp[os_name]
1060
        else:
1061
          # check the parameter validity (remote check)
1062
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
1063
                        os_name, self.new_osp[os_name])
1064

    
1065
    # changes to the hypervisor list
1066
    if self.op.enabled_hypervisors is not None:
1067
      self.hv_list = self.op.enabled_hypervisors
1068
      for hv in self.hv_list:
1069
        # if the hypervisor doesn't already exist in the cluster
1070
        # hvparams, we initialize it to empty, and then (in both
1071
        # cases) we make sure to fill the defaults, as we might not
1072
        # have a complete defaults list if the hypervisor wasn't
1073
        # enabled before
1074
        if hv not in new_hvp:
1075
          new_hvp[hv] = {}
1076
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1077
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1078
    else:
1079
      self.hv_list = cluster.enabled_hypervisors
1080

    
1081
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1082
      # either the enabled list has changed, or the parameters have, validate
1083
      for hv_name, hv_params in self.new_hvparams.items():
1084
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1085
            (self.op.enabled_hypervisors and
1086
             hv_name in self.op.enabled_hypervisors)):
1087
          # either this is a new hypervisor, or its parameters have changed
1088
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1089
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1090
          hv_class.CheckParameterSyntax(hv_params)
1091
          CheckHVParams(self, node_uuids, hv_name, hv_params)
1092

    
1093
    self._CheckDiskTemplateConsistency()
1094

    
1095
    if self.op.os_hvp:
1096
      # no need to check any newly-enabled hypervisors, since the
1097
      # defaults have already been checked in the above code-block
1098
      for os_name, os_hvp in self.new_os_hvp.items():
1099
        for hv_name, hv_params in os_hvp.items():
1100
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1101
          # we need to fill in the new os_hvp on top of the actual hv_p
1102
          cluster_defaults = self.new_hvparams.get(hv_name, {})
1103
          new_osp = objects.FillDict(cluster_defaults, hv_params)
1104
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1105
          hv_class.CheckParameterSyntax(new_osp)
1106
          CheckHVParams(self, node_uuids, hv_name, new_osp)
1107

    
1108
    if self.op.default_iallocator:
1109
      alloc_script = utils.FindFile(self.op.default_iallocator,
1110
                                    constants.IALLOCATOR_SEARCH_PATH,
1111
                                    os.path.isfile)
1112
      if alloc_script is None:
1113
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1114
                                   " specified" % self.op.default_iallocator,
1115
                                   errors.ECODE_INVAL)
1116

    
1117
  def _CheckDiskTemplateConsistency(self):
1118
    """Check whether the disk templates that are going to be disabled
1119
       are still in use by some instances.
1120

1121
    """
1122
    if self.op.enabled_disk_templates:
1123
      cluster = self.cfg.GetClusterInfo()
1124
      instances = self.cfg.GetAllInstancesInfo()
1125

    
1126
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1127
        - set(self.op.enabled_disk_templates)
1128
      for instance in instances.itervalues():
1129
        if instance.disk_template in disk_templates_to_remove:
1130
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
1131
                                     " because instance '%s' is using it." %
1132
                                     (instance.disk_template, instance.name))
1133

    
1134
  def _SetVgName(self, feedback_fn):
1135
    """Determines and sets the new volume group name.
1136

1137
    """
1138
    if self.op.vg_name is not None:
1139
      new_volume = self.op.vg_name
1140
      if not new_volume:
1141
        new_volume = None
1142
      if new_volume != self.cfg.GetVGName():
1143
        self.cfg.SetVGName(new_volume)
1144
      else:
1145
        feedback_fn("Cluster LVM configuration already in desired"
1146
                    " state, not changing")
1147

    
1148
  def _SetFileStorageDir(self, feedback_fn):
1149
    """Set the file storage directory.
1150

1151
    """
1152
    if self.op.file_storage_dir is not None:
1153
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
1154
        feedback_fn("Global file storage dir already set to value '%s'"
1155
                    % self.cluster.file_storage_dir)
1156
      else:
1157
        self.cluster.file_storage_dir = self.op.file_storage_dir
1158

    
1159
  def _SetDrbdHelper(self, feedback_fn):
1160
    """Set the DRBD usermode helper.
1161

1162
    """
1163
    if self.op.drbd_helper is not None:
1164
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1165
        feedback_fn("Note that you specified a drbd user helper, but did not"
1166
                    " enable the drbd disk template.")
1167
      new_helper = self.op.drbd_helper
1168
      if not new_helper:
1169
        new_helper = None
1170
      if new_helper != self.cfg.GetDRBDHelper():
1171
        self.cfg.SetDRBDHelper(new_helper)
1172
      else:
1173
        feedback_fn("Cluster DRBD helper already in desired state,"
1174
                    " not changing")
1175

    
1176
  def Exec(self, feedback_fn):
1177
    """Change the parameters of the cluster.
1178

1179
    """
1180
    if self.op.enabled_disk_templates:
1181
      self.cluster.enabled_disk_templates = \
1182
        list(set(self.op.enabled_disk_templates))
1183

    
1184
    self._SetVgName(feedback_fn)
1185
    self._SetFileStorageDir(feedback_fn)
1186
    self._SetDrbdHelper(feedback_fn)
1187

    
1188
    if self.op.hvparams:
1189
      self.cluster.hvparams = self.new_hvparams
1190
    if self.op.os_hvp:
1191
      self.cluster.os_hvp = self.new_os_hvp
1192
    if self.op.enabled_hypervisors is not None:
1193
      self.cluster.hvparams = self.new_hvparams
1194
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1195
    if self.op.beparams:
1196
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1197
    if self.op.nicparams:
1198
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1199
    if self.op.ipolicy:
1200
      self.cluster.ipolicy = self.new_ipolicy
1201
    if self.op.osparams:
1202
      self.cluster.osparams = self.new_osp
1203
    if self.op.ndparams:
1204
      self.cluster.ndparams = self.new_ndparams
1205
    if self.op.diskparams:
1206
      self.cluster.diskparams = self.new_diskparams
1207
    if self.op.hv_state:
1208
      self.cluster.hv_state_static = self.new_hv_state
1209
    if self.op.disk_state:
1210
      self.cluster.disk_state_static = self.new_disk_state
1211

    
1212
    if self.op.candidate_pool_size is not None:
1213
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1214
      # we need to update the pool size here, otherwise the save will fail
1215
      AdjustCandidatePool(self, [])
1216

    
1217
    if self.op.maintain_node_health is not None:
1218
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1219
        feedback_fn("Note: CONFD was disabled at build time, node health"
1220
                    " maintenance is not useful (still enabling it)")
1221
      self.cluster.maintain_node_health = self.op.maintain_node_health
1222

    
1223
    if self.op.modify_etc_hosts is not None:
1224
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1225

    
1226
    if self.op.prealloc_wipe_disks is not None:
1227
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1228

    
1229
    if self.op.add_uids is not None:
1230
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1231

    
1232
    if self.op.remove_uids is not None:
1233
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1234

    
1235
    if self.op.uid_pool is not None:
1236
      self.cluster.uid_pool = self.op.uid_pool
1237

    
1238
    if self.op.default_iallocator is not None:
1239
      self.cluster.default_iallocator = self.op.default_iallocator
1240

    
1241
    if self.op.reserved_lvs is not None:
1242
      self.cluster.reserved_lvs = self.op.reserved_lvs
1243

    
1244
    if self.op.use_external_mip_script is not None:
1245
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
1246

    
1247
    def helper_os(aname, mods, desc):
1248
      desc += " OS list"
1249
      lst = getattr(self.cluster, aname)
1250
      for key, val in mods:
1251
        if key == constants.DDM_ADD:
1252
          if val in lst:
1253
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1254
          else:
1255
            lst.append(val)
1256
        elif key == constants.DDM_REMOVE:
1257
          if val in lst:
1258
            lst.remove(val)
1259
          else:
1260
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1261
        else:
1262
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
1263

    
1264
    if self.op.hidden_os:
1265
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1266

    
1267
    if self.op.blacklisted_os:
1268
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1269

    
1270
    if self.op.master_netdev:
1271
      master_params = self.cfg.GetMasterNetworkParameters()
1272
      ems = self.cfg.GetUseExternalMipScript()
1273
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1274
                  self.cluster.master_netdev)
1275
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1276
                                                       master_params, ems)
1277
      if not self.op.force:
1278
        result.Raise("Could not disable the master ip")
1279
      else:
1280
        if result.fail_msg:
1281
          msg = ("Could not disable the master ip (continuing anyway): %s" %
1282
                 result.fail_msg)
1283
          feedback_fn(msg)
1284
      feedback_fn("Changing master_netdev from %s to %s" %
1285
                  (master_params.netdev, self.op.master_netdev))
1286
      self.cluster.master_netdev = self.op.master_netdev
1287

    
1288
    if self.op.master_netmask:
1289
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
                 master_params.uuid, master_params.netmask,
                 self.op.master_netmask, master_params.ip,
                 master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1476
  """Verifies the cluster config.
1477

1478
  """
1479
  REQ_BGL = False
1480

    
1481
  def _VerifyHVP(self, hvp_data):
1482
    """Verifies locally the syntax of the hypervisor parameters.
1483

1484
    """
1485
    for item, hv_name, hv_params in hvp_data:
1486
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (hv_name, item))
1488
      try:
1489
        hv_class = hypervisor.GetHypervisorClass(hv_name)
1490
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1491
        hv_class.CheckParameterSyntax(hv_params)
1492
      except errors.GenericError, err:
1493
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1494

    
1495
  def ExpandNames(self):
1496
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1497
    self.share_locks = ShareAll()
1498

    
1499
  def CheckPrereq(self):
1500
    """Check prerequisites.
1501

1502
    """
1503
    # Retrieve all information
1504
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1505
    self.all_node_info = self.cfg.GetAllNodesInfo()
1506
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1507

    
1508
  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.
1510

1511
    """
1512
    self.bad = False
1513
    self._feedback_fn = feedback_fn
1514

    
1515
    feedback_fn("* Verifying cluster config")
1516

    
1517
    for msg in self.cfg.VerifyConfig():
1518
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1519

    
1520
    feedback_fn("* Verifying cluster certificate files")
1521

    
1522
    for cert_filename in pathutils.ALL_CERT_FILES:
1523
      (errcode, msg) = _VerifyCertificate(cert_filename)
1524
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1525

    
1526
    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
1527
                                    pathutils.NODED_CERT_FILE),
1528
                  constants.CV_ECLUSTERCERT,
1529
                  None,
1530
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
1531
                    constants.LUXID_USER + " user")
1532

    
1533
    feedback_fn("* Verifying hypervisor parameters")
1534

    
1535
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1536
                                                self.all_inst_info.values()))
1537

    
1538
    feedback_fn("* Verifying all nodes belong to an existing group")
1539

    
1540
    # We do this verification here because, should this bogus circumstance
1541
    # occur, it would never be caught by VerifyGroup, which only acts on
1542
    # nodes/instances reachable from existing node groups.
1543

    
1544
    dangling_nodes = set(node for node in self.all_node_info.values()
1545
                         if node.group not in self.all_group_info)
1546

    
1547
    dangling_instances = {}
1548
    no_node_instances = []
1549

    
1550
    for inst in self.all_inst_info.values():
1551
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
1552
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
1553
      elif inst.primary_node not in self.all_node_info:
1554
        no_node_instances.append(inst)
1555

    
1556
    pretty_dangling = [
1557
        "%s (%s)" %
1558
        (node.name,
1559
         utils.CommaJoin(inst.name for
1560
                         inst in dangling_instances.get(node.uuid, [])))
1561
        for node in dangling_nodes]
1562

    
1563
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1564
                  None,
                  "the following nodes (and their instances) belong to a"
                  " non-existing group: %s", utils.CommaJoin(pretty_dangling))
1567

    
1568
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1569
                  None,
1570
                  "the following instances have a non-existing primary-node:"
1571
                  " %s", utils.CommaJoin(inst.name for
1572
                                         inst in no_node_instances))
1573

    
1574
    return not self.bad
1575

    
1576

    
1577
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1578
  """Verifies the status of a node group.
1579

1580
  """
1581
  HPATH = "cluster-verify"
1582
  HTYPE = constants.HTYPE_CLUSTER
1583
  REQ_BGL = False
1584

    
1585
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1586

    
1587
  class NodeImage(object):
1588
    """A class representing the logical and physical status of a node.
1589

1590
    @type uuid: string
1591
    @ivar uuid: the node UUID to which this object refers
1592
    @ivar volumes: a structure as returned from
1593
        L{ganeti.backend.GetVolumeList} (runtime)
1594
    @ivar instances: a list of running instances (runtime)
1595
    @ivar pinst: list of configured primary instances (config)
1596
    @ivar sinst: list of configured secondary instances (config)
1597
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1598
        instances for which this node is secondary (config)
1599
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1600
    @ivar dfree: free disk, as reported by the node (runtime)
1601
    @ivar offline: the offline status (config)
1602
    @type rpc_fail: boolean
1603
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1604
        not whether the individual keys were correct) (runtime)
1605
    @type lvm_fail: boolean
1606
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1607
    @type hyp_fail: boolean
1608
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1609
    @type ghost: boolean
1610
    @ivar ghost: whether this is a known node or not (config)
1611
    @type os_fail: boolean
1612
    @ivar os_fail: whether the RPC call didn't return valid OS data
1613
    @type oslist: list
1614
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1615
    @type vm_capable: boolean
1616
    @ivar vm_capable: whether the node can host instances
1617
    @type pv_min: float
1618
    @ivar pv_min: size in MiB of the smallest PVs
1619
    @type pv_max: float
1620
    @ivar pv_max: size in MiB of the biggest PVs
1621

1622
    """
1623
    def __init__(self, offline=False, uuid=None, vm_capable=True):
1624
      self.uuid = uuid
1625
      self.volumes = {}
1626
      self.instances = []
1627
      self.pinst = []
1628
      self.sinst = []
1629
      self.sbp = {}
1630
      self.mfree = 0
1631
      self.dfree = 0
1632
      self.offline = offline
1633
      self.vm_capable = vm_capable
1634
      self.rpc_fail = False
1635
      self.lvm_fail = False
1636
      self.hyp_fail = False
1637
      self.ghost = False
1638
      self.os_fail = False
1639
      self.oslist = {}
1640
      self.pv_min = None
1641
      self.pv_max = None
1642

    
1643
  def ExpandNames(self):
1644
    # This raises errors.OpPrereqError on its own:
1645
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1646

    
1647
    # Get instances in node group; this is unsafe and needs verification later
1648
    inst_uuids = \
1649
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1650

    
1651
    self.needed_locks = {
1652
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1653
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1654
      locking.LEVEL_NODE: [],
1655

    
1656
      # This opcode is run by watcher every five minutes and acquires all nodes
1657
      # for a group. It doesn't run for a long time, so it's better to acquire
1658
      # the node allocation lock as well.
1659
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1660
      }
1661

    
1662
    self.share_locks = ShareAll()
1663

    
1664
  def DeclareLocks(self, level):
1665
    if level == locking.LEVEL_NODE:
1666
      # Get members of node group; this is unsafe and needs verification later
1667
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1668

    
1669
      # In Exec(), we warn about mirrored instances that have primary and
1670
      # secondary living in separate node groups. To fully verify that
1671
      # volumes for these instances are healthy, we will need to do an
1672
      # extra call to their secondaries. We ensure here those nodes will
1673
      # be locked.
1674
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1675
        # Important: access only the instances whose lock is owned
1676
        instance = self.cfg.GetInstanceInfoByName(inst_name)
1677
        if instance.disk_template in constants.DTS_INT_MIRROR:
1678
          nodes.update(instance.secondary_nodes)
1679

    
1680
      self.needed_locks[locking.LEVEL_NODE] = nodes
1681

    
1682
  def CheckPrereq(self):
1683
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1684
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1685

    
1686
    group_node_uuids = set(self.group_info.members)
1687
    group_inst_uuids = \
1688
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1689

    
1690
    unlocked_node_uuids = \
1691
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1692

    
1693
    unlocked_inst_uuids = \
1694
        group_inst_uuids.difference(
1695
          [self.cfg.GetInstanceInfoByName(name).uuid
1696
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1697

    
1698
    if unlocked_node_uuids:
1699
      raise errors.OpPrereqError(
1700
        "Missing lock for nodes: %s" %
1701
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1702
        errors.ECODE_STATE)
1703

    
1704
    if unlocked_inst_uuids:
1705
      raise errors.OpPrereqError(
1706
        "Missing lock for instances: %s" %
1707
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1708
        errors.ECODE_STATE)
1709

    
1710
    self.all_node_info = self.cfg.GetAllNodesInfo()
1711
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1712

    
1713
    self.my_node_uuids = group_node_uuids
1714
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1715
                             for node_uuid in group_node_uuids)
1716

    
1717
    self.my_inst_uuids = group_inst_uuids
1718
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1719
                             for inst_uuid in group_inst_uuids)
1720

    
1721
    # We detect here the nodes that will need the extra RPC calls for verifying
1722
    # split LV volumes; they should be locked.
1723
    extra_lv_nodes = set()
1724

    
1725
    for inst in self.my_inst_info.values():
1726
      if inst.disk_template in constants.DTS_INT_MIRROR:
1727
        for nuuid in inst.all_nodes:
1728
          if self.all_node_info[nuuid].group != self.group_uuid:
1729
            extra_lv_nodes.add(nuuid)
1730

    
1731
    unlocked_lv_nodes = \
1732
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1733

    
1734
    if unlocked_lv_nodes:
1735
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1736
                                 utils.CommaJoin(unlocked_lv_nodes),
1737
                                 errors.ECODE_STATE)
1738
    self.extra_lv_nodes = list(extra_lv_nodes)
1739

    
1740
  def _VerifyNode(self, ninfo, nresult):
1741
    """Perform some basic validation on data returned from a node.
1742

1743
      - check the result data structure is well formed and has all the
1744
        mandatory fields
1745
      - check ganeti version
1746

1747
    @type ninfo: L{objects.Node}
1748
    @param ninfo: the node to check
1749
    @param nresult: the results from the node
1750
    @rtype: boolean
1751
    @return: whether overall this call was successful (and we can expect
1752
         reasonable values in the response)
1753

1754
    """
1755
    # main result, nresult should be a non-empty dict
1756
    test = not nresult or not isinstance(nresult, dict)
1757
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1758
                  "unable to verify node: no data returned")
1759
    if test:
1760
      return False
1761

    
1762
    # compares ganeti version
1763
    local_version = constants.PROTOCOL_VERSION
1764
    remote_version = nresult.get("version", None)
1765
    test = not (remote_version and
1766
                isinstance(remote_version, (list, tuple)) and
1767
                len(remote_version) == 2)
1768
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1769
                  "connection to node returned invalid data")
1770
    if test:
1771
      return False
1772

    
1773
    test = local_version != remote_version[0]
1774
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1775
                  "incompatible protocol versions: master %s,"
1776
                  " node %s", local_version, remote_version[0])
1777
    if test:
1778
      return False
1779

    
1780
    # node seems compatible, we can actually try to look into its results
1781

    
1782
    # full package version
1783
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1784
                  constants.CV_ENODEVERSION, ninfo.name,
1785
                  "software version mismatch: master %s, node %s",
1786
                  constants.RELEASE_VERSION, remote_version[1],
1787
                  code=self.ETYPE_WARNING)
1788

    
1789
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1790
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1791
      for hv_name, hv_result in hyp_result.iteritems():
1792
        test = hv_result is not None
1793
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1794
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1795

    
1796
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1797
    if ninfo.vm_capable and isinstance(hvp_result, list):
1798
      for item, hv_name, hv_result in hvp_result:
1799
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1800
                      "hypervisor %s parameter verify failure (source %s): %s",
1801
                      hv_name, item, hv_result)
1802

    
1803
    test = nresult.get(constants.NV_NODESETUP,
1804
                       ["Missing NODESETUP results"])
1805
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1806
                  "node setup error: %s", "; ".join(test))
1807

    
1808
    return True
1809

    
1810
  def _VerifyNodeTime(self, ninfo, nresult,
1811
                      nvinfo_starttime, nvinfo_endtime):
1812
    """Check the node time.
1813

1814
    @type ninfo: L{objects.Node}
1815
    @param ninfo: the node to check
1816
    @param nresult: the remote results for the node
1817
    @param nvinfo_starttime: the start time of the RPC call
1818
    @param nvinfo_endtime: the end time of the RPC call
1819

1820
    """
1821
    ntime = nresult.get(constants.NV_TIME, None)
1822
    try:
1823
      ntime_merged = utils.MergeTime(ntime)
1824
    except (ValueError, TypeError):
1825
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1826
                    "Node returned invalid time")
1827
      return
1828

    
1829
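    # Flag a time problem only if the node's clock falls outside the
    # [start, end] window of the verify RPC, widened by the allowed skew.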
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1830
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1831
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1832
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1833
    else:
1834
      ntime_diff = None
1835

    
1836
    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1837
                  "Node time diverges by at least %s from master node time",
1838
                  ntime_diff)
1839

    
1840
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1841
    """Check the node LVM results and update info for cross-node checks.
1842

1843
    @type ninfo: L{objects.Node}
1844
    @param ninfo: the node to check
1845
    @param nresult: the remote results for the node
1846
    @param vg_name: the configured VG name
1847
    @type nimg: L{NodeImage}
1848
    @param nimg: node image
1849

1850
    """
1851
    if vg_name is None:
1852
      return
1853

    
1854
    # checks vg existence and size > 20G
1855
    vglist = nresult.get(constants.NV_VGLIST, None)
1856
    test = not vglist
1857
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1858
                  "unable to check volume groups")
1859
    if not test:
1860
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1861
                                            constants.MIN_VG_SIZE)
1862
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1863

    
1864
    # Check PVs
1865
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1866
    for em in errmsgs:
1867
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
1868
    if pvminmax is not None:
1869
      (nimg.pv_min, nimg.pv_max) = pvminmax
1870

    
1871
  def _VerifyGroupDRBDVersion(self, node_verify_infos):
1872
    """Check cross-node DRBD version consistency.
1873

1874
    @type node_verify_infos: dict
1875
    @param node_verify_infos: infos about nodes as returned from the
1876
      node_verify call.
1877

1878
    """
1879
    node_versions = {}
1880
    for node_uuid, ndata in node_verify_infos.items():
1881
      nresult = ndata.payload
1882
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1883
      node_versions[node_uuid] = version
1884

    
1885
    if len(set(node_versions.values())) > 1:
1886
      for node_uuid, version in sorted(node_versions.items()):
1887
        msg = "DRBD version mismatch: %s" % version
1888
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1889
                    code=self.ETYPE_WARNING)
1890

    
1891
  def _VerifyGroupLVM(self, node_image, vg_name):
1892
    """Check cross-node consistency in LVM.
1893

1894
    @type node_image: dict
1895
    @param node_image: info about nodes, mapping from node to names to
1896
      L{NodeImage} objects
1897
    @param vg_name: the configured VG name
1898

1899
    """
1900
    if vg_name is None:
1901
      return
1902

    
1903
    # Only exclusive storage needs this kind of checks
1904
    if not self._exclusive_storage:
1905
      return
1906

    
1907
    # exclusive_storage wants all PVs to have the same size (approximately),
1908
    # if the smallest and the biggest ones are okay, everything is fine.
1909
    # pv_min is None iff pv_max is None
1910
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1911
    if not vals:
1912
      return
1913
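    # Pair each PV size with the node UUID, so min()/max() also tell us
    # which node holds the smallest/biggest PV.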
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1914
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1915
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1916
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1917
                  "PV sizes differ too much in the group; smallest (%s MB) is"
1918
                  " on %s, biggest (%s MB) is on %s",
1919
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
1920
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))
1921

    
1922
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1923
    """Check the node bridges.
1924

1925
    @type ninfo: L{objects.Node}
1926
    @param ninfo: the node to check
1927
    @param nresult: the remote results for the node
1928
    @param bridges: the expected list of bridges
1929

1930
    """
1931
    if not bridges:
1932
      return
1933

    
1934
    missing = nresult.get(constants.NV_BRIDGES, None)
1935
    test = not isinstance(missing, list)
1936
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1937
                  "did not return valid bridge information")
1938
    if not test:
1939
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1940
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1941

    
1942
  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the node's results for user script presence and executability.
1944

1945
    @type ninfo: L{objects.Node}
1946
    @param ninfo: the node to check
1947
    @param nresult: the remote results for the node
1948

1949
    """
1950
    test = not constants.NV_USERSCRIPTS in nresult
1951
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1952
                  "did not return user scripts information")
1953

    
1954
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1955
    if not test:
1956
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1957
                    "user scripts not present or not executable: %s" %
1958
                    utils.CommaJoin(sorted(broken_scripts)))
1959

    
1960
  def _VerifyNodeNetwork(self, ninfo, nresult):
1961
    """Check the node network connectivity results.
1962

1963
    @type ninfo: L{objects.Node}
1964
    @param ninfo: the node to check
1965
    @param nresult: the remote results for the node
1966

1967
    """
1968
    test = constants.NV_NODELIST not in nresult
1969
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
1970
                  "node hasn't returned node ssh connectivity data")
1971
    if not test:
1972
      if nresult[constants.NV_NODELIST]:
1973
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1974
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
1975
                        "ssh communication with node '%s': %s", a_node, a_msg)
1976

    
1977
    test = constants.NV_NODENETTEST not in nresult
1978
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1979
                  "node hasn't returned node tcp connectivity data")
1980
    if not test:
1981
      if nresult[constants.NV_NODENETTEST]:
1982
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1983
        for anode in nlist:
1984
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
1985
                        "tcp communication with node '%s': %s",
1986
                        anode, nresult[constants.NV_NODENETTEST][anode])
1987

    
1988
    test = constants.NV_MASTERIP not in nresult
1989
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1990
                  "node hasn't returned node master IP reachability data")
1991
    if not test:
1992
      if not nresult[constants.NV_MASTERIP]:
1993
        if ninfo.uuid == self.master_node:
1994
          msg = "the master node cannot reach the master IP (not configured?)"
1995
        else:
1996
          msg = "cannot reach the master IP"
1997
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
1998

    
1999
  def _VerifyInstance(self, instance, node_image, diskstatus):
2000
    """Verify an instance.
2001

2002
    This function checks to see if the required block devices are
2003
    available on the instance's node, and that the nodes are in the correct
2004
    state.
2005

2006
    """
2007
    pnode_uuid = instance.primary_node
2008
    pnode_img = node_image[pnode_uuid]
2009
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2010

    
2011
    node_vol_should = {}
2012
    instance.MapLVsByNode(node_vol_should)
2013

    
2014
    cluster = self.cfg.GetClusterInfo()
2015
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2016
                                                            self.group_info)
2017
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
2018
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
2019
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)
2020

    
2021
    for node_uuid in node_vol_should:
2022
      n_img = node_image[node_uuid]
2023
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2024
        # ignore missing volumes on offline or broken nodes
2025
        continue
2026
      for volume in node_vol_should[node_uuid]:
2027
        test = volume not in n_img.volumes
2028
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
2029
                      "volume %s missing on node %s", volume,
2030
                      self.cfg.GetNodeName(node_uuid))
2031

    
2032
    if instance.admin_state == constants.ADMINST_UP:
2033
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
2034
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
2035
                    "instance not running on its primary node %s",
2036
                     self.cfg.GetNodeName(pnode_uuid))
2037
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
2038
                    instance.name, "instance is marked as running and lives on"
2039
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))
2040

    
2041
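    # Flatten the per-node disk status into (node, success, status, disk_index)
    # tuples for uniform reporting below.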
    diskdata = [(nname, success, status, idx)
2042
                for (nname, disks) in diskstatus.items()
2043
                for idx, (success, status) in enumerate(disks)]
2044

    
2045
    for nname, success, bdev_status, idx in diskdata:
2046
      # the 'ghost node' construction in Exec() ensures that we have a
2047
      # node here
2048
      snode = node_image[nname]
2049
      bad_snode = snode.ghost or snode.offline
2050
      self._ErrorIf(instance.disks_active and
2051
                    not success and not bad_snode,
2052
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
2053
                    "couldn't retrieve status for disk/%s on %s: %s",
2054
                    idx, self.cfg.GetNodeName(nname), bdev_status)
2055

    
2056
      if instance.disks_active and success and \
2057
         (bdev_status.is_degraded or
2058
          bdev_status.ldisk_status != constants.LDS_OKAY):
2059
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2060
        if bdev_status.is_degraded:
2061
          msg += " is degraded"
2062
        if bdev_status.ldisk_status != constants.LDS_OKAY:
2063
          msg += "; state is '%s'" % \
2064
                 constants.LDS_NAMES[bdev_status.ldisk_status]
2065

    
2066
        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2067

    
2068
    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2069
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2070
                  "instance %s, connection to primary node failed",
2071
                  instance.name)
2072

    
2073
    self._ErrorIf(len(instance.secondary_nodes) > 1,
2074
                  constants.CV_EINSTANCELAYOUT, instance.name,
2075
                  "instance has multiple secondary nodes: %s",
2076
                  utils.CommaJoin(instance.secondary_nodes),
2077
                  code=self.ETYPE_WARNING)
2078

    
2079
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2080
    if any(es_flags.values()):
2081
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2082
        # Disk template not compatible with exclusive_storage: no instance
2083
        # node should have the flag set
2084
        es_nodes = [n
2085
                    for (n, es) in es_flags.items()
2086
                    if es]
2087
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2088
                    "instance has template %s, which is not supported on nodes"
2089
                    " that have exclusive storage set: %s",
2090
                    instance.disk_template,
2091
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2092
      for (idx, disk) in enumerate(instance.disks):
2093
        self._ErrorIf(disk.spindles is None,
2094
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2095
                      "number of spindles not configured for disk %s while"
2096
                      " exclusive storage is enabled, try running"
2097
                      " gnt-cluster repair-disk-sizes", idx)
2098

    
2099
    if instance.disk_template in constants.DTS_INT_MIRROR:
2100
      instance_nodes = utils.NiceSort(instance.all_nodes)
2101
      instance_groups = {}
2102

    
2103
      for node_uuid in instance_nodes:
2104
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
2105
                                   []).append(node_uuid)
2106

    
2107
      pretty_list = [
2108
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2109
                           groupinfo[group].name)
2110
        # Sort so that we always list the primary node first.
2111
        for group, nodes in sorted(instance_groups.items(),
2112
                                   key=lambda (_, nodes): pnode_uuid in nodes,
2113
                                   reverse=True)]
2114

    
2115
      self._ErrorIf(len(instance_groups) > 1,
2116
                    constants.CV_EINSTANCESPLITGROUPS,
2117
                    instance.name, "instance has primary and secondary nodes in"
2118
                    " different groups: %s", utils.CommaJoin(pretty_list),
2119
                    code=self.ETYPE_WARNING)
2120

    
2121
    inst_nodes_offline = []
2122
    for snode in instance.secondary_nodes:
2123
      s_img = node_image[snode]
2124
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2125
                    self.cfg.GetNodeName(snode),
2126
                    "instance %s, connection to secondary node failed",
2127
                    instance.name)
2128

    
2129
      if s_img.offline:
2130
        inst_nodes_offline.append(snode)
2131

    
2132
    # warn that the instance lives on offline nodes
2133
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2134
                  instance.name, "instance has offline secondary node(s) %s",
2135
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2136
    # ... or ghost/non-vm_capable nodes
2137
    for node_uuid in instance.all_nodes:
2138
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2139
                    instance.name, "instance lives on ghost node %s",
2140
                    self.cfg.GetNodeName(node_uuid))
2141
      self._ErrorIf(not node_image[node_uuid].vm_capable,
2142
                    constants.CV_EINSTANCEBADNODE, instance.name,
2143
                    "instance lives on non-vm_capable node %s",
2144
                    self.cfg.GetNodeName(node_uuid))
2145

    
2146
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2147
    """Verify if there are any unknown volumes in the cluster.
2148

2149
    The .os, .swap and backup volumes are ignored. All other volumes are
2150
    reported as unknown.
2151

2152
    @type reserved: L{ganeti.utils.FieldSet}
2153
    @param reserved: a FieldSet of reserved volume names
2154

2155
    """
2156
    for node_uuid, n_img in node_image.items():
2157
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2158
          self.all_node_info[node_uuid].group != self.group_uuid):
2159
        # skip non-healthy nodes
2160
        continue
2161
      for volume in n_img.volumes:
2162
        test = ((node_uuid not in node_vol_should or
2163
                volume not in node_vol_should[node_uuid]) and
2164
                not reserved.Matches(volume))
2165
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2166
                      self.cfg.GetNodeName(node_uuid),
2167
                      "volume %s is unknown", volume)
2168

    
2169
  def _VerifyNPlusOneMemory(self, node_image, all_insts):
2170
    """Verify N+1 Memory Resilience.
2171

2172
    Check that if one single node dies we can still start all the
2173
    instances it was primary for.
2174

2175
    """
2176
    cluster_info = self.cfg.GetClusterInfo()
2177
    for node_uuid, n_img in node_image.items():
2178
      # This code checks that every node which is now listed as
2179
      # secondary has enough memory to host all instances it is
2180
      # supposed to should a single other node in the cluster fail.
2181
      # FIXME: not ready for failover to an arbitrary node
2182
      # FIXME: does not support file-backed instances
2183
      # WARNING: we currently take into account down instances as well
2184
      # as up ones, considering that even if they're down someone
2185
      # might want to start them even in the event of a node failure.
2186
      if n_img.offline or \
2187
         self.all_node_info[node_uuid].group != self.group_uuid:
2188
        # we're skipping nodes marked offline and nodes in other groups from
2189
        # the N+1 warning, since most likely we don't have good memory
2190
        # information from them; we already list instances living on such
2191
        # nodes, and that's enough warning
2192
        continue
2193
      #TODO(dynmem): also consider ballooning out other instances
2194
      for prinode, inst_uuids in n_img.sbp.items():
2195
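        # Sum the minimum memory of the auto-balanced instances that would
        # fail over from this primary node onto the node being checked.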
        needed_mem = 0
2196
        for inst_uuid in inst_uuids:
2197
          bep = cluster_info.FillBE(all_insts[inst_uuid])
2198
          if bep[constants.BE_AUTO_BALANCE]:
2199
            needed_mem += bep[constants.BE_MINMEM]
2200
        test = n_img.mfree < needed_mem
2201
        self._ErrorIf(test, constants.CV_ENODEN1,
2202
                      self.cfg.GetNodeName(node_uuid),
                      "not enough memory to accommodate instance failovers"
2204
                      " should node %s fail (%dMiB needed, %dMiB available)",
2205
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2206

    
2207
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2208
                   (files_all, files_opt, files_mc, files_vm)):
2209
    """Verifies file checksums collected from all nodes.
2210

2211
    @param nodes: List of L{objects.Node} objects
2212
    @param master_node_uuid: UUID of master node
2213
    @param all_nvinfo: RPC results
2214

2215
    """
2216
    # Define functions determining which nodes to consider for a file
2217
    files2nodefn = [
2218
      (files_all, None),
2219
      (files_mc, lambda node: (node.master_candidate or
2220
                               node.uuid == master_node_uuid)),
2221
      (files_vm, lambda node: node.vm_capable),
2222
      ]
2223

    
2224
    # Build mapping from filename to list of nodes which should have the file
2225
    nodefiles = {}
2226
    for (files, fn) in files2nodefn:
2227
      if fn is None:
2228
        filenodes = nodes
2229
      else:
2230
        filenodes = filter(fn, nodes)
2231
      nodefiles.update((filename,
2232
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
2233
                       for filename in files)
2234

    
2235
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2236

    
2237
    fileinfo = dict((filename, {}) for filename in nodefiles)
2238
    ignore_nodes = set()
2239

    
2240
    for node in nodes:
2241
      if node.offline:
2242
        ignore_nodes.add(node.uuid)
2243
        continue
2244

    
2245
      nresult = all_nvinfo[node.uuid]
2246

    
2247
      if nresult.fail_msg or not nresult.payload:
2248
        node_files = None
2249
      else:
2250
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2251
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2252
                          for (key, value) in fingerprints.items())
2253
        del fingerprints
2254

    
2255
      test = not (node_files and isinstance(node_files, dict))
2256
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2257
                    "Node did not return file checksum data")
2258
      if test:
2259
        ignore_nodes.add(node.uuid)
2260
        continue
2261

    
2262
      # Build per-checksum mapping from filename to nodes having it
2263
      for (filename, checksum) in node_files.items():
2264
        assert filename in nodefiles
2265
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2266

    
2267
    for (filename, checksums) in fileinfo.items():
2268
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2269

    
2270
      # Nodes having the file
2271
      with_file = frozenset(node_uuid
2272
                            for node_uuids in fileinfo[filename].values()
2273
                            for node_uuid in node_uuids) - ignore_nodes
2274

    
2275
      expected_nodes = nodefiles[filename] - ignore_nodes
2276

    
2277
      # Nodes missing file
2278
      missing_file = expected_nodes - with_file
2279

    
2280
      if filename in files_opt:
2281
        # All or no nodes
2282
        self._ErrorIf(missing_file and missing_file != expected_nodes,
2283
                      constants.CV_ECLUSTERFILECHECK, None,
2284
                      "File %s is optional, but it must exist on all or no"
2285
                      " nodes (not found on %s)",
2286
                      filename,
2287
                      utils.CommaJoin(
2288
                        utils.NiceSort(
2289
                          map(self.cfg.GetNodeName, missing_file))))
2290
      else:
2291
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2292
                      "File %s is missing from node(s) %s", filename,
2293
                      utils.CommaJoin(
2294
                        utils.NiceSort(
2295
                          map(self.cfg.GetNodeName, missing_file))))
2296

    
2297
        # Warn if a node has a file it shouldn't
2298
        unexpected = with_file - expected_nodes
2299
        self._ErrorIf(unexpected,
2300
                      constants.CV_ECLUSTERFILECHECK, None,
2301
                      "File %s should not exist on node(s) %s",
2302
                      filename, utils.CommaJoin(
2303
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2304

    
2305
      # See if there are multiple versions of the file
2306
      test = len(checksums) > 1
2307
      if test:
2308
        variants = ["variant %s on %s" %
2309
                    (idx + 1,
2310
                     utils.CommaJoin(utils.NiceSort(
2311
                       map(self.cfg.GetNodeName, node_uuids))))
2312
                    for (idx, (checksum, node_uuids)) in
2313
                      enumerate(sorted(checksums.items()))]
2314
      else:
2315
        variants = []
2316

    
2317
      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2318
                    "File %s found with %s different checksums (%s)",
2319
                    filename, len(checksums), "; ".join(variants))
2320

    
2321
  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
2322
    """Verify the drbd helper.
2323

2324
    """
2325
    if drbd_helper:
2326
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2327
      test = (helper_result is None)
2328
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2329
                    "no drbd usermode helper returned")
2330
      if helper_result:
2331
        status, payload = helper_result
2332
        test = not status
2333
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2334
                      "drbd usermode helper check unsuccessful: %s", payload)
2335
        test = status and (payload != drbd_helper)
2336
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2337
                      "wrong drbd usermode helper: %s", payload)
2338

    
2339
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2340
                      drbd_map):
    """Verifies the node DRBD status.
2342

2343
    @type ninfo: L{objects.Node}
2344
    @param ninfo: the node to check
2345
    @param nresult: the remote results for the node
2346
    @param instanceinfo: the dict of instances
2347
    @param drbd_helper: the configured DRBD usermode helper
2348
    @param drbd_map: the DRBD map as returned by
2349
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2350

2351
    """
2352
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)
2353

    
2354
    # compute the DRBD minors
2355
    node_drbd = {}
2356
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2357
      test = inst_uuid not in instanceinfo
2358
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2359
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
2360
        # ghost instance should not be running, but otherwise we
2361
        # don't give double warnings (both ghost instance and
2362
        # unallocated minor in use)
2363
      if test:
2364
        node_drbd[minor] = (inst_uuid, False)
2365
      else:
2366
        instance = instanceinfo[inst_uuid]
2367
        node_drbd[minor] = (inst_uuid, instance.disks_active)
2368

    
2369
    # and now check them
2370
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2371
    test = not isinstance(used_minors, (tuple, list))
2372
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2373
                  "cannot parse drbd status file: %s", str(used_minors))
2374
    if test:
2375
      # we cannot check drbd status
2376
      return
2377

    
2378
    for minor, (inst_uuid, must_exist) in node_drbd.items():
2379
      test = minor not in used_minors and must_exist
2380
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2381
                    "drbd minor %d of instance %s is not active", minor,
2382
                    self.cfg.GetInstanceName(inst_uuid))
2383
    for minor in used_minors:
2384
      test = minor not in node_drbd
2385
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2386
                    "unallocated drbd minor %d is in use", minor)
2387

    
2388
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2389
    """Builds the node OS structures.
2390

2391
    @type ninfo: L{objects.Node}
2392
    @param ninfo: the node to check
2393
    @param nresult: the remote results for the node
2394
    @param nimg: the node image object
2395

2396
    """
2397
    remote_os = nresult.get(constants.NV_OSLIST, None)
2398
    test = (not isinstance(remote_os, list) or
2399
            not compat.all(isinstance(v, list) and len(v) == 7
2400
                           for v in remote_os))
2401

    
2402
    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2403
                  "node hasn't returned valid OS data")
2404

    
2405
    nimg.os_fail = test
2406

    
2407
    if test:
2408
      return
2409

    
2410
    os_dict = {}
2411

    
2412
    for (name, os_path, status, diagnose,
2413
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2414

    
2415
      if name not in os_dict:
2416
        os_dict[name] = []
2417

    
2418
      # parameters is a list of lists instead of list of tuples due to
2419
      # JSON lacking a real tuple type, fix it:
2420
      parameters = [tuple(v) for v in parameters]
2421
      os_dict[name].append((os_path, status, diagnose,
2422
                            set(variants), set(parameters), set(api_ver)))
2423

    
2424
    nimg.oslist = os_dict
2425

    
2426
  def _VerifyNodeOS(self, ninfo, nimg, base):
2427
    """Verifies the node OS list.
2428

2429
    @type ninfo: L{objects.Node}
2430
    @param ninfo: the node to check
2431
    @param nimg: the node image object
2432
    @param base: the 'template' node we match against (e.g. from the master)
2433

2434
    """
2435
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2436

    
2437
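    # Render (key, value) parameter pairs as "key: value" strings to keep the
    # comparison messages below readable.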
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2438
    for os_name, os_data in nimg.oslist.items():
2439
      assert os_data, "Empty OS status for OS %s?!" % os_name
2440
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2441
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2442
                    "Invalid OS %s (located at %s): %s",
2443
                    os_name, f_path, f_diag)
2444
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2445
                    "OS '%s' has multiple entries"
2446
                    " (first one shadows the rest): %s",
2447
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
2448
      # comparisons with the 'base' image
2449
      test = os_name not in base.oslist
2450
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2451
                    "Extra OS %s not present on reference node (%s)",
2452
                    os_name, self.cfg.GetNodeName(base.uuid))
2453
      if test:
2454
        continue
2455
      assert base.oslist[os_name], "Base node has empty OS status?"
2456
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2457
      if not b_status:
2458
        # base OS is invalid, skipping
2459
        continue
2460
      for kind, a, b in [("API version", f_api, b_api),
2461
                         ("variants list", f_var, b_var),
2462
                         ("parameters", beautify_params(f_param),
2463
                          beautify_params(b_param))]:
2464
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2465
                      "OS %s for %s differs from reference node %s:"
2466
                      " [%s] vs. [%s]", kind, os_name,
2467
                      self.cfg.GetNodeName(base.uuid),
2468
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2469

    
2470
    # check any missing OSes
2471
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2472
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2473
                  "OSes present on reference node %s"
2474
                  " but missing on this node: %s",
2475
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2476

    
2477
  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2478
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2479

2480
    @type ninfo: L{objects.Node}
2481
    @param ninfo: the node to check
2482
    @param nresult: the remote results for the node
2483
    @type is_master: bool
2484
    @param is_master: Whether node is the master node
2485

2486
    """
2487
    cluster = self.cfg.GetClusterInfo()
2488
    if (is_master and
2489
        (cluster.IsFileStorageEnabled() or
2490
         cluster.IsSharedFileStorageEnabled())):
2491
      try:
2492
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2493
      except KeyError:
2494
        # This should never happen
2495
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2496
                      "Node did not return forbidden file storage paths")
2497
      else:
2498
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2499
                      "Found forbidden file storage paths: %s",
2500
                      utils.CommaJoin(fspaths))
2501
    else:
2502
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2503
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2504
                    "Node should not have returned forbidden file storage"
2505
                    " paths")
2506

    
2507
  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2508
                          verify_key, error_key):
2509
    """Verifies (file) storage paths.
2510

2511
    @type ninfo: L{objects.Node}
2512
    @param ninfo: the node to check
2513
    @param nresult: the remote results for the node
2514
    @type file_disk_template: string
2515
    @param file_disk_template: file-based disk template, whose directory
2516
        is supposed to be verified
2517
    @type verify_key: string
2518
    @param verify_key: key for the verification map of this file
2519
        verification step
2520
    @param error_key: error key to be added to the verification results
2521
        in case something goes wrong in this verification step
2522

2523
    """
2524
    assert (file_disk_template in
2525
            utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2526
    cluster = self.cfg.GetClusterInfo()
2527
    if cluster.IsDiskTemplateEnabled(file_disk_template):
2528
      self._ErrorIf(
2529
          verify_key in nresult,
2530
          error_key, ninfo.name,
2531
          "The configured %s storage path is unusable: %s" %
2532
          (file_disk_template, nresult.get(verify_key)))
2533

    
2534
  def _VerifyFileStoragePaths(self, ninfo, nresult):
2535
    """Verifies (file) storage paths.
2536

2537
    @see: C{_VerifyStoragePaths}
2538

2539
    """
2540
    self._VerifyStoragePaths(
2541
        ninfo, nresult, constants.DT_FILE,
2542
        constants.NV_FILE_STORAGE_PATH,
2543
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2544

    
2545
  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2546
    """Verifies (file) storage paths.
2547

2548
    @see: C{_VerifyStoragePaths}
2549

2550
    """
2551
    self._VerifyStoragePaths(
2552
        ninfo, nresult, constants.DT_SHARED_FILE,
2553
        constants.NV_SHARED_FILE_STORAGE_PATH,
2554
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2555

    
2556
  def _VerifyOob(self, ninfo, nresult):
2557
    """Verifies out of band functionality of a node.
2558

2559
    @type ninfo: L{objects.Node}
2560
    @param ninfo: the node to check
2561
    @param nresult: the remote results for the node
2562

2563
    """
2564
    # We just have to verify the paths on master and/or master candidates
2565
    # as the oob helper is invoked on the master
2566
    if ((ninfo.master_candidate or ninfo.master_capable) and
2567
        constants.NV_OOB_PATHS in nresult):
2568
      for path_result in nresult[constants.NV_OOB_PATHS]:
2569
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2570
                      ninfo.name, path_result)
2571

    
2572
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2573
    """Verifies and updates the node volume data.
2574

2575
    This function will update a L{NodeImage}'s internal structures
2576
    with data from the remote call.
2577

2578
    @type ninfo: L{objects.Node}
2579
    @param ninfo: the node to check
2580
    @param nresult: the remote results for the node
2581
    @param nimg: the node image object
2582
    @param vg_name: the configured VG name
2583

2584
    """
2585
    nimg.lvm_fail = True
2586
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2587
    if vg_name is None:
2588
      pass
2589
    elif isinstance(lvdata, basestring):
2590
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2591
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
2592
    elif not isinstance(lvdata, dict):
2593
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2594
                    "rpc call to node failed (lvlist)")
2595
    else:
2596
      nimg.volumes = lvdata
2597
      nimg.lvm_fail = False
2598

    
2599
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2600
    """Verifies and updates the node instance list.
2601

2602
    If the listing was successful, then updates this node's instance
2603
    list. Otherwise, it marks the RPC call as failed for the instance
2604
    list key.
2605

2606
    @type ninfo: L{objects.Node}
2607
    @param ninfo: the node to check
2608
    @param nresult: the remote results for the node
2609
    @param nimg: the node image object
2610

2611
    """
2612
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2613
    test = not isinstance(idata, list)
2614
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2615
                  "rpc call to node failed (instancelist): %s",
2616
                  utils.SafeEncode(str(idata)))
2617
    if test:
2618
      nimg.hyp_fail = True
2619
    else:
2620
      nimg.instances = [inst.uuid for (_, inst) in
2621
                        self.cfg.GetMultiInstanceInfoByName(idata)]
2622

    
2623
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map.
2625

2626
    @type ninfo: L{objects.Node}
2627
    @param ninfo: the node to check
2628
    @param nresult: the remote results for the node
2629
    @param nimg: the node image object
2630
    @param vg_name: the configured VG name
2631

2632
    """
2633
    # try to read free memory (from the hypervisor)
2634
    hv_info = nresult.get(constants.NV_HVINFO, None)
2635
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2636
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2637
                  "rpc call to node failed (hvinfo)")
2638
    if not test:
2639
      try:
2640
        nimg.mfree = int(hv_info["memory_free"])
2641
      except (ValueError, TypeError):
2642
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2643
                      "node returned invalid nodeinfo, check hypervisor")
2644

    
2645
    # FIXME: devise a free space model for file based instances as well
2646
    if vg_name is not None:
2647
      test = (constants.NV_VGLIST not in nresult or
2648
              vg_name not in nresult[constants.NV_VGLIST])
2649
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2650
                    "node didn't return data for the volume group '%s'"
2651
                    " - it is either missing or broken", vg_name)
2652
      if not test:
2653
        try:
2654
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2655
        except (ValueError, TypeError):
2656
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2657
                        "node returned invalid LVM info, check LVM status")
2658

    
2659
  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2660
    """Gets per-disk status information for all instances.
2661

2662
    @type node_uuids: list of strings
2663
    @param node_uuids: Node UUIDs
2664
    @type node_image: dict of (UUID, L{objects.Node})
2665
    @param node_image: Node objects
2666
    @type instanceinfo: dict of (UUID, L{objects.Instance})
2667
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_dev_inst_only = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS
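
    # Build, for each node, the list of (instance UUID, disk) pairs to query;
    # diskless instances are remembered separately so they still get an
    # (empty) entry in the final result.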
    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      dev_inst_only = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))

      node_disks_dev_inst_only[nuuid] = dev_inst_only

    assert len(node_disks) == len(node_disks_dev_inst_only)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
               node_disks.keys(), node_disks_dev_inst_only)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}
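
    # Sanity checks: the collected per-disk statuses must match the
    # configuration's view of each instance's disks and nodes.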
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")
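
    # Group the remaining (foreign, online) nodes by node group and return,
    # per group, an endless cycle over its sorted node names.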
    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just ran in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
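
    # Parameters for the node_verify RPC: map each NV_* check name to the
    # data that check needs on the remote nodes.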
    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
      if drbd_helper:
        node_verify_param[constants.NV_DRBDVERSION] = None
        node_verify_param[constants.NV_DRBDLIST] = None
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for inst_uuid in self.my_inst_info.values():
      for nic in inst_uuid.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1
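
      # Make sure every node that hosts this instance has an entry in
      # node_image; nodes not known to the configuration at all are
      # marked as "ghost" nodes.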
      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least a node, we act as if it were set for all the nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.name not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])