#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
  CheckDiskAccessModeConsistency

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "vcs_version": constants.VCS_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.DT_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False
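  # Illustrative sketch of the behaviour above (added for clarity, not part
  # of the original module): for a DRBD8 disk of size 1024 whose first (data)
  # child reports size 1000, the child is bumped to 1024 and the method
  # returns True, telling the caller that the instance needs cfg.Update().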

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      if not dskl:
        # no disks on the node
        continue

      newl = [([v[2].Copy()], v[0]) for v in dskl]
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed
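  # Note (added for clarity, not in the original file): the ">> 20" above
  # suggests the RPC reports sizes in bytes while the configuration stores
  # MiB.  The returned list is made of (instance_name, disk_index,
  # "size"|"spindles", new_value) tuples, one per correction written back.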


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)
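# Example (illustrative, not part of the original module): on an IPv4-primary
# cluster a prefix length such as 24 passes ValidateNetmask, while a value
# like 33 makes this helper raise errors.OpPrereqError.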


def CheckFileBasedStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
    file_disk_template):
  """Checks whether the given file-based storage directory is acceptable.

  Note: This function is public, because it is also used in bootstrap.py.

  @type logging_warn_fn: function
  @param logging_warn_fn: function which accepts a string and logs it
  @type file_storage_dir: string
  @param file_storage_dir: the directory to be used for file-based instances
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of enabled disk templates
  @type file_disk_template: string
  @param file_disk_template: the file-based disk template for which the
      path should be checked

  """
  assert (file_disk_template in
          utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_dir is not None:
    if file_storage_dir == "":
      if file_storage_enabled:
        raise errors.OpPrereqError(
            "Unsetting the '%s' storage directory while having '%s' storage"
            " enabled is not permitted." %
            (file_disk_template, file_disk_template))
    else:
      if not file_storage_enabled:
        logging_warn_fn(
            "Specified a %s storage directory, although %s storage is not"
            " enabled." % (file_disk_template, file_disk_template))
  else:
    raise errors.ProgrammerError("Received %s storage dir with value"
                                 " 'None'." % file_disk_template)


def CheckFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_FILE)


def CheckSharedFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given shared file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_SHARED_FILE)


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
        CheckDiskAccessModeValidity(self.op.diskparams)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s"
                                   % err,
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def _CheckVgName(self, node_uuids, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the vg name on all nodes and, in case it gets
       unset, whether there are instances still using it.

    """
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
                                            new_enabled_disk_templates)
    current_vg_name = self.cfg.GetVGName()

    if self.op.vg_name == '':
      if lvm_is_enabled:
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                   " disk templates are or get enabled.")

    if self.op.vg_name is None:
      if current_vg_name is None and lvm_is_enabled:
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and lvm_is_enabled) or \
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
      self._CheckVgNameOnNodes(node_uuids)
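  # Added note (not in the original source): self.op.vg_name distinguishes
  # None, meaning "leave the volume group unchanged", from the empty string,
  # which requests unsetting it and is only allowed while no lvm-based disk
  # template is (or becomes) enabled and no DT_PLAIN disks exist.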

  def _CheckVgNameOnNodes(self, node_uuids):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_uuids)
    for node_uuid in node_uuids:
      msg = vglist[node_uuid].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
                                   errors.ECODE_ENVIRON)

  @staticmethod
  def _GetDiskTemplateSetsInner(op_enabled_disk_templates,
                                old_enabled_disk_templates):
    """Computes three sets of disk templates.

    @see: C{_GetDiskTemplateSets} for more details.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    disabled_disk_templates = []
    if op_enabled_disk_templates:
      enabled_disk_templates = op_enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(old_enabled_disk_templates))
      disabled_disk_templates = \
        list(set(old_enabled_disk_templates)
             - set(enabled_disk_templates))
    else:
      enabled_disk_templates = old_enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates,
            disabled_disk_templates)
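  # Worked example (illustrative only, not from the original file): with
  # old_enabled_disk_templates = ["plain", "drbd"] and
  # op_enabled_disk_templates = ["plain", "file"], this returns roughly
  # (["plain", "file"], ["file"], ["drbd"]) -- element order may differ
  # because plain sets are used for the difference operations.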

  def _GetDiskTemplateSets(self, cluster):
    """Computes three sets of disk templates.

    The three sets are:
      - disk templates that will be enabled after this operation (no matter if
        they were enabled before or not)
      - disk templates that get enabled by this operation (thus haven't been
        enabled before.)
      - disk templates that get disabled by this operation

    """
    return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates,
                                          cluster.enabled_disk_templates)

  def _CheckIpolicy(self, cluster, enabled_disk_templates):
    """Checks the ipolicy.

    @type cluster: C{objects.Cluster}
    @param cluster: the cluster's configuration
    @type enabled_disk_templates: list of string
    @param enabled_disk_templates: list of (possibly newly) enabled disk
      templates

    """
    # FIXME: write unit tests for this
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
                                  enabled_disk_templates)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(nuuid in group.members
                                             for nuuid in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
                                           self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))
    else:
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
                                  enabled_disk_templates)

  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
    """Checks whether the set DRBD helper actually exists on the nodes.

    @type drbd_helper: string
    @param drbd_helper: path of the drbd usermode helper binary
    @type node_uuids: list of strings
    @param node_uuids: list of node UUIDs to check for the helper

    """
    # checks given drbd helper on all nodes
    helpers = self.rpc.call_drbd_helper(node_uuids)
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
      if ninfo.offline:
        self.LogInfo("Not checking drbd helper on offline node %s",
                     ninfo.name)
        continue
      msg = helpers[ninfo.uuid].fail_msg
      if msg:
        raise errors.OpPrereqError("Error checking drbd helper on node"
                                   " '%s': %s" % (ninfo.name, msg),
                                   errors.ECODE_ENVIRON)
      node_helper = helpers[ninfo.uuid].payload
      if node_helper != drbd_helper:
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                   (ninfo.name, node_helper),
                                   errors.ECODE_ENVIRON)

  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
    """Check the DRBD usermode helper.

    @type node_uuids: list of strings
    @param node_uuids: a list of nodes' UUIDs
    @type drbd_enabled: boolean
    @param drbd_enabled: whether DRBD will be enabled after this operation
      (no matter if it was disabled before or not)
    @type drbd_gets_enabled: boolean
    @param drbd_gets_enabled: true if DRBD was disabled before this
      operation, but will be enabled afterwards

    """
    if self.op.drbd_helper == '':
      if drbd_enabled:
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " DRBD is enabled.")
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    else:
      if self.op.drbd_helper is not None and drbd_enabled:
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
      else:
        if drbd_gets_enabled:
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
          if current_drbd_helper is not None:
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
          else:
            raise errors.OpPrereqError("Cannot enable DRBD without a"
                                       " DRBD usermode helper set.")

  def _CheckInstancesOfDisabledDiskTemplates(
      self, disabled_disk_templates):
    """Check whether we try to disable a disk template that is in use.

    @type disabled_disk_templates: list of string
    @param disabled_disk_templates: list of disk templates that are going to
      be disabled by this operation

    """
    for disk_template in disabled_disk_templates:
      if self.cfg.HasAnyDiskOfType(disk_template):
        raise errors.OpPrereqError(
            "Cannot disable disk template '%s', because there is at least one"
            " instance using it." % disk_template)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    self.cluster = cluster = self.cfg.GetClusterInfo()

    vm_capable_node_uuids = [node.uuid
                             for node in self.cfg.GetAllNodesInfo().values()
                             if node.uuid in node_uuids and node.vm_capable]

    (enabled_disk_templates, new_enabled_disk_templates,
      disabled_disk_templates) = self._GetDiskTemplateSets(cluster)
    self._CheckInstancesOfDisabledDiskTemplates(disabled_disk_templates)

    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
                      new_enabled_disk_templates)

    if self.op.file_storage_dir is not None:
      CheckFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)

    if self.op.shared_file_storage_dir is not None:
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.shared_file_storage_dir,
          enabled_disk_templates)

    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)

    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    self._CheckIpolicy(cluster, enabled_disk_templates)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_uuids, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_uuids, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def _SetVgName(self, feedback_fn):
    """Determines and sets the new volume group name.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")

  def _SetFileStorageDir(self, feedback_fn):
    """Set the file storage directory.

    """
    if self.op.file_storage_dir is not None:
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
        feedback_fn("Global file storage dir already set to value '%s'"
                    % self.cluster.file_storage_dir)
      else:
        self.cluster.file_storage_dir = self.op.file_storage_dir

  def _SetDrbdHelper(self, feedback_fn):
    """Set the DRBD usermode helper.

    """
    if self.op.drbd_helper is not None:
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did not"
                    " enable the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(self.op.enabled_disk_templates)

    self._SetVgName(feedback_fn)
    self._SetFileStorageDir(feedback_fn)
    self._SetDrbdHelper(feedback_fn)

    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
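    # Added note (not part of the original file): self.op.hidden_os and
    # self.op.blacklisted_os are lists of (constants.DDM_ADD|DDM_REMOVE,
    # os_name) pairs; helper_os applies them in order to the corresponding
    # cluster-level OS list, ignoring additions/removals that are no-ops.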

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
                 master_params.uuid, master_params.netmask,
                 self.op.master_netmask, master_params.ip,
                 master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]
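      # Added note (hedged, not in the original source): the negative index
      # appears to be a job dependency relative to this submission, i.e. each
      # per-group verification job waits for the OpClusterVerifyConfig job
      # appended above before it runs.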
1364

    
1365
    jobs.extend(
1366
      [opcodes.OpClusterVerifyGroup(group_name=group,
1367
                                    ignore_errors=self.op.ignore_errors,
1368
                                    depends=depends_fn())]
1369
      for group in groups)
1370

    
1371
    # Fix up all parameters
1372
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1373
      op.debug_simulate_errors = self.op.debug_simulate_errors
1374
      op.verbose = self.op.verbose
1375
      op.error_codes = self.op.error_codes
1376
      try:
1377
        op.skip_checks = self.op.skip_checks
1378
      except AttributeError:
1379
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
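    # For a hypothetical cluster with two node groups this submits three
    # jobs in one go: the OpClusterVerifyConfig job plus one
    # OpClusterVerifyGroup job per group, each depending on the config job.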
1380

    
1381
    return ResultWithJobs(jobs)
1382

    
1383

    
1384
class _VerifyErrors(object):
1385
  """Mix-in for cluster/group verify LUs.
1386

1387
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1388
  self.op and self._feedback_fn to be available.)
1389

1390
  """
1391

    
1392
  ETYPE_FIELD = "code"
1393
  ETYPE_ERROR = "ERROR"
1394
  ETYPE_WARNING = "WARNING"
1395

    
1396
  def _Error(self, ecode, item, msg, *args, **kwargs):
1397
    """Format an error message.
1398

1399
    Based on the opcode's error_codes parameter, either format a
1400
    parseable error code, or a simpler error string.
1401

1402
    This must be called only from Exec and functions called from Exec.
1403

1404
    """
1405
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1406
    itype, etxt, _ = ecode
1407
    # If the error code is in the list of ignored errors, demote the error to a
1408
    # warning
1409
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1410
      ltype = self.ETYPE_WARNING
1411
    # first complete the msg
1412
    if args:
1413
      msg = msg % args
1414
    # then format the whole message
1415
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1416
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1417
    else:
1418
      if item:
1419
        item = " " + item
1420
      else:
1421
        item = ""
1422
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1423
    # and finally report it via the feedback_fn
1424
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1425
    # do not mark the operation as failed for WARN cases only
1426
    if ltype == self.ETYPE_ERROR:
1427
      self.bad = True
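    # With error_codes enabled the line reported above is machine-parseable;
    # a hypothetical sample (fields are level:code:item-type:item:message):
    #   - ERROR:ENODESSH:node:node2.example.com:ssh communication failed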
1428

    
1429
  def _ErrorIf(self, cond, *args, **kwargs):
1430
    """Log an error message if the passed condition is True.
1431

1432
    """
1433
    if (bool(cond)
1434
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1435
      self._Error(*args, **kwargs)
1436

    
1437

    
1438
def _VerifyCertificate(filename):
1439
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1440

1441
  @type filename: string
1442
  @param filename: Path to PEM file
1443

1444
  """
1445
  try:
1446
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1447
                                           utils.ReadFile(filename))
1448
  except Exception, err: # pylint: disable=W0703
1449
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1450
            "Failed to load X509 certificate %s: %s" % (filename, err))
1451

    
1452
  (errcode, msg) = \
1453
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1454
                                constants.SSL_CERT_EXPIRATION_ERROR)
1455

    
1456
  if msg:
1457
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1458
  else:
1459
    fnamemsg = None
1460

    
1461
  if errcode is None:
1462
    return (None, fnamemsg)
1463
  elif errcode == utils.CERT_WARNING:
1464
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1465
  elif errcode == utils.CERT_ERROR:
1466
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1467

    
1468
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
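  # The function returns an (error-type, message) pair for the caller's
  # _ErrorIf; hypothetical examples of its return shapes:
  #   (None, None)                                  - certificate is fine
  #   (LUClusterVerifyConfig.ETYPE_WARNING,
  #    "While verifying /path/to/server.pem: ...")  - certificate expires soon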
1469

    
1470

    
1471
def _GetAllHypervisorParameters(cluster, instances):
1472
  """Compute the set of all hypervisor parameters.
1473

1474
  @type cluster: L{objects.Cluster}
1475
  @param cluster: the cluster object
1476
  @type instances: list of L{objects.Instance}
1477
  @param instances: additional instances from which to obtain parameters
1478
  @rtype: list of (origin, hypervisor, parameters)
1479
  @return: a list with all parameters found, indicating the hypervisor they
1480
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1481

1482
  """
1483
  hvp_data = []
1484

    
1485
  for hv_name in cluster.enabled_hypervisors:
1486
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1487

    
1488
  for os_name, os_hvp in cluster.os_hvp.items():
1489
    for hv_name, hv_params in os_hvp.items():
1490
      if hv_params:
1491
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1492
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1493

    
1494
  # TODO: collapse identical parameter values in a single one
1495
  for instance in instances:
1496
    if instance.hvparams:
1497
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1498
                       cluster.FillHV(instance)))
1499

    
1500
  return hvp_data
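  # A hypothetical result mixing the three origins:
  #   [("cluster", "kvm", {...}),
  #    ("os debian-image", "kvm", {...}),
  #    ("instance web1", "kvm", {...})]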
1501

    
1502

    
1503
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1504
  """Verifies the cluster config.
1505

1506
  """
1507
  REQ_BGL = False
1508

    
1509
  def _VerifyHVP(self, hvp_data):
1510
    """Verifies locally the syntax of the hypervisor parameters.
1511

1512
    """
1513
    for item, hv_name, hv_params in hvp_data:
1514
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1515
             (item, hv_name))
1516
      try:
1517
        hv_class = hypervisor.GetHypervisorClass(hv_name)
1518
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1519
        hv_class.CheckParameterSyntax(hv_params)
1520
      except errors.GenericError, err:
1521
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1522

    
1523
  def ExpandNames(self):
1524
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1525
    self.share_locks = ShareAll()
1526

    
1527
  def CheckPrereq(self):
1528
    """Check prerequisites.
1529

1530
    """
1531
    # Retrieve all information
1532
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1533
    self.all_node_info = self.cfg.GetAllNodesInfo()
1534
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1535

    
1536
  def Exec(self, feedback_fn):
1537
    """Verify integrity of cluster, performing various test on nodes.
1538

1539
    """
1540
    self.bad = False
1541
    self._feedback_fn = feedback_fn
1542

    
1543
    feedback_fn("* Verifying cluster config")
1544

    
1545
    for msg in self.cfg.VerifyConfig():
1546
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1547

    
1548
    feedback_fn("* Verifying cluster certificate files")
1549

    
1550
    for cert_filename in pathutils.ALL_CERT_FILES:
1551
      (errcode, msg) = _VerifyCertificate(cert_filename)
1552
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1553

    
1554
    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
1555
                                    pathutils.NODED_CERT_FILE),
1556
                  constants.CV_ECLUSTERCERT,
1557
                  None,
1558
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
1559
                    constants.LUXID_USER + " user")
1560

    
1561
    feedback_fn("* Verifying hypervisor parameters")
1562

    
1563
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1564
                                                self.all_inst_info.values()))
1565

    
1566
    feedback_fn("* Verifying all nodes belong to an existing group")
1567

    
1568
    # We do this verification here because, should this bogus circumstance
1569
    # occur, it would never be caught by VerifyGroup, which only acts on
1570
    # nodes/instances reachable from existing node groups.
1571

    
1572
    dangling_nodes = set(node for node in self.all_node_info.values()
1573
                         if node.group not in self.all_group_info)
1574

    
1575
    dangling_instances = {}
1576
    no_node_instances = []
1577

    
1578
    for inst in self.all_inst_info.values():
1579
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
1580
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
1581
      elif inst.primary_node not in self.all_node_info:
1582
        no_node_instances.append(inst)
1583

    
1584
    pretty_dangling = [
1585
        "%s (%s)" %
1586
        (node.name,
1587
         utils.CommaJoin(inst.name for
1588
                         inst in dangling_instances.get(node.uuid, [])))
1589
        for node in dangling_nodes]
1590

    
1591
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1592
                  None,
1593
                  "the following nodes (and their instances) belong to a non"
1594
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1595

    
1596
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1597
                  None,
1598
                  "the following instances have a non-existing primary-node:"
1599
                  " %s", utils.CommaJoin(inst.name for
1600
                                         inst in no_node_instances))
1601

    
1602
    return not self.bad
1603

    
1604

    
1605
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1606
  """Verifies the status of a node group.
1607

1608
  """
1609
  HPATH = "cluster-verify"
1610
  HTYPE = constants.HTYPE_CLUSTER
1611
  REQ_BGL = False
1612

    
1613
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1614

    
1615
  class NodeImage(object):
1616
    """A class representing the logical and physical status of a node.
1617

1618
    @type uuid: string
1619
    @ivar uuid: the node UUID to which this object refers
1620
    @ivar volumes: a structure as returned from
1621
        L{ganeti.backend.GetVolumeList} (runtime)
1622
    @ivar instances: a list of running instances (runtime)
1623
    @ivar pinst: list of configured primary instances (config)
1624
    @ivar sinst: list of configured secondary instances (config)
1625
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1626
        instances for which this node is secondary (config)
1627
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1628
    @ivar dfree: free disk, as reported by the node (runtime)
1629
    @ivar offline: the offline status (config)
1630
    @type rpc_fail: boolean
1631
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1632
        not whether the individual keys were correct) (runtime)
1633
    @type lvm_fail: boolean
1634
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1635
    @type hyp_fail: boolean
1636
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1637
    @type ghost: boolean
1638
    @ivar ghost: whether this is a known node or not (config)
1639
    @type os_fail: boolean
1640
    @ivar os_fail: whether the RPC call didn't return valid OS data
1641
    @type oslist: list
1642
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1643
    @type vm_capable: boolean
1644
    @ivar vm_capable: whether the node can host instances
1645
    @type pv_min: float
1646
    @ivar pv_min: size in MiB of the smallest PVs
1647
    @type pv_max: float
1648
    @ivar pv_max: size in MiB of the biggest PVs
1649

1650
    """
1651
    def __init__(self, offline=False, uuid=None, vm_capable=True):
1652
      self.uuid = uuid
1653
      self.volumes = {}
1654
      self.instances = []
1655
      self.pinst = []
1656
      self.sinst = []
1657
      self.sbp = {}
1658
      self.mfree = 0
1659
      self.dfree = 0
1660
      self.offline = offline
1661
      self.vm_capable = vm_capable
1662
      self.rpc_fail = False
1663
      self.lvm_fail = False
1664
      self.hyp_fail = False
1665
      self.ghost = False
1666
      self.os_fail = False
1667
      self.oslist = {}
1668
      self.pv_min = None
1669
      self.pv_max = None
1670

    
1671
  def ExpandNames(self):
1672
    # This raises errors.OpPrereqError on its own:
1673
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1674

    
1675
    # Get instances in node group; this is unsafe and needs verification later
1676
    inst_uuids = \
1677
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1678

    
1679
    self.needed_locks = {
1680
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1681
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1682
      locking.LEVEL_NODE: [],
1683

    
1684
      # This opcode is run by watcher every five minutes and acquires all nodes
1685
      # for a group. It doesn't run for a long time, so it's better to acquire
1686
      # the node allocation lock as well.
1687
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1688
      }
1689

    
1690
    self.share_locks = ShareAll()
1691

    
1692
  def DeclareLocks(self, level):
1693
    if level == locking.LEVEL_NODE:
1694
      # Get members of node group; this is unsafe and needs verification later
1695
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1696

    
1697
      # In Exec(), we warn about mirrored instances that have primary and
1698
      # secondary living in separate node groups. To fully verify that
1699
      # volumes for these instances are healthy, we will need to do an
1700
      # extra call to their secondaries. We ensure here those nodes will
1701
      # be locked.
1702
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1703
        # Important: access only the instances whose lock is owned
1704
        instance = self.cfg.GetInstanceInfoByName(inst_name)
1705
        if instance.disk_template in constants.DTS_INT_MIRROR:
1706
          nodes.update(instance.secondary_nodes)
1707

    
1708
      self.needed_locks[locking.LEVEL_NODE] = nodes
1709

    
1710
  def CheckPrereq(self):
1711
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1712
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1713

    
1714
    group_node_uuids = set(self.group_info.members)
1715
    group_inst_uuids = \
1716
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1717

    
1718
    unlocked_node_uuids = \
1719
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1720

    
1721
    unlocked_inst_uuids = \
1722
        group_inst_uuids.difference(
1723
          [self.cfg.GetInstanceInfoByName(name).uuid
1724
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1725

    
1726
    if unlocked_node_uuids:
1727
      raise errors.OpPrereqError(
1728
        "Missing lock for nodes: %s" %
1729
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1730
        errors.ECODE_STATE)
1731

    
1732
    if unlocked_inst_uuids:
1733
      raise errors.OpPrereqError(
1734
        "Missing lock for instances: %s" %
1735
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1736
        errors.ECODE_STATE)
1737

    
1738
    self.all_node_info = self.cfg.GetAllNodesInfo()
1739
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1740

    
1741
    self.my_node_uuids = group_node_uuids
1742
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1743
                             for node_uuid in group_node_uuids)
1744

    
1745
    self.my_inst_uuids = group_inst_uuids
1746
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1747
                             for inst_uuid in group_inst_uuids)
1748

    
1749
    # We detect here the nodes that will need the extra RPC calls for verifying
1750
    # split LV volumes; they should be locked.
1751
    extra_lv_nodes = set()
1752

    
1753
    for inst in self.my_inst_info.values():
1754
      if inst.disk_template in constants.DTS_INT_MIRROR:
1755
        for nuuid in inst.all_nodes:
1756
          if self.all_node_info[nuuid].group != self.group_uuid:
1757
            extra_lv_nodes.add(nuuid)
1758

    
1759
    unlocked_lv_nodes = \
1760
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1761

    
1762
    if unlocked_lv_nodes:
1763
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1764
                                 utils.CommaJoin(unlocked_lv_nodes),
1765
                                 errors.ECODE_STATE)
1766
    self.extra_lv_nodes = list(extra_lv_nodes)
1767

    
1768
  def _VerifyNode(self, ninfo, nresult):
1769
    """Perform some basic validation on data returned from a node.
1770

1771
      - check the result data structure is well formed and has all the
1772
        mandatory fields
1773
      - check ganeti version
1774

1775
    @type ninfo: L{objects.Node}
1776
    @param ninfo: the node to check
1777
    @param nresult: the results from the node
1778
    @rtype: boolean
1779
    @return: whether overall this call was successful (and we can expect
1780
         reasonable values in the response)
1781

1782
    """
1783
    # main result, nresult should be a non-empty dict
1784
    test = not nresult or not isinstance(nresult, dict)
1785
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1786
                  "unable to verify node: no data returned")
1787
    if test:
1788
      return False
1789

    
1790
    # compares ganeti version
1791
    local_version = constants.PROTOCOL_VERSION
1792
    remote_version = nresult.get("version", None)
1793
    test = not (remote_version and
1794
                isinstance(remote_version, (list, tuple)) and
1795
                len(remote_version) == 2)
1796
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1797
                  "connection to node returned invalid data")
1798
    if test:
1799
      return False
1800

    
1801
    test = local_version != remote_version[0]
1802
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1803
                  "incompatible protocol versions: master %s,"
1804
                  " node %s", local_version, remote_version[0])
1805
    if test:
1806
      return False
1807

    
1808
    # node seems compatible, we can actually try to look into its results
1809

    
1810
    # full package version
1811
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1812
                  constants.CV_ENODEVERSION, ninfo.name,
1813
                  "software version mismatch: master %s, node %s",
1814
                  constants.RELEASE_VERSION, remote_version[1],
1815
                  code=self.ETYPE_WARNING)
1816

    
1817
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1818
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1819
      for hv_name, hv_result in hyp_result.iteritems():
1820
        test = hv_result is not None
1821
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1822
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1823

    
1824
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1825
    if ninfo.vm_capable and isinstance(hvp_result, list):
1826
      for item, hv_name, hv_result in hvp_result:
1827
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1828
                      "hypervisor %s parameter verify failure (source %s): %s",
1829
                      hv_name, item, hv_result)
1830

    
1831
    test = nresult.get(constants.NV_NODESETUP,
1832
                       ["Missing NODESETUP results"])
1833
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1834
                  "node setup error: %s", "; ".join(test))
1835

    
1836
    return True
1837

    
1838
  def _VerifyNodeTime(self, ninfo, nresult,
1839
                      nvinfo_starttime, nvinfo_endtime):
1840
    """Check the node time.
1841

1842
    @type ninfo: L{objects.Node}
1843
    @param ninfo: the node to check
1844
    @param nresult: the remote results for the node
1845
    @param nvinfo_starttime: the start time of the RPC call
1846
    @param nvinfo_endtime: the end time of the RPC call
1847

1848
    """
1849
    ntime = nresult.get(constants.NV_TIME, None)
1850
    try:
1851
      ntime_merged = utils.MergeTime(ntime)
1852
    except (ValueError, TypeError):
1853
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1854
                    "Node returned invalid time")
1855
      return
1856

    
1857
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1858
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1859
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1860
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1861
    else:
1862
      ntime_diff = None
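    # Worked example with hypothetical numbers: RPC window [100.0, 100.5],
    # node reports 103.2, NODE_MAX_CLOCK_SKEW = 1.5.  Since 103.2 is beyond
    # 100.5 + 1.5, ntime_diff becomes abs(103.2 - 100.5) = "2.7s" and the
    # check below reports an error.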
1863

    
1864
    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1865
                  "Node time diverges by at least %s from master node time",
1866
                  ntime_diff)
1867

    
1868
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1869
    """Check the node LVM results and update info for cross-node checks.
1870

1871
    @type ninfo: L{objects.Node}
1872
    @param ninfo: the node to check
1873
    @param nresult: the remote results for the node
1874
    @param vg_name: the configured VG name
1875
    @type nimg: L{NodeImage}
1876
    @param nimg: node image
1877

1878
    """
1879
    if vg_name is None:
1880
      return
1881

    
1882
    # checks vg existence and size > 20G
1883
    vglist = nresult.get(constants.NV_VGLIST, None)
1884
    test = not vglist
1885
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1886
                  "unable to check volume groups")
1887
    if not test:
1888
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1889
                                            constants.MIN_VG_SIZE)
1890
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1891

    
1892
    # Check PVs
1893
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1894
    for em in errmsgs:
1895
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
1896
    if pvminmax is not None:
1897
      (nimg.pv_min, nimg.pv_max) = pvminmax
1898

    
1899
  def _VerifyGroupDRBDVersion(self, node_verify_infos):
1900
    """Check cross-node DRBD version consistency.
1901

1902
    @type node_verify_infos: dict
1903
    @param node_verify_infos: infos about nodes as returned from the
1904
      node_verify call.
1905

1906
    """
1907
    node_versions = {}
1908
    for node_uuid, ndata in node_verify_infos.items():
1909
      nresult = ndata.payload
1910
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1911
      node_versions[node_uuid] = version
1912

    
1913
    if len(set(node_versions.values())) > 1:
1914
      for node_uuid, version in sorted(node_versions.items()):
1915
        msg = "DRBD version mismatch: %s" % version
1916
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1917
                    code=self.ETYPE_WARNING)
1918

    
1919
  def _VerifyGroupLVM(self, node_image, vg_name):
1920
    """Check cross-node consistency in LVM.
1921

1922
    @type node_image: dict
1923
    @param node_image: info about nodes, mapping from node UUIDs to
1924
      L{NodeImage} objects
1925
    @param vg_name: the configured VG name
1926

1927
    """
1928
    if vg_name is None:
1929
      return
1930

    
1931
    # Only exclusive storage needs this kind of checks
1932
    if not self._exclusive_storage:
1933
      return
1934

    
1935
    # exclusive_storage wants all PVs to have the same size (approximately),
1936
    # if the smallest and the biggest ones are okay, everything is fine.
1937
    # pv_min is None iff pv_max is None
1938
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1939
    if not vals:
1940
      return
1941
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1942
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
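    # The (size, uuid) tuples compare lexicographically, so min()/max() pick
    # the extreme PV size first and use the node UUID only as a tie-breaker.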
1943
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1944
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1945
                  "PV sizes differ too much in the group; smallest (%s MB) is"
1946
                  " on %s, biggest (%s MB) is on %s",
1947
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
1948
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))
1949

    
1950
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1951
    """Check the node bridges.
1952

1953
    @type ninfo: L{objects.Node}
1954
    @param ninfo: the node to check
1955
    @param nresult: the remote results for the node
1956
    @param bridges: the expected list of bridges
1957

1958
    """
1959
    if not bridges:
1960
      return
1961

    
1962
    missing = nresult.get(constants.NV_BRIDGES, None)
1963
    test = not isinstance(missing, list)
1964
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1965
                  "did not return valid bridge information")
1966
    if not test:
1967
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1968
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1969

    
1970
  def _VerifyNodeUserScripts(self, ninfo, nresult):
1971
    """Check the results of user scripts presence and executability on the node
1972

1973
    @type ninfo: L{objects.Node}
1974
    @param ninfo: the node to check
1975
    @param nresult: the remote results for the node
1976

1977
    """
1978
    test = not constants.NV_USERSCRIPTS in nresult
1979
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1980
                  "did not return user scripts information")
1981

    
1982
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1983
    if not test:
1984
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1985
                    "user scripts not present or not executable: %s" %
1986
                    utils.CommaJoin(sorted(broken_scripts)))
1987

    
1988
  def _VerifyNodeNetwork(self, ninfo, nresult):
1989
    """Check the node network connectivity results.
1990

1991
    @type ninfo: L{objects.Node}
1992
    @param ninfo: the node to check
1993
    @param nresult: the remote results for the node
1994

1995
    """
1996
    test = constants.NV_NODELIST not in nresult
1997
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
1998
                  "node hasn't returned node ssh connectivity data")
1999
    if not test:
2000
      if nresult[constants.NV_NODELIST]:
2001
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
2002
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
2003
                        "ssh communication with node '%s': %s", a_node, a_msg)
2004

    
2005
    test = constants.NV_NODENETTEST not in nresult
2006
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2007
                  "node hasn't returned node tcp connectivity data")
2008
    if not test:
2009
      if nresult[constants.NV_NODENETTEST]:
2010
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
2011
        for anode in nlist:
2012
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
2013
                        "tcp communication with node '%s': %s",
2014
                        anode, nresult[constants.NV_NODENETTEST][anode])
2015

    
2016
    test = constants.NV_MASTERIP not in nresult
2017
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2018
                  "node hasn't returned node master IP reachability data")
2019
    if not test:
2020
      if not nresult[constants.NV_MASTERIP]:
2021
        if ninfo.uuid == self.master_node:
2022
          msg = "the master node cannot reach the master IP (not configured?)"
2023
        else:
2024
          msg = "cannot reach the master IP"
2025
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
2026

    
2027
  def _VerifyInstance(self, instance, node_image, diskstatus):
2028
    """Verify an instance.
2029

2030
    This function checks to see if the required block devices are
2031
    available on the instance's node, and that the nodes are in the correct
2032
    state.
2033

2034
    """
2035
    pnode_uuid = instance.primary_node
2036
    pnode_img = node_image[pnode_uuid]
2037
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2038

    
2039
    node_vol_should = {}
2040
    instance.MapLVsByNode(node_vol_should)
2041

    
2042
    cluster = self.cfg.GetClusterInfo()
2043
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2044
                                                            self.group_info)
2045
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
2046
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
2047
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)
2048

    
2049
    for node_uuid in node_vol_should:
2050
      n_img = node_image[node_uuid]
2051
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2052
        # ignore missing volumes on offline or broken nodes
2053
        continue
2054
      for volume in node_vol_should[node_uuid]:
2055
        test = volume not in n_img.volumes
2056
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
2057
                      "volume %s missing on node %s", volume,
2058
                      self.cfg.GetNodeName(node_uuid))
2059

    
2060
    if instance.admin_state == constants.ADMINST_UP:
2061
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
2062
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
2063
                    "instance not running on its primary node %s",
2064
                     self.cfg.GetNodeName(pnode_uuid))
2065
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
2066
                    instance.name, "instance is marked as running and lives on"
2067
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))
2068

    
2069
    diskdata = [(nname, success, status, idx)
2070
                for (nname, disks) in diskstatus.items()
2071
                for idx, (success, status) in enumerate(disks)]
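    # diskdata flattens the per-node status map into one row per disk; a
    # hypothetical row is ("node-uuid-1", True, <block device status>, 0),
    # meaning disk/0 on that node was queried successfully.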
2072

    
2073
    for nname, success, bdev_status, idx in diskdata:
2074
      # the 'ghost node' construction in Exec() ensures that we have a
2075
      # node here
2076
      snode = node_image[nname]
2077
      bad_snode = snode.ghost or snode.offline
2078
      self._ErrorIf(instance.disks_active and
2079
                    not success and not bad_snode,
2080
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
2081
                    "couldn't retrieve status for disk/%s on %s: %s",
2082
                    idx, self.cfg.GetNodeName(nname), bdev_status)
2083

    
2084
      if instance.disks_active and success and \
2085
         (bdev_status.is_degraded or
2086
          bdev_status.ldisk_status != constants.LDS_OKAY):
2087
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2088
        if bdev_status.is_degraded:
2089
          msg += " is degraded"
2090
        if bdev_status.ldisk_status != constants.LDS_OKAY:
2091
          msg += "; state is '%s'" % \
2092
                 constants.LDS_NAMES[bdev_status.ldisk_status]
2093

    
2094
        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2095

    
2096
    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2097
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2098
                  "instance %s, connection to primary node failed",
2099
                  instance.name)
2100

    
2101
    self._ErrorIf(len(instance.secondary_nodes) > 1,
2102
                  constants.CV_EINSTANCELAYOUT, instance.name,
2103
                  "instance has multiple secondary nodes: %s",
2104
                  utils.CommaJoin(instance.secondary_nodes),
2105
                  code=self.ETYPE_WARNING)
2106

    
2107
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2108
    if any(es_flags.values()):
2109
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2110
        # Disk template not compatible with exclusive_storage: no instance
2111
        # node should have the flag set
2112
        es_nodes = [n
2113
                    for (n, es) in es_flags.items()
2114
                    if es]
2115
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2116
                    "instance has template %s, which is not supported on nodes"
2117
                    " that have exclusive storage set: %s",
2118
                    instance.disk_template,
2119
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2120
      for (idx, disk) in enumerate(instance.disks):
2121
        self._ErrorIf(disk.spindles is None,
2122
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2123
                      "number of spindles not configured for disk %s while"
2124
                      " exclusive storage is enabled, try running"
2125
                      " gnt-cluster repair-disk-sizes", idx)
2126

    
2127
    if instance.disk_template in constants.DTS_INT_MIRROR:
2128
      instance_nodes = utils.NiceSort(instance.all_nodes)
2129
      instance_groups = {}
2130

    
2131
      for node_uuid in instance_nodes:
2132
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
2133
                                   []).append(node_uuid)
2134

    
2135
      pretty_list = [
2136
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2137
                           groupinfo[group].name)
2138
        # Sort so that we always list the primary node first.
2139
        for group, nodes in sorted(instance_groups.items(),
2140
                                   key=lambda (_, nodes): pnode_uuid in nodes,
2141
                                   reverse=True)]
2142

    
2143
      self._ErrorIf(len(instance_groups) > 1,
2144
                    constants.CV_EINSTANCESPLITGROUPS,
2145
                    instance.name, "instance has primary and secondary nodes in"
2146
                    " different groups: %s", utils.CommaJoin(pretty_list),
2147
                    code=self.ETYPE_WARNING)
2148

    
2149
    inst_nodes_offline = []
2150
    for snode in instance.secondary_nodes:
2151
      s_img = node_image[snode]
2152
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2153
                    self.cfg.GetNodeName(snode),
2154
                    "instance %s, connection to secondary node failed",
2155
                    instance.name)
2156

    
2157
      if s_img.offline:
2158
        inst_nodes_offline.append(snode)
2159

    
2160
    # warn that the instance lives on offline nodes
2161
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2162
                  instance.name, "instance has offline secondary node(s) %s",
2163
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2164
    # ... or ghost/non-vm_capable nodes
2165
    for node_uuid in instance.all_nodes:
2166
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2167
                    instance.name, "instance lives on ghost node %s",
2168
                    self.cfg.GetNodeName(node_uuid))
2169
      self._ErrorIf(not node_image[node_uuid].vm_capable,
2170
                    constants.CV_EINSTANCEBADNODE, instance.name,
2171
                    "instance lives on non-vm_capable node %s",
2172
                    self.cfg.GetNodeName(node_uuid))
2173

    
2174
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2175
    """Verify if there are any unknown volumes in the cluster.
2176

2177
    The .os, .swap and backup volumes are ignored. All other volumes are
2178
    reported as unknown.
2179

2180
    @type reserved: L{ganeti.utils.FieldSet}
2181
    @param reserved: a FieldSet of reserved volume names
2182

2183
    """
2184
    for node_uuid, n_img in node_image.items():
2185
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2186
          self.all_node_info[node_uuid].group != self.group_uuid):
2187
        # skip non-healthy nodes
2188
        continue
2189
      for volume in n_img.volumes:
2190
        test = ((node_uuid not in node_vol_should or
2191
                volume not in node_vol_should[node_uuid]) and
2192
                not reserved.Matches(volume))
2193
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2194
                      self.cfg.GetNodeName(node_uuid),
2195
                      "volume %s is unknown", volume)
2196

    
2197
  def _VerifyNPlusOneMemory(self, node_image, all_insts):
2198
    """Verify N+1 Memory Resilience.
2199

2200
    Check that if one single node dies we can still start all the
2201
    instances it was primary for.
2202

2203
    """
2204
    cluster_info = self.cfg.GetClusterInfo()
2205
    for node_uuid, n_img in node_image.items():
2206
      # This code checks that every node which is now listed as
2207
      # secondary has enough memory to host all instances it is
2208
      # supposed to, should a single other node in the cluster fail.
2209
      # FIXME: not ready for failover to an arbitrary node
2210
      # FIXME: does not support file-backed instances
2211
      # WARNING: we currently take into account down instances as well
2212
      # as up ones, considering that even if they're down someone
2213
      # might want to start them even in the event of a node failure.
2214
      if n_img.offline or \
2215
         self.all_node_info[node_uuid].group != self.group_uuid:
2216
        # we're skipping nodes marked offline and nodes in other groups from
2217
        # the N+1 warning, since most likely we don't have good memory
2218
        # information from them; we already list instances living on such
2219
        # nodes, and that's enough warning
2220
        continue
2221
      #TODO(dynmem): also consider ballooning out other instances
2222
      for prinode, inst_uuids in n_img.sbp.items():
2223
        needed_mem = 0
2224
        for inst_uuid in inst_uuids:
2225
          bep = cluster_info.FillBE(all_insts[inst_uuid])
2226
          if bep[constants.BE_AUTO_BALANCE]:
2227
            needed_mem += bep[constants.BE_MINMEM]
2228
        test = n_img.mfree < needed_mem
2229
        self._ErrorIf(test, constants.CV_ENODEN1,
2230
                      self.cfg.GetNodeName(node_uuid),
2231
                      "not enough memory to accomodate instance failovers"
2232
                      " should node %s fail (%dMiB needed, %dMiB available)",
2233
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
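        # Hypothetical example: two auto-balanced instances with BE_MINMEM
        # of 1024 and 2048 MiB whose primary is "prinode" need 3072 MiB
        # free on this secondary for its part of the N+1 check to pass.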
2234

    
2235
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2236
                   (files_all, files_opt, files_mc, files_vm)):
2237
    """Verifies file checksums collected from all nodes.
2238

2239
    @param nodes: List of L{objects.Node} objects
2240
    @param master_node_uuid: UUID of master node
2241
    @param all_nvinfo: RPC results
2242

2243
    """
2244
    # Define functions determining which nodes to consider for a file
2245
    files2nodefn = [
2246
      (files_all, None),
2247
      (files_mc, lambda node: (node.master_candidate or
2248
                               node.uuid == master_node_uuid)),
2249
      (files_vm, lambda node: node.vm_capable),
2250
      ]
2251

    
2252
    # Build mapping from filename to list of nodes which should have the file
2253
    nodefiles = {}
2254
    for (files, fn) in files2nodefn:
2255
      if fn is None:
2256
        filenodes = nodes
2257
      else:
2258
        filenodes = filter(fn, nodes)
2259
      nodefiles.update((filename,
2260
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
2261
                       for filename in files)
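    # nodefiles now maps each expected filename to the frozenset of node
    # UUIDs that should hold it, e.g. (hypothetical entry):
    #   {"/var/lib/ganeti/config.data": frozenset(["uuid-a", "uuid-b"])}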
2262

    
2263
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2264

    
2265
    fileinfo = dict((filename, {}) for filename in nodefiles)
2266
    ignore_nodes = set()
2267

    
2268
    for node in nodes:
2269
      if node.offline:
2270
        ignore_nodes.add(node.uuid)
2271
        continue
2272

    
2273
      nresult = all_nvinfo[node.uuid]
2274

    
2275
      if nresult.fail_msg or not nresult.payload:
2276
        node_files = None
2277
      else:
2278
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2279
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2280
                          for (key, value) in fingerprints.items())
2281
        del fingerprints
2282

    
2283
      test = not (node_files and isinstance(node_files, dict))
2284
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2285
                    "Node did not return file checksum data")
2286
      if test:
2287
        ignore_nodes.add(node.uuid)
2288
        continue
2289

    
2290
      # Build per-checksum mapping from filename to nodes having it
2291
      for (filename, checksum) in node_files.items():
2292
        assert filename in nodefiles
2293
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2294

    
2295
    for (filename, checksums) in fileinfo.items():
2296
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2297

    
2298
      # Nodes having the file
2299
      with_file = frozenset(node_uuid
2300
                            for node_uuids in fileinfo[filename].values()
2301
                            for node_uuid in node_uuids) - ignore_nodes
2302

    
2303
      expected_nodes = nodefiles[filename] - ignore_nodes
2304

    
2305
      # Nodes missing file
2306
      missing_file = expected_nodes - with_file
2307

    
2308
      if filename in files_opt:
2309
        # All or no nodes
2310
        self._ErrorIf(missing_file and missing_file != expected_nodes,
2311
                      constants.CV_ECLUSTERFILECHECK, None,
2312
                      "File %s is optional, but it must exist on all or no"
2313
                      " nodes (not found on %s)",
2314
                      filename,
2315
                      utils.CommaJoin(
2316
                        utils.NiceSort(
2317
                          map(self.cfg.GetNodeName, missing_file))))
2318
      else:
2319
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2320
                      "File %s is missing from node(s) %s", filename,
2321
                      utils.CommaJoin(
2322
                        utils.NiceSort(
2323
                          map(self.cfg.GetNodeName, missing_file))))
2324

    
2325
        # Warn if a node has a file it shouldn't
2326
        unexpected = with_file - expected_nodes
2327
        self._ErrorIf(unexpected,
2328
                      constants.CV_ECLUSTERFILECHECK, None,
2329
                      "File %s should not exist on node(s) %s",
2330
                      filename, utils.CommaJoin(
2331
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2332

    
2333
      # See if there are multiple versions of the file
2334
      test = len(checksums) > 1
2335
      if test:
2336
        variants = ["variant %s on %s" %
2337
                    (idx + 1,
2338
                     utils.CommaJoin(utils.NiceSort(
2339
                       map(self.cfg.GetNodeName, node_uuids))))
2340
                    for (idx, (checksum, node_uuids)) in
2341
                      enumerate(sorted(checksums.items()))]
2342
      else:
2343
        variants = []
2344

    
2345
      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2346
                    "File %s found with %s different checksums (%s)",
2347
                    filename, len(checksums), "; ".join(variants))
2348

    
2349
  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
2350
    """Verify the drbd helper.
2351

2352
    """
2353
    if drbd_helper:
2354
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2355
      test = (helper_result is None)
2356
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2357
                    "no drbd usermode helper returned")
2358
      if helper_result:
2359
        status, payload = helper_result
2360
        test = not status
2361
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2362
                      "drbd usermode helper check unsuccessful: %s", payload)
2363
        test = status and (payload != drbd_helper)
2364
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2365
                      "wrong drbd usermode helper: %s", payload)
2366

    
2367
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2368
                      drbd_map):
2369
    """Verifies and the node DRBD status.
2370

2371
    @type ninfo: L{objects.Node}
2372
    @param ninfo: the node to check
2373
    @param nresult: the remote results for the node
2374
    @param instanceinfo: the dict of instances
2375
    @param drbd_helper: the configured DRBD usermode helper
2376
    @param drbd_map: the DRBD map as returned by
2377
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2378

2379
    """
2380
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)
2381

    
2382
    # compute the DRBD minors
2383
    node_drbd = {}
2384
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2385
      test = inst_uuid not in instanceinfo
2386
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2387
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
2388
        # ghost instance should not be running, but otherwise we
2389
        # don't give double warnings (both ghost instance and
2390
        # unallocated minor in use)
2391
      if test:
2392
        node_drbd[minor] = (inst_uuid, False)
2393
      else:
2394
        instance = instanceinfo[inst_uuid]
2395
        node_drbd[minor] = (inst_uuid, instance.disks_active)
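    # node_drbd now maps every minor the configuration expects on this node
    # to (instance-uuid, should-be-active), e.g. {0: ("some-uuid", True)}.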
2396

    
2397
    # and now check them
2398
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2399
    test = not isinstance(used_minors, (tuple, list))
2400
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2401
                  "cannot parse drbd status file: %s", str(used_minors))
2402
    if test:
2403
      # we cannot check drbd status
2404
      return
2405

    
2406
    for minor, (inst_uuid, must_exist) in node_drbd.items():
2407
      test = minor not in used_minors and must_exist
2408
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2409
                    "drbd minor %d of instance %s is not active", minor,
2410
                    self.cfg.GetInstanceName(inst_uuid))
2411
    for minor in used_minors:
2412
      test = minor not in node_drbd
2413
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2414
                    "unallocated drbd minor %d is in use", minor)
2415

    
2416
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2417
    """Builds the node OS structures.
2418

2419
    @type ninfo: L{objects.Node}
2420
    @param ninfo: the node to check
2421
    @param nresult: the remote results for the node
2422
    @param nimg: the node image object
2423

2424
    """
2425
    remote_os = nresult.get(constants.NV_OSLIST, None)
2426
    test = (not isinstance(remote_os, list) or
2427
            not compat.all(isinstance(v, list) and len(v) == 7
2428
                           for v in remote_os))
2429

    
2430
    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2431
                  "node hasn't returned valid OS data")
2432

    
2433
    nimg.os_fail = test
2434

    
2435
    if test:
2436
      return
2437

    
2438
    os_dict = {}
2439

    
2440
    for (name, os_path, status, diagnose,
2441
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2442

    
2443
      if name not in os_dict:
2444
        os_dict[name] = []
2445

    
2446
      # parameters is a list of lists instead of list of tuples due to
2447
      # JSON lacking a real tuple type, fix it:
2448
      parameters = [tuple(v) for v in parameters]
2449
      os_dict[name].append((os_path, status, diagnose,
2450
                            set(variants), set(parameters), set(api_ver)))
2451

    
2452
    nimg.oslist = os_dict
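    # nimg.oslist maps each OS name to a list of
    # (path, status, diagnose, variants, parameters, api_versions) tuples,
    # one entry per location the OS was found in on this node.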
2453

    
2454
  def _VerifyNodeOS(self, ninfo, nimg, base):
2455
    """Verifies the node OS list.
2456

2457
    @type ninfo: L{objects.Node}
2458
    @param ninfo: the node to check
2459
    @param nimg: the node image object
2460
    @param base: the 'template' node we match against (e.g. from the master)
2461

2462
    """
2463
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2464

    
2465
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2466
    for os_name, os_data in nimg.oslist.items():
2467
      assert os_data, "Empty OS status for OS %s?!" % os_name
2468
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2469
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2470
                    "Invalid OS %s (located at %s): %s",
2471
                    os_name, f_path, f_diag)
2472
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2473
                    "OS '%s' has multiple entries"
2474
                    " (first one shadows the rest): %s",
2475
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
2476
      # comparisons with the 'base' image
2477
      test = os_name not in base.oslist
2478
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2479
                    "Extra OS %s not present on reference node (%s)",
2480
                    os_name, self.cfg.GetNodeName(base.uuid))
2481
      if test:
2482
        continue
2483
      assert base.oslist[os_name], "Base node has empty OS status?"
2484
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2485
      if not b_status:
2486
        # base OS is invalid, skipping
2487
        continue
2488
      for kind, a, b in [("API version", f_api, b_api),
2489
                         ("variants list", f_var, b_var),
2490
                         ("parameters", beautify_params(f_param),
2491
                          beautify_params(b_param))]:
2492
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2493
                      "OS %s for %s differs from reference node %s:"
2494
                      " [%s] vs. [%s]", kind, os_name,
2495
                      self.cfg.GetNodeName(base.uuid),
2496
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2497

    
2498
    # check any missing OSes
2499
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2500
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2501
                  "OSes present on reference node %s"
2502
                  " but missing on this node: %s",
2503
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2504

    
2505
  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2506
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2507

2508
    @type ninfo: L{objects.Node}
2509
    @param ninfo: the node to check
2510
    @param nresult: the remote results for the node
2511
    @type is_master: bool
2512
    @param is_master: Whether node is the master node
2513

2514
    """
2515
    cluster = self.cfg.GetClusterInfo()
2516
    if (is_master and
2517
        (cluster.IsFileStorageEnabled() or
2518
         cluster.IsSharedFileStorageEnabled())):
2519
      try:
2520
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2521
      except KeyError:
2522
        # This should never happen
2523
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2524
                      "Node did not return forbidden file storage paths")
2525
      else:
2526
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2527
                      "Found forbidden file storage paths: %s",
2528
                      utils.CommaJoin(fspaths))
2529
    else:
2530
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2531
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2532
                    "Node should not have returned forbidden file storage"
2533
                    " paths")
2534

    
2535
  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2536
                          verify_key, error_key):
2537
    """Verifies (file) storage paths.
2538

2539
    @type ninfo: L{objects.Node}
2540
    @param ninfo: the node to check
2541
    @param nresult: the remote results for the node
2542
    @type file_disk_template: string
2543
    @param file_disk_template: file-based disk template, whose directory
2544
        is supposed to be verified
2545
    @type verify_key: string
2546
    @param verify_key: key for the verification map of this file
2547
        verification step
2548
    @param error_key: error key to be added to the verification results
2549
        in case something goes wrong in this verification step
2550

2551
    """
2552
    assert (file_disk_template in
2553
            utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2554
    cluster = self.cfg.GetClusterInfo()
2555
    if cluster.IsDiskTemplateEnabled(file_disk_template):
2556
      self._ErrorIf(
2557
          verify_key in nresult,
2558
          error_key, ninfo.name,
2559
          "The configured %s storage path is unusable: %s" %
2560
          (file_disk_template, nresult.get(verify_key)))
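    # The node presumably includes verify_key in its reply only when the
    # configured path is unusable, so mere presence of the key is treated
    # as an error above.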
2561

    
2562
  def _VerifyFileStoragePaths(self, ninfo, nresult):
2563
    """Verifies (file) storage paths.
2564

2565
    @see: C{_VerifyStoragePaths}
2566

2567
    """
2568
    self._VerifyStoragePaths(
2569
        ninfo, nresult, constants.DT_FILE,
2570
        constants.NV_FILE_STORAGE_PATH,
2571
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2572

    
2573
  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2574
    """Verifies (file) storage paths.
2575

2576
    @see: C{_VerifyStoragePaths}
2577

2578
    """
2579
    self._VerifyStoragePaths(
2580
        ninfo, nresult, constants.DT_SHARED_FILE,
2581
        constants.NV_SHARED_FILE_STORAGE_PATH,
2582
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2583

    
2584
  def _VerifyOob(self, ninfo, nresult):
2585
    """Verifies out of band functionality of a node.
2586

2587
    @type ninfo: L{objects.Node}
2588
    @param ninfo: the node to check
2589
    @param nresult: the remote results for the node
2590

2591
    """
2592
    # We just have to verify the paths on master and/or master candidates
2593
    # as the oob helper is invoked on the master
2594
    if ((ninfo.master_candidate or ninfo.master_capable) and
2595
        constants.NV_OOB_PATHS in nresult):
2596
      for path_result in nresult[constants.NV_OOB_PATHS]:
2597
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2598
                      ninfo.name, path_result)
2599

    
2600
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2601
    """Verifies and updates the node volume data.
2602

2603
    This function will update a L{NodeImage}'s internal structures
2604
    with data from the remote call.
2605

2606
    @type ninfo: L{objects.Node}
2607
    @param ninfo: the node to check
2608
    @param nresult: the remote results for the node
2609
    @param nimg: the node image object
2610
    @param vg_name: the configured VG name
2611

2612
    """
2613
    nimg.lvm_fail = True
2614
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
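    # On success this is a list of instance names as reported by the node's
    # hypervisor(s); they are mapped back to UUIDs via the configuration.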
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
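    # Only the "memory_free" field is used here; the value ends up in
    # nimg.mfree and feeds the N+1 memory redundancy check further down.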
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_dev_inst_only = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams makes already copies of the disks
      dev_inst_only = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))

      node_disks_dev_inst_only[nuuid] = dev_inst_only

    assert len(node_disks) == len(node_disks_dev_inst_only)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
               node_disks.keys(), node_disks_dev_inst_only)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

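    # At this point instdisk maps
    #   inst_uuid -> node_uuid -> [(success, payload), ...]
    # with one status tuple per disk of that instance on that node.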
    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

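    # Illustrative example (hypothetical names): verifying a group with online
    # nodes "n1" and "n2" while one other group contains "n3" and "n4" yields
    # roughly
    #   (["n1", "n2"], {"n1": ["n3"], "n2": ["n4"]})
    # i.e. every node is asked to contact one node of each other group.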
    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; their failure causes
    their output to be logged in the verify output and makes the verification
    fail.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various test on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }
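    # Each NV_* key above requests one class of checks from the node verify
    # RPC; the value carries the parameters for that check (None where no
    # parameters are needed).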

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
      if drbd_helper:
        node_verify_param[constants.NV_DRBDVERSION] = None
        node_verify_param[constants.NV_DRBDLIST] = None
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)
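    # node_image maps node UUIDs to NodeImage objects, which accumulate both
    # the expected state (instances, volumes) and the per-node check results.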

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so
    # if it's True for at least one node, we act as if it were set for all
    # the nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()
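    # all_drbd_map holds the DRBD minor allocation recorded in the
    # configuration (per node: minor -> instance); it is consumed by the
    # per-node DRBD checks below.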

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

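      # The remaining per-node checks only make sense on vm_capable nodes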
      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.name not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

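      # Each node's hooks payload is a list of (script, status, output)
      # tuples, one per hook script executed on that node.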
      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])