lib/cmdlib.py @ 9f039737


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0201,C0302
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
# C0302: since we have waaaay too many lines in this module
30

    
31
import os
32
import os.path
33
import time
34
import re
35
import platform
36
import logging
37
import copy
38
import OpenSSL
39
import socket
40
import tempfile
41
import shutil
42
import itertools
43
import operator
44

    
45
from ganeti import ssh
46
from ganeti import utils
47
from ganeti import errors
48
from ganeti import hypervisor
49
from ganeti import locking
50
from ganeti import constants
51
from ganeti import objects
52
from ganeti import serializer
53
from ganeti import ssconf
54
from ganeti import uidpool
55
from ganeti import compat
56
from ganeti import masterd
57
from ganeti import netutils
58
from ganeti import query
59
from ganeti import qlang
60
from ganeti import opcodes
61
from ganeti import ht
62

    
63
import ganeti.masterd.instance # pylint: disable-msg=W0611
64

    
65

    
66
class ResultWithJobs:
67
  """Data container for LU results with jobs.
68

69
  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
70
  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
71
  contained in the C{jobs} attribute and include the job IDs in the opcode
72
  result.
73

74
  """
75
  def __init__(self, jobs, **kwargs):
76
    """Initializes this class.
77

78
    Additional return values can be specified as keyword arguments.
79

80
    @type jobs: list of lists of L{opcodes.OpCode}
81
    @param jobs: A list of lists of opcode objects
82

83
    """
84
    self.jobs = jobs
85
    self.other = kwargs
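# Illustrative usage sketch (not part of the original module): an LU's Exec
# can hand follow-up work to the job queue by returning ResultWithJobs. The
# opcode and the extra keyword used below are assumptions for the example.
#
#   def Exec(self, feedback_fn):
#     ...
#     return ResultWithJobs([[opcodes.OpInstanceMigrate(instance_name=name)]
#                            for name in instances_to_move],
#                           rebalance_performed=True)
#
# mcpu.Processor._ProcessResult submits each inner list as one job and adds
# the job IDs to the opcode result; the extra keyword values end up in the
# C{other} attribute.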
86

    
87

    
88
class LogicalUnit(object):
89
  """Logical Unit base class.
90

91
  Subclasses must follow these rules:
92
    - implement ExpandNames
93
    - implement CheckPrereq (except when tasklets are used)
94
    - implement Exec (except when tasklets are used)
95
    - implement BuildHooksEnv
96
    - implement BuildHooksNodes
97
    - redefine HPATH and HTYPE
98
    - optionally redefine their run requirements:
99
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
100

101
  Note that all commands require root permissions.
102

103
  @ivar dry_run_result: the value (if any) that will be returned to the caller
104
      in dry-run mode (signalled by opcode dry_run parameter)
105

106
  """
107
  HPATH = None
108
  HTYPE = None
109
  REQ_BGL = True
110

    
111
  def __init__(self, processor, op, context, rpc):
112
    """Constructor for LogicalUnit.
113

114
    This needs to be overridden in derived classes in order to check op
115
    validity.
116

117
    """
118
    self.proc = processor
119
    self.op = op
120
    self.cfg = context.cfg
121
    self.glm = context.glm
122
    self.context = context
123
    self.rpc = rpc
124
    # Dicts used to declare locking needs to mcpu
125
    self.needed_locks = None
126
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
127
    self.add_locks = {}
128
    self.remove_locks = {}
129
    # Used to force good behavior when calling helper functions
130
    self.recalculate_locks = {}
131
    # logging
132
    self.Log = processor.Log # pylint: disable-msg=C0103
133
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
134
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
135
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
136
    # support for dry-run
137
    self.dry_run_result = None
138
    # support for generic debug attribute
139
    if (not hasattr(self.op, "debug_level") or
140
        not isinstance(self.op.debug_level, int)):
141
      self.op.debug_level = 0
142

    
143
    # Tasklets
144
    self.tasklets = None
145

    
146
    # Validate opcode parameters and set defaults
147
    self.op.Validate(True)
148

    
149
    self.CheckArguments()
150

    
151
  def CheckArguments(self):
152
    """Check syntactic validity for the opcode arguments.
153

154
    This method is for doing a simple syntactic check and ensuring the
    validity of the opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)
162

163
    The function is allowed to change the self.op attribute so that
164
    later methods can no longer worry about missing parameters.
165

166
    """
167
    pass
168

    
169
  def ExpandNames(self):
170
    """Expand names for this LU.
171

172
    This method is called before starting to execute the opcode, and it should
173
    update all the parameters of the opcode to their canonical form (e.g. a
174
    short node name must be fully expanded after this method has successfully
175
    completed). This way locking, hooks, logging, etc. can work correctly.
176

177
    LUs which implement this method must also populate the self.needed_locks
178
    member, as a dict with lock levels as keys, and a list of needed lock names
179
    as values. Rules:
180

181
      - use an empty dict if you don't need any lock
182
      - if you don't need any lock at a particular level omit that level
183
      - don't put anything for the BGL level
184
      - if you want all locks at a level use locking.ALL_SET as a value
185

186
    If you need to share locks (rather than acquire them exclusively) at one
187
    level you can modify self.share_locks, setting a true value (usually 1) for
188
    that level. By default locks are not shared.
189

190
    This function can also define a list of tasklets, which then will be
191
    executed in order instead of the usual LU-level CheckPrereq and Exec
192
    functions, if those are not defined by the LU.
193

194
    Examples::
195

196
      # Acquire all nodes and one instance
197
      self.needed_locks = {
198
        locking.LEVEL_NODE: locking.ALL_SET,
199
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
200
      }
201
      # Acquire just two nodes
202
      self.needed_locks = {
203
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
204
      }
205
      # Acquire no locks
206
      self.needed_locks = {} # No, you can't leave it to the default value None
207

208
    """
209
    # The implementation of this method is mandatory only if the new LU is
210
    # concurrent, so that old LUs don't need to be changed all at the same
211
    # time.
212
    if self.REQ_BGL:
213
      self.needed_locks = {} # Exclusive LUs don't need locks.
214
    else:
215
      raise NotImplementedError
216

    
217
  def DeclareLocks(self, level):
218
    """Declare LU locking needs for a level
219

220
    While most LUs can just declare their locking needs at ExpandNames time,
221
    sometimes there's the need to calculate some locks after having acquired
222
    the ones before. This function is called just before acquiring locks at a
223
    particular level, but after acquiring the ones at lower levels, and permits
224
    such calculations. It can be used to modify self.needed_locks, and by
225
    default it does nothing.
226

227
    This function is only called if you have something already set in
228
    self.needed_locks for the level.
229

230
    @param level: Locking level which is going to be locked
231
    @type level: member of ganeti.locking.LEVELS
232

233
    """
234

    
235
  def CheckPrereq(self):
236
    """Check prerequisites for this LU.
237

238
    This method should check that the prerequisites for the execution
239
    of this LU are fulfilled. It can do internode communication, but
240
    it should be idempotent - no cluster or system changes are
241
    allowed.
242

243
    The method should raise errors.OpPrereqError in case something is
244
    not fulfilled. Its return value is ignored.
245

246
    This method should also update all the parameters of the opcode to
247
    their canonical form if it hasn't been done by ExpandNames before.
248

249
    """
250
    if self.tasklets is not None:
251
      for (idx, tl) in enumerate(self.tasklets):
252
        logging.debug("Checking prerequisites for tasklet %s/%s",
253
                      idx + 1, len(self.tasklets))
254
        tl.CheckPrereq()
255
    else:
256
      pass
257

    
258
  def Exec(self, feedback_fn):
259
    """Execute the LU.
260

261
    This method should implement the actual work. It should raise
262
    errors.OpExecError for failures that are somewhat dealt with in
263
    code, or expected.
264

265
    """
266
    if self.tasklets is not None:
267
      for (idx, tl) in enumerate(self.tasklets):
268
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
269
        tl.Exec(feedback_fn)
270
    else:
271
      raise NotImplementedError
272

    
273
  def BuildHooksEnv(self):
274
    """Build hooks environment for this LU.
275

276
    @rtype: dict
277
    @return: Dictionary containing the environment that will be used for
278
      running the hooks for this LU. The keys of the dict must not be prefixed
279
      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
280
      will extend the environment with additional variables. If no environment
281
      should be defined, an empty dictionary should be returned (not C{None}).
282
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
283
      will not be called.
284

285
    """
286
    raise NotImplementedError
287

    
288
  def BuildHooksNodes(self):
289
    """Build list of nodes to run LU's hooks.
290

291
    @rtype: tuple; (list, list)
292
    @return: Tuple containing a list of node names on which the hook
293
      should run before the execution and a list of node names on which the
294
      hook should run after the execution. No nodes should be returned as an
295
      empty list (and not None).
296
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
297
      will not be called.
298

299
    """
300
    raise NotImplementedError
301

    
302
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
303
    """Notify the LU about the results of its hooks.
304

305
    This method is called every time a hooks phase is executed, and notifies
306
    the Logical Unit about the hooks' result. The LU can then use it to alter
307
    its result based on the hooks.  By default the method does nothing and the
308
    previous result is passed back unchanged but any LU can define it if it
309
    wants to use the local cluster hook-scripts somehow.
310

311
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
312
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
313
    @param hook_results: the results of the multi-node hooks rpc call
314
    @param feedback_fn: function used to send feedback back to the caller
315
    @param lu_result: the previous Exec result this LU had, or None
316
        in the PRE phase
317
    @return: the new Exec result, based on the previous result
318
        and hook results
319

320
    """
321
    # API must be kept, thus we ignore the "unused argument" and "could
    # be a function" warnings
323
    # pylint: disable-msg=W0613,R0201
324
    return lu_result
325

    
326
  def _ExpandAndLockInstance(self):
327
    """Helper function to expand and lock an instance.
328

329
    Many LUs that work on an instance take its name in self.op.instance_name
330
    and need to expand it and then declare the expanded name for locking. This
331
    function does it, and then updates self.op.instance_name to the expanded
332
    name. It also initializes needed_locks as a dict, if this hasn't been done
333
    before.
334

335
    """
336
    if self.needed_locks is None:
337
      self.needed_locks = {}
338
    else:
339
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
340
        "_ExpandAndLockInstance called with instance-level locks set"
341
    self.op.instance_name = _ExpandInstanceName(self.cfg,
342
                                                self.op.instance_name)
343
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
344

    
345
  def _LockInstancesNodes(self, primary_only=False):
346
    """Helper function to declare instances' nodes for locking.
347

348
    This function should be called after locking one or more instances to lock
349
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
350
    with all primary or secondary nodes for instances already locked and
351
    present in self.needed_locks[locking.LEVEL_INSTANCE].
352

353
    It should be called from DeclareLocks, and for safety only works if
354
    self.recalculate_locks[locking.LEVEL_NODE] is set.
355

356
    In the future it may grow parameters to just lock some instance's nodes, or
357
    to just lock primaries or secondary nodes, if needed.
358

359
    It should be called in DeclareLocks in a way similar to::
360

361
      if level == locking.LEVEL_NODE:
362
        self._LockInstancesNodes()
363

364
    @type primary_only: boolean
365
    @param primary_only: only lock primary nodes of locked instances
366

367
    """
368
    assert locking.LEVEL_NODE in self.recalculate_locks, \
369
      "_LockInstancesNodes helper function called with no nodes to recalculate"
370

    
371
    # TODO: check if we've really been called with the instance locks held
372

    
373
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
374
    # future we might want to have different behaviors depending on the value
375
    # of self.recalculate_locks[locking.LEVEL_NODE]
376
    wanted_nodes = []
377
    for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
378
      instance = self.context.cfg.GetInstanceInfo(instance_name)
379
      wanted_nodes.append(instance.primary_node)
380
      if not primary_only:
381
        wanted_nodes.extend(instance.secondary_nodes)
382

    
383
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
384
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
385
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
386
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
387

    
388
    del self.recalculate_locks[locking.LEVEL_NODE]
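# Illustrative sketch (not part of the original module): how a non-BGL LU
# would typically combine the locking helpers above. The class name and the
# opcode it would handle are hypothetical; the helpers, constants and
# attributes are the ones defined in this module.
#
#   class LUInstanceExample(LogicalUnit):
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self._ExpandAndLockInstance()
#       self.needed_locks[locking.LEVEL_NODE] = []
#       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#     def DeclareLocks(self, level):
#       if level == locking.LEVEL_NODE:
#         self._LockInstancesNodes(primary_only=True)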
389

    
390

    
391
class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
392
  """Simple LU which runs no hooks.
393

394
  This LU is intended as a parent for other LogicalUnits which will
395
  run no hooks, in order to reduce duplicate code.
396

397
  """
398
  HPATH = None
399
  HTYPE = None
400

    
401
  def BuildHooksEnv(self):
402
    """Empty BuildHooksEnv for NoHooksLu.
403

404
    This just raises an error.
405

406
    """
407
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")
408

    
409
  def BuildHooksNodes(self):
410
    """Empty BuildHooksNodes for NoHooksLU.
411

412
    """
413
    raise AssertionError("BuildHooksNodes called for NoHooksLU")
414

    
415

    
416
class Tasklet:
417
  """Tasklet base class.
418

419
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
420
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
421
  tasklets know nothing about locks.
422

423
  Subclasses must follow these rules:
424
    - Implement CheckPrereq
425
    - Implement Exec
426

427
  """
428
  def __init__(self, lu):
429
    self.lu = lu
430

    
431
    # Shortcuts
432
    self.cfg = lu.cfg
433
    self.rpc = lu.rpc
434

    
435
  def CheckPrereq(self):
    """Check prerequisites for this tasklet.
437

438
    This method should check whether the prerequisites for the execution of
439
    this tasklet are fulfilled. It can do internode communication, but it
440
    should be idempotent - no cluster or system changes are allowed.
441

442
    The method should raise errors.OpPrereqError in case something is not
443
    fulfilled. Its return value is ignored.
444

445
    This method should also update all parameters to their canonical form if it
446
    hasn't been done before.
447

448
    """
449
    pass
450

    
451
  def Exec(self, feedback_fn):
452
    """Execute the tasklet.
453

454
    This method should implement the actual work. It should raise
455
    errors.OpExecError for failures that are somewhat dealt with in code, or
456
    expected.
457

458
    """
459
    raise NotImplementedError
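# Illustrative sketch (not part of the original module): a minimal tasklet and
# how an LU would wire it up; all names are hypothetical. Locking still has to
# be declared by the owning LU, since tasklets know nothing about locks.
#
#   class _ExampleTasklet(Tasklet):
#     def __init__(self, lu, instance_name):
#       Tasklet.__init__(self, lu)
#       self.instance_name = instance_name
#
#     def CheckPrereq(self):
#       self.instance = self.cfg.GetInstanceInfo(self.instance_name)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Working on %s" % self.instance.name)
#
#   # ...and in the owning LU's ExpandNames:
#   #   self.tasklets = [_ExampleTasklet(self, name) for name in inst_names]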
460

    
461

    
462
class _QueryBase:
463
  """Base for query utility classes.
464

465
  """
466
  #: Attribute holding field definitions
467
  FIELDS = None
468

    
469
  def __init__(self, filter_, fields, use_locking):
470
    """Initializes this class.
471

472
    """
473
    self.use_locking = use_locking
474

    
475
    self.query = query.Query(self.FIELDS, fields, filter_=filter_,
476
                             namefield="name")
477
    self.requested_data = self.query.RequestedData()
478
    self.names = self.query.RequestedNames()
479

    
480
    # Sort only if no names were requested
481
    self.sort_by_name = not self.names
482

    
483
    self.do_locking = None
484
    self.wanted = None
485

    
486
  def _GetNames(self, lu, all_names, lock_level):
487
    """Helper function to determine names asked for in the query.
488

489
    """
490
    if self.do_locking:
491
      names = lu.glm.list_owned(lock_level)
492
    else:
493
      names = all_names
494

    
495
    if self.wanted == locking.ALL_SET:
496
      assert not self.names
497
      # caller didn't specify names, so ordering is not important
498
      return utils.NiceSort(names)
499

    
500
    # caller specified names and we must keep the same order
501
    assert self.names
502
    assert not self.do_locking or lu.glm.is_owned(lock_level)
503

    
504
    missing = set(self.wanted).difference(names)
505
    if missing:
506
      raise errors.OpExecError("Some items were removed before retrieving"
507
                               " their data: %s" % missing)
508

    
509
    # Return expanded names
510
    return self.wanted
511

    
512
  def ExpandNames(self, lu):
513
    """Expand names for this query.
514

515
    See L{LogicalUnit.ExpandNames}.
516

517
    """
518
    raise NotImplementedError()
519

    
520
  def DeclareLocks(self, lu, level):
521
    """Declare locks for this query.
522

523
    See L{LogicalUnit.DeclareLocks}.
524

525
    """
526
    raise NotImplementedError()
527

    
528
  def _GetQueryData(self, lu):
529
    """Collects all data for this query.
530

531
    @return: Query data object
532

533
    """
534
    raise NotImplementedError()
535

    
536
  def NewStyleQuery(self, lu):
537
    """Collect data and execute query.
538

539
    """
540
    return query.GetQueryResponse(self.query, self._GetQueryData(lu),
541
                                  sort_by_name=self.sort_by_name)
542

    
543
  def OldStyleQuery(self, lu):
544
    """Collect data and execute query.
545

546
    """
547
    return self.query.OldStyleQuery(self._GetQueryData(lu),
548
                                    sort_by_name=self.sort_by_name)
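# Illustrative sketch (not part of the original module) of the shape of a
# concrete _QueryBase subclass; the field table and the data returned by
# _GetQueryData are placeholders only.
#
#   class _ExampleQuery(_QueryBase):
#     FIELDS = None                 # would be a field definition table
#
#     def ExpandNames(self, lu):
#       lu.needed_locks = {}
#       self.do_locking = self.use_locking
#       self.wanted = self.names or locking.ALL_SET
#
#     def DeclareLocks(self, lu, level):
#       pass
#
#     def _GetQueryData(self, lu):
#       return []                   # data matching FIELDS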
549

    
550

    
551
def _ShareAll():
552
  """Returns a dict declaring all lock levels shared.
553

554
  """
555
  return dict.fromkeys(locking.LEVELS, 1)
556

    
557

    
558
def _SupportsOob(cfg, node):
559
  """Tells if node supports OOB.
560

561
  @type cfg: L{config.ConfigWriter}
562
  @param cfg: The cluster configuration
563
  @type node: L{objects.Node}
564
  @param node: The node
565
  @return: The OOB script if supported or an empty string otherwise
566

567
  """
568
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
569

    
570

    
571
def _GetWantedNodes(lu, nodes):
572
  """Returns list of checked and expanded node names.
573

574
  @type lu: L{LogicalUnit}
575
  @param lu: the logical unit on whose behalf we execute
576
  @type nodes: list
577
  @param nodes: list of node names or None for all nodes
578
  @rtype: list
579
  @return: the list of nodes, sorted
580
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
581

582
  """
583
  if nodes:
584
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
585

    
586
  return utils.NiceSort(lu.cfg.GetNodeList())
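# Illustrative usage (not part of the original module): inside an LU,
#
#   self.op.nodes = _GetWantedNodes(self, self.op.nodes)
#
# expands short names to their full, canonical form, or returns the sorted
# list of all cluster nodes when self.op.nodes is empty.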
587

    
588

    
589
def _GetWantedInstances(lu, instances):
590
  """Returns list of checked and expanded instance names.
591

592
  @type lu: L{LogicalUnit}
593
  @param lu: the logical unit on whose behalf we execute
594
  @type instances: list
595
  @param instances: list of instance names or None for all instances
596
  @rtype: list
597
  @return: the list of instances, sorted
598
  @raise errors.OpPrereqError: if the instances parameter is wrong type
599
  @raise errors.OpPrereqError: if any of the passed instances is not found
600

601
  """
602
  if instances:
603
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
604
  else:
605
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
606
  return wanted
607

    
608

    
609
def _GetUpdatedParams(old_params, update_dict,
610
                      use_default=True, use_none=False):
611
  """Return the new version of a parameter dictionary.
612

613
  @type old_params: dict
614
  @param old_params: old parameters
615
  @type update_dict: dict
616
  @param update_dict: dict containing new parameter values, or
617
      constants.VALUE_DEFAULT to reset the parameter to its default
618
      value
619
  @param use_default: boolean
620
  @type use_default: whether to recognise L{constants.VALUE_DEFAULT}
621
      values as 'to be deleted' values
622
  @param use_none: boolean
623
  @type use_none: whether to recognise C{None} values as 'to be
624
      deleted' values
625
  @rtype: dict
626
  @return: the new parameter dictionary
627

628
  """
629
  params_copy = copy.deepcopy(old_params)
630
  for key, val in update_dict.iteritems():
631
    if ((use_default and val == constants.VALUE_DEFAULT) or
632
        (use_none and val is None)):
633
      try:
634
        del params_copy[key]
635
      except KeyError:
636
        pass
637
    else:
638
      params_copy[key] = val
639
  return params_copy
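# Worked example (illustrative, not part of the original module): with
# old_params = {"mem": 512, "cpu": 2}, the call
#
#   _GetUpdatedParams(old_params, {"mem": constants.VALUE_DEFAULT, "cpu": 4})
#
# returns {"cpu": 4}: "mem" is dropped (reset to its default) because
# use_default is True, while "cpu" is simply overwritten. With use_none=True,
# a C{None} value behaves the same way as constants.VALUE_DEFAULT.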
640

    
641

    
642
def _ReleaseLocks(lu, level, names=None, keep=None):
643
  """Releases locks owned by an LU.
644

645
  @type lu: L{LogicalUnit}
646
  @param level: Lock level
647
  @type names: list or None
648
  @param names: Names of locks to release
649
  @type keep: list or None
650
  @param keep: Names of locks to retain
651

652
  """
653
  assert not (keep is not None and names is not None), \
654
         "Only one of the 'names' and the 'keep' parameters can be given"
655

    
656
  if names is not None:
657
    should_release = names.__contains__
658
  elif keep:
659
    should_release = lambda name: name not in keep
660
  else:
661
    should_release = None
662

    
663
  if should_release:
664
    retain = []
665
    release = []
666

    
667
    # Determine which locks to release
668
    for name in lu.glm.list_owned(level):
669
      if should_release(name):
670
        release.append(name)
671
      else:
672
        retain.append(name)
673

    
674
    assert len(lu.glm.list_owned(level)) == (len(retain) + len(release))
675

    
676
    # Release just some locks
677
    lu.glm.release(level, names=release)
678

    
679
    assert frozenset(lu.glm.list_owned(level)) == frozenset(retain)
680
  else:
681
    # Release everything
682
    lu.glm.release(level)
683

    
684
    assert not lu.glm.is_owned(level), "No locks should be owned"
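# Illustrative usage (not part of the original module): once an LU has
# narrowed down the nodes it actually needs, it can drop the rest, e.g.
#
#   _ReleaseLocks(self, locking.LEVEL_NODE, keep=[instance.primary_node])
#
# or release a whole level with _ReleaseLocks(self, locking.LEVEL_NODE).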
685

    
686

    
687
def _MapInstanceDisksToNodes(instances):
688
  """Creates a map from (node, volume) to instance name.
689

690
  @type instances: list of L{objects.Instance}
691
  @rtype: dict; tuple of (node name, volume name) as key, instance name as value
692

693
  """
694
  return dict(((node, vol), inst.name)
695
              for inst in instances
696
              for (node, vols) in inst.MapLVsByNode().items()
697
              for vol in vols)
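# Illustrative result shape (not part of the original module): for a mirrored
# instance "inst1" with volumes on "node1" and "node2", the returned dict
# would look like
#
#   {("node1", "<lv name>"): "inst1", ("node2", "<lv name>"): "inst1"}
#
# where the volume names are whatever Instance.MapLVsByNode reports.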
698

    
699

    
700
def _RunPostHook(lu, node_name):
701
  """Runs the post-hook for an opcode on a single node.
702

703
  """
704
  hm = lu.proc.hmclass(lu.rpc.call_hooks_runner, lu)
705
  try:
706
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
707
  except:
708
    # pylint: disable-msg=W0702
709
    lu.LogWarning("Errors occurred running hooks on %s" % node_name)
710

    
711

    
712
def _CheckOutputFields(static, dynamic, selected):
713
  """Checks whether all selected fields are valid.
714

715
  @type static: L{utils.FieldSet}
716
  @param static: static fields set
717
  @type dynamic: L{utils.FieldSet}
718
  @param dynamic: dynamic fields set
719

720
  """
721
  f = utils.FieldSet()
722
  f.Extend(static)
723
  f.Extend(dynamic)
724

    
725
  delta = f.NonMatching(selected)
726
  if delta:
727
    raise errors.OpPrereqError("Unknown output fields selected: %s"
728
                               % ",".join(delta), errors.ECODE_INVAL)
729

    
730

    
731
def _CheckGlobalHvParams(params):
732
  """Validates that given hypervisor params are not global ones.
733

734
  This will ensure that instances don't get customised versions of
735
  global params.
736

737
  """
738
  used_globals = constants.HVC_GLOBALS.intersection(params)
739
  if used_globals:
740
    msg = ("The following hypervisor parameters are global and cannot"
741
           " be customized at instance level, please modify them at"
742
           " cluster level: %s" % utils.CommaJoin(used_globals))
743
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
744

    
745

    
746
def _CheckNodeOnline(lu, node, msg=None):
747
  """Ensure that a given node is online.
748

749
  @param lu: the LU on behalf of which we make the check
750
  @param node: the node to check
751
  @param msg: if passed, should be a message to replace the default one
752
  @raise errors.OpPrereqError: if the node is offline
753

754
  """
755
  if msg is None:
756
    msg = "Can't use offline node"
757
  if lu.cfg.GetNodeInfo(node).offline:
758
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
759

    
760

    
761
def _CheckNodeNotDrained(lu, node):
762
  """Ensure that a given node is not drained.
763

764
  @param lu: the LU on behalf of which we make the check
765
  @param node: the node to check
766
  @raise errors.OpPrereqError: if the node is drained
767

768
  """
769
  if lu.cfg.GetNodeInfo(node).drained:
770
    raise errors.OpPrereqError("Can't use drained node %s" % node,
771
                               errors.ECODE_STATE)
772

    
773

    
774
def _CheckNodeVmCapable(lu, node):
775
  """Ensure that a given node is vm capable.
776

777
  @param lu: the LU on behalf of which we make the check
778
  @param node: the node to check
779
  @raise errors.OpPrereqError: if the node is not vm capable
780

781
  """
782
  if not lu.cfg.GetNodeInfo(node).vm_capable:
783
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
784
                               errors.ECODE_STATE)
785

    
786

    
787
def _CheckNodeHasOS(lu, node, os_name, force_variant):
788
  """Ensure that a node supports a given OS.
789

790
  @param lu: the LU on behalf of which we make the check
791
  @param node: the node to check
792
  @param os_name: the OS to query about
793
  @param force_variant: whether to ignore variant errors
794
  @raise errors.OpPrereqError: if the node is not supporting the OS
795

796
  """
797
  result = lu.rpc.call_os_get(node, os_name)
798
  result.Raise("OS '%s' not in supported OS list for node %s" %
799
               (os_name, node),
800
               prereq=True, ecode=errors.ECODE_INVAL)
801
  if not force_variant:
802
    _CheckOSVariant(result.payload, os_name)
803

    
804

    
805
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
806
  """Ensure that a node has the given secondary ip.
807

808
  @type lu: L{LogicalUnit}
809
  @param lu: the LU on behalf of which we make the check
810
  @type node: string
811
  @param node: the node to check
812
  @type secondary_ip: string
813
  @param secondary_ip: the ip to check
814
  @type prereq: boolean
815
  @param prereq: whether to throw a prerequisite or an execute error
816
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
817
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
818

819
  """
820
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
821
  result.Raise("Failure checking secondary ip on node %s" % node,
822
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
823
  if not result.payload:
824
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
825
           " please fix and re-run this command" % secondary_ip)
826
    if prereq:
827
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
828
    else:
829
      raise errors.OpExecError(msg)
830

    
831

    
832
def _GetClusterDomainSecret():
833
  """Reads the cluster domain secret.
834

835
  """
836
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
837
                               strict=True)
838

    
839

    
840
def _CheckInstanceDown(lu, instance, reason):
841
  """Ensure that an instance is not running."""
842
  if instance.admin_up:
843
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
844
                               (instance.name, reason), errors.ECODE_STATE)
845

    
846
  pnode = instance.primary_node
847
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
848
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
849
              prereq=True, ecode=errors.ECODE_ENVIRON)
850

    
851
  if instance.name in ins_l.payload:
852
    raise errors.OpPrereqError("Instance %s is running, %s" %
853
                               (instance.name, reason), errors.ECODE_STATE)
854

    
855

    
856
def _ExpandItemName(fn, name, kind):
857
  """Expand an item name.
858

859
  @param fn: the function to use for expansion
860
  @param name: requested item name
861
  @param kind: text description ('Node' or 'Instance')
862
  @return: the resolved (full) name
863
  @raise errors.OpPrereqError: if the item is not found
864

865
  """
866
  full_name = fn(name)
867
  if full_name is None:
868
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
869
                               errors.ECODE_NOENT)
870
  return full_name
871

    
872

    
873
def _ExpandNodeName(cfg, name):
874
  """Wrapper over L{_ExpandItemName} for nodes."""
875
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
876

    
877

    
878
def _ExpandInstanceName(cfg, name):
879
  """Wrapper over L{_ExpandItemName} for instance."""
880
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
881

    
882

    
883
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
884
                          memory, vcpus, nics, disk_template, disks,
885
                          bep, hvp, hypervisor_name, tags):
886
  """Builds instance related env variables for hooks
887

888
  This builds the hook environment from individual variables.
889

890
  @type name: string
891
  @param name: the name of the instance
892
  @type primary_node: string
893
  @param primary_node: the name of the instance's primary node
894
  @type secondary_nodes: list
895
  @param secondary_nodes: list of secondary nodes as strings
896
  @type os_type: string
897
  @param os_type: the name of the instance's OS
898
  @type status: boolean
899
  @param status: the should_run status of the instance
900
  @type memory: string
901
  @param memory: the memory size of the instance
902
  @type vcpus: string
903
  @param vcpus: the count of VCPUs the instance has
904
  @type nics: list
905
  @param nics: list of tuples (ip, mac, mode, link) representing
906
      the NICs the instance has
907
  @type disk_template: string
908
  @param disk_template: the disk template of the instance
909
  @type disks: list
910
  @param disks: the list of (size, mode) pairs
911
  @type bep: dict
912
  @param bep: the backend parameters for the instance
913
  @type hvp: dict
914
  @param hvp: the hypervisor parameters for the instance
915
  @type hypervisor_name: string
916
  @param hypervisor_name: the hypervisor for the instance
917
  @type tags: list
918
  @param tags: list of instance tags as strings
919
  @rtype: dict
920
  @return: the hook environment for this instance
921

922
  """
923
  if status:
924
    str_status = "up"
925
  else:
926
    str_status = "down"
927
  env = {
928
    "OP_TARGET": name,
929
    "INSTANCE_NAME": name,
930
    "INSTANCE_PRIMARY": primary_node,
931
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
932
    "INSTANCE_OS_TYPE": os_type,
933
    "INSTANCE_STATUS": str_status,
934
    "INSTANCE_MEMORY": memory,
935
    "INSTANCE_VCPUS": vcpus,
936
    "INSTANCE_DISK_TEMPLATE": disk_template,
937
    "INSTANCE_HYPERVISOR": hypervisor_name,
938
  }
939

    
940
  if nics:
941
    nic_count = len(nics)
942
    for idx, (ip, mac, mode, link) in enumerate(nics):
943
      if ip is None:
944
        ip = ""
945
      env["INSTANCE_NIC%d_IP" % idx] = ip
946
      env["INSTANCE_NIC%d_MAC" % idx] = mac
947
      env["INSTANCE_NIC%d_MODE" % idx] = mode
948
      env["INSTANCE_NIC%d_LINK" % idx] = link
949
      if mode == constants.NIC_MODE_BRIDGED:
950
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
951
  else:
952
    nic_count = 0
953

    
954
  env["INSTANCE_NIC_COUNT"] = nic_count
955

    
956
  if disks:
957
    disk_count = len(disks)
958
    for idx, (size, mode) in enumerate(disks):
959
      env["INSTANCE_DISK%d_SIZE" % idx] = size
960
      env["INSTANCE_DISK%d_MODE" % idx] = mode
961
  else:
962
    disk_count = 0
963

    
964
  env["INSTANCE_DISK_COUNT"] = disk_count
965

    
966
  if not tags:
967
    tags = []
968

    
969
  env["INSTANCE_TAGS"] = " ".join(tags)
970

    
971
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
972
    for key, value in source.items():
973
      env["INSTANCE_%s_%s" % (kind, key)] = value
974

    
975
  return env
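# Illustrative output (not part of the original module): for an instance with
# one bridged NIC and one disk, the returned environment contains keys such as
#
#   OP_TARGET, INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_SECONDARIES,
#   INSTANCE_OS_TYPE, INSTANCE_STATUS, INSTANCE_MEMORY, INSTANCE_VCPUS,
#   INSTANCE_DISK_TEMPLATE, INSTANCE_HYPERVISOR, INSTANCE_NIC_COUNT,
#   INSTANCE_NIC0_IP, INSTANCE_NIC0_MAC, INSTANCE_NIC0_MODE,
#   INSTANCE_NIC0_LINK, INSTANCE_NIC0_BRIDGE, INSTANCE_DISK_COUNT,
#   INSTANCE_DISK0_SIZE, INSTANCE_DISK0_MODE, INSTANCE_TAGS
#
# plus one INSTANCE_BE_* / INSTANCE_HV_* entry per backend and hypervisor
# parameter; the hooks runner later prefixes every key with "GANETI_".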
976

    
977

    
978
def _NICListToTuple(lu, nics):
979
  """Build a list of nic information tuples.
980

981
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
982
  value in LUInstanceQueryData.
983

984
  @type lu:  L{LogicalUnit}
985
  @param lu: the logical unit on whose behalf we execute
986
  @type nics: list of L{objects.NIC}
987
  @param nics: list of nics to convert to hooks tuples
988

989
  """
990
  hooks_nics = []
991
  cluster = lu.cfg.GetClusterInfo()
992
  for nic in nics:
993
    ip = nic.ip
994
    mac = nic.mac
995
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
996
    mode = filled_params[constants.NIC_MODE]
997
    link = filled_params[constants.NIC_LINK]
998
    hooks_nics.append((ip, mac, mode, link))
999
  return hooks_nics
1000

    
1001

    
1002
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
1003
  """Builds instance related env variables for hooks from an object.
1004

1005
  @type lu: L{LogicalUnit}
1006
  @param lu: the logical unit on whose behalf we execute
1007
  @type instance: L{objects.Instance}
1008
  @param instance: the instance for which we should build the
1009
      environment
1010
  @type override: dict
1011
  @param override: dictionary with key/values that will override
1012
      our values
1013
  @rtype: dict
1014
  @return: the hook environment dictionary
1015

1016
  """
1017
  cluster = lu.cfg.GetClusterInfo()
1018
  bep = cluster.FillBE(instance)
1019
  hvp = cluster.FillHV(instance)
1020
  args = {
1021
    "name": instance.name,
1022
    "primary_node": instance.primary_node,
1023
    "secondary_nodes": instance.secondary_nodes,
1024
    "os_type": instance.os,
1025
    "status": instance.admin_up,
1026
    "memory": bep[constants.BE_MEMORY],
1027
    "vcpus": bep[constants.BE_VCPUS],
1028
    "nics": _NICListToTuple(lu, instance.nics),
1029
    "disk_template": instance.disk_template,
1030
    "disks": [(disk.size, disk.mode) for disk in instance.disks],
1031
    "bep": bep,
1032
    "hvp": hvp,
1033
    "hypervisor_name": instance.hypervisor,
1034
    "tags": instance.tags,
1035
  }
1036
  if override:
1037
    args.update(override)
1038
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
1039

    
1040

    
1041
def _AdjustCandidatePool(lu, exceptions):
1042
  """Adjust the candidate pool after node operations.
1043

1044
  """
1045
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
1046
  if mod_list:
1047
    lu.LogInfo("Promoted nodes to master candidate role: %s",
1048
               utils.CommaJoin(node.name for node in mod_list))
1049
    for name in mod_list:
1050
      lu.context.ReaddNode(name)
1051
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1052
  if mc_now > mc_max:
1053
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
1054
               (mc_now, mc_max))
1055

    
1056

    
1057
def _DecideSelfPromotion(lu, exceptions=None):
1058
  """Decide whether I should promote myself as a master candidate.
1059

1060
  """
1061
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
1062
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1063
  # the new node will increase mc_max with one, so:
1064
  mc_should = min(mc_should + 1, cp_size)
1065
  return mc_now < mc_should
1066

    
1067

    
1068
def _CheckNicsBridgesExist(lu, target_nics, target_node):
  """Check that the bridges needed by a list of nics exist.
1070

1071
  """
1072
  cluster = lu.cfg.GetClusterInfo()
1073
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
1074
  brlist = [params[constants.NIC_LINK] for params in paramslist
1075
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
1076
  if brlist:
1077
    result = lu.rpc.call_bridges_exist(target_node, brlist)
1078
    result.Raise("Error checking bridges on destination node '%s'" %
1079
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1080

    
1081

    
1082
def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.
1084

1085
  """
1086
  if node is None:
1087
    node = instance.primary_node
1088
  _CheckNicsBridgesExist(lu, instance.nics, node)
1089

    
1090

    
1091
def _CheckOSVariant(os_obj, name):
1092
  """Check whether an OS name conforms to the os variants specification.
1093

1094
  @type os_obj: L{objects.OS}
1095
  @param os_obj: OS object to check
1096
  @type name: string
1097
  @param name: OS name passed by the user, to check for validity
1098

1099
  """
1100
  if not os_obj.supported_variants:
1101
    return
1102
  variant = objects.OS.GetVariant(name)
1103
  if not variant:
1104
    raise errors.OpPrereqError("OS name must include a variant",
1105
                               errors.ECODE_INVAL)
1106

    
1107
  if variant not in os_obj.supported_variants:
1108
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1109

    
1110

    
1111
def _GetNodeInstancesInner(cfg, fn):
1112
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
1113

    
1114

    
1115
def _GetNodeInstances(cfg, node_name):
1116
  """Returns a list of all primary and secondary instances on a node.
1117

1118
  """
1119

    
1120
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1121

    
1122

    
1123
def _GetNodePrimaryInstances(cfg, node_name):
1124
  """Returns primary instances on a node.
1125

1126
  """
1127
  return _GetNodeInstancesInner(cfg,
1128
                                lambda inst: node_name == inst.primary_node)
1129

    
1130

    
1131
def _GetNodeSecondaryInstances(cfg, node_name):
1132
  """Returns secondary instances on a node.
1133

1134
  """
1135
  return _GetNodeInstancesInner(cfg,
1136
                                lambda inst: node_name in inst.secondary_nodes)
1137

    
1138

    
1139
def _GetStorageTypeArgs(cfg, storage_type):
1140
  """Returns the arguments for a storage type.
1141

1142
  """
1143
  # Special case for file storage
1144
  if storage_type == constants.ST_FILE:
1145
    # storage.FileStorage wants a list of storage directories
1146
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1147

    
1148
  return []
1149

    
1150

    
1151
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
1152
  faulty = []
1153

    
1154
  for dev in instance.disks:
1155
    cfg.SetDiskID(dev, node_name)
1156

    
1157
  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
1158
  result.Raise("Failed to get disk status from node %s" % node_name,
1159
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
1160

    
1161
  for idx, bdev_status in enumerate(result.payload):
1162
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1163
      faulty.append(idx)
1164

    
1165
  return faulty
1166

    
1167

    
1168
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1169
  """Check the sanity of iallocator and node arguments and use the
1170
  cluster-wide iallocator if appropriate.
1171

1172
  Check that at most one of (iallocator, node) is specified. If none is
1173
  specified, then the LU's opcode's iallocator slot is filled with the
1174
  cluster-wide default iallocator.
1175

1176
  @type iallocator_slot: string
1177
  @param iallocator_slot: the name of the opcode iallocator slot
1178
  @type node_slot: string
1179
  @param node_slot: the name of the opcode target node slot
1180

1181
  """
1182
  node = getattr(lu.op, node_slot, None)
1183
  iallocator = getattr(lu.op, iallocator_slot, None)
1184

    
1185
  if node is not None and iallocator is not None:
1186
    raise errors.OpPrereqError("Do not specify both an iallocator and a node",
1187
                               errors.ECODE_INVAL)
1188
  elif node is None and iallocator is None:
1189
    default_iallocator = lu.cfg.GetDefaultIAllocator()
1190
    if default_iallocator:
1191
      setattr(lu.op, iallocator_slot, default_iallocator)
1192
    else:
1193
      raise errors.OpPrereqError("No iallocator or node given and no"
1194
                                 " cluster-wide default iallocator found;"
1195
                                 " please specify either an iallocator or a"
1196
                                 " node, or set a cluster-wide default"
1197
                                 " iallocator")
1198
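# Illustrative usage (not part of the original module): an LU whose opcode has
# "iallocator" and "target_node" slots (slot names assumed for the example)
# would call this from CheckArguments:
#
#   def CheckArguments(self):
#     _CheckIAllocatorOrNode(self, "iallocator", "target_node")
#
# After a successful call exactly one of the two slots is set; otherwise an
# OpPrereqError has been raised.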

    
1199

    
1200
class LUClusterPostInit(LogicalUnit):
1201
  """Logical unit for running hooks after cluster initialization.
1202

1203
  """
1204
  HPATH = "cluster-init"
1205
  HTYPE = constants.HTYPE_CLUSTER
1206

    
1207
  def BuildHooksEnv(self):
1208
    """Build hooks env.
1209

1210
    """
1211
    return {
1212
      "OP_TARGET": self.cfg.GetClusterName(),
1213
      }
1214

    
1215
  def BuildHooksNodes(self):
1216
    """Build hooks nodes.
1217

1218
    """
1219
    return ([], [self.cfg.GetMasterNode()])
1220

    
1221
  def Exec(self, feedback_fn):
1222
    """Nothing to do.
1223

1224
    """
1225
    return True
1226

    
1227

    
1228
class LUClusterDestroy(LogicalUnit):
1229
  """Logical unit for destroying the cluster.
1230

1231
  """
1232
  HPATH = "cluster-destroy"
1233
  HTYPE = constants.HTYPE_CLUSTER
1234

    
1235
  def BuildHooksEnv(self):
1236
    """Build hooks env.
1237

1238
    """
1239
    return {
1240
      "OP_TARGET": self.cfg.GetClusterName(),
1241
      }
1242

    
1243
  def BuildHooksNodes(self):
1244
    """Build hooks nodes.
1245

1246
    """
1247
    return ([], [])
1248

    
1249
  def CheckPrereq(self):
1250
    """Check prerequisites.
1251

1252
    This checks whether the cluster is empty.
1253

1254
    Any errors are signaled by raising errors.OpPrereqError.
1255

1256
    """
1257
    master = self.cfg.GetMasterNode()
1258

    
1259
    nodelist = self.cfg.GetNodeList()
1260
    if len(nodelist) != 1 or nodelist[0] != master:
1261
      raise errors.OpPrereqError("There are still %d node(s) in"
1262
                                 " this cluster." % (len(nodelist) - 1),
1263
                                 errors.ECODE_INVAL)
1264
    instancelist = self.cfg.GetInstanceList()
1265
    if instancelist:
1266
      raise errors.OpPrereqError("There are still %d instance(s) in"
1267
                                 " this cluster." % len(instancelist),
1268
                                 errors.ECODE_INVAL)
1269

    
1270
  def Exec(self, feedback_fn):
1271
    """Destroys the cluster.
1272

1273
    """
1274
    master = self.cfg.GetMasterNode()
1275

    
1276
    # Run post hooks on master node before it's removed
1277
    _RunPostHook(self, master)
1278

    
1279
    result = self.rpc.call_node_stop_master(master, False)
1280
    result.Raise("Could not disable the master role")
1281

    
1282
    return master
1283

    
1284

    
1285
def _VerifyCertificate(filename):
1286
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1287

1288
  @type filename: string
1289
  @param filename: Path to PEM file
1290

1291
  """
1292
  try:
1293
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1294
                                           utils.ReadFile(filename))
1295
  except Exception, err: # pylint: disable-msg=W0703
1296
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1297
            "Failed to load X509 certificate %s: %s" % (filename, err))
1298

    
1299
  (errcode, msg) = \
1300
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1301
                                constants.SSL_CERT_EXPIRATION_ERROR)
1302

    
1303
  if msg:
1304
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1305
  else:
1306
    fnamemsg = None
1307

    
1308
  if errcode is None:
1309
    return (None, fnamemsg)
1310
  elif errcode == utils.CERT_WARNING:
1311
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1312
  elif errcode == utils.CERT_ERROR:
1313
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1314

    
1315
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1316

    
1317

    
1318
def _GetAllHypervisorParameters(cluster, instances):
1319
  """Compute the set of all hypervisor parameters.
1320

1321
  @type cluster: L{objects.Cluster}
1322
  @param cluster: the cluster object
1323
  @param instances: list of L{objects.Instance}
1324
  @param instances: additional instances from which to obtain parameters
1325
  @rtype: list of (origin, hypervisor, parameters)
1326
  @return: a list with all parameters found, indicating the hypervisor they
1327
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1328

1329
  """
1330
  hvp_data = []
1331

    
1332
  for hv_name in cluster.enabled_hypervisors:
1333
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1334

    
1335
  for os_name, os_hvp in cluster.os_hvp.items():
1336
    for hv_name, hv_params in os_hvp.items():
1337
      if hv_params:
1338
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1339
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1340

    
1341
  # TODO: collapse identical parameter values in a single one
1342
  for instance in instances:
1343
    if instance.hvparams:
1344
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1345
                       cluster.FillHV(instance)))
1346

    
1347
  return hvp_data
1348

    
1349

    
1350
class _VerifyErrors(object):
1351
  """Mix-in for cluster/group verify LUs.
1352

1353
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1354
  self.op and self._feedback_fn to be available.)
1355

1356
  """
1357
  TCLUSTER = "cluster"
1358
  TNODE = "node"
1359
  TINSTANCE = "instance"
1360

    
1361
  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
1362
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
1363
  ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK")
1364
  ECLUSTERDANGLINGNODES = (TNODE, "ECLUSTERDANGLINGNODES")
1365
  ECLUSTERDANGLINGINST = (TNODE, "ECLUSTERDANGLINGINST")
1366
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
1367
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
1368
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
1369
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
1370
  EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
1371
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
1372
  EINSTANCESPLITGROUPS = (TINSTANCE, "EINSTANCESPLITGROUPS")
1373
  ENODEDRBD = (TNODE, "ENODEDRBD")
1374
  ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
1375
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
1376
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
1377
  ENODEHV = (TNODE, "ENODEHV")
1378
  ENODELVM = (TNODE, "ENODELVM")
1379
  ENODEN1 = (TNODE, "ENODEN1")
1380
  ENODENET = (TNODE, "ENODENET")
1381
  ENODEOS = (TNODE, "ENODEOS")
1382
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
1383
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
1384
  ENODERPC = (TNODE, "ENODERPC")
1385
  ENODESSH = (TNODE, "ENODESSH")
1386
  ENODEVERSION = (TNODE, "ENODEVERSION")
1387
  ENODESETUP = (TNODE, "ENODESETUP")
1388
  ENODETIME = (TNODE, "ENODETIME")
1389
  ENODEOOBPATH = (TNODE, "ENODEOOBPATH")
1390

    
1391
  ETYPE_FIELD = "code"
1392
  ETYPE_ERROR = "ERROR"
1393
  ETYPE_WARNING = "WARNING"
1394

    
1395
  def _Error(self, ecode, item, msg, *args, **kwargs):
1396
    """Format an error message.
1397

1398
    Based on the opcode's error_codes parameter, either format a
1399
    parseable error code, or a simpler error string.
1400

1401
    This must be called only from Exec and functions called from Exec.
1402

1403
    """
1404
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1405
    itype, etxt = ecode
1406
    # first complete the msg
1407
    if args:
1408
      msg = msg % args
1409
    # then format the whole message
1410
    if self.op.error_codes: # This is a mix-in. pylint: disable-msg=E1101
1411
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1412
    else:
1413
      if item:
1414
        item = " " + item
1415
      else:
1416
        item = ""
1417
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1418
    # and finally report it via the feedback_fn
1419
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable-msg=E1101
1420

    
1421
  def _ErrorIf(self, cond, *args, **kwargs):
1422
    """Log an error message if the passed condition is True.
1423

1424
    """
1425
    cond = (bool(cond)
1426
            or self.op.debug_simulate_errors) # pylint: disable-msg=E1101
1427
    if cond:
1428
      self._Error(*args, **kwargs)
1429
    # do not mark the operation as failed for WARN cases only
1430
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1431
      self.bad = self.bad or cond
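# Illustrative usage (not part of the original module): verification code in
# Exec typically binds _ErrorIf locally and passes it a condition, an error
# code tuple, the affected item and a message, optionally downgrading the
# result to a warning via the "code" keyword:
#
#   _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
#   _ErrorIf(test, self.ENODEHV, node, "hypervisor verify failure: '%s'", msg)
#   _ErrorIf(test, self.ENODESETUP, node, "node setup problem: %s", msg,
#            code=self.ETYPE_WARNING)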
1432

    
1433

    
1434
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1435
  """Verifies the cluster config.
1436

1437
  """
1438
  REQ_BGL = True
1439

    
1440
  def _VerifyHVP(self, hvp_data):
1441
    """Verifies locally the syntax of the hypervisor parameters.
1442

1443
    """
1444
    for item, hv_name, hv_params in hvp_data:
1445
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1446
             (item, hv_name))
1447
      try:
1448
        hv_class = hypervisor.GetHypervisor(hv_name)
1449
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1450
        hv_class.CheckParameterSyntax(hv_params)
1451
      except errors.GenericError, err:
1452
        self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))
1453

    
1454
  def ExpandNames(self):
1455
    # Information can be safely retrieved as the BGL is acquired in exclusive
1456
    # mode
1457
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1458
    self.all_node_info = self.cfg.GetAllNodesInfo()
1459
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1460
    self.needed_locks = {}
1461

    
1462
  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.
1464

1465
    """
1466
    self.bad = False
1467
    self._feedback_fn = feedback_fn
1468

    
1469
    feedback_fn("* Verifying cluster config")
1470

    
1471
    for msg in self.cfg.VerifyConfig():
1472
      self._ErrorIf(True, self.ECLUSTERCFG, None, msg)
1473

    
1474
    feedback_fn("* Verifying cluster certificate files")
1475

    
1476
    for cert_filename in constants.ALL_CERT_FILES:
1477
      (errcode, msg) = _VerifyCertificate(cert_filename)
1478
      self._ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
1479

    
1480
    feedback_fn("* Verifying hypervisor parameters")
1481

    
1482
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1483
                                                self.all_inst_info.values()))
1484

    
1485
    feedback_fn("* Verifying all nodes belong to an existing group")
1486

    
1487
    # We do this verification here because, should this bogus circumstance
1488
    # occur, it would never be caught by VerifyGroup, which only acts on
1489
    # nodes/instances reachable from existing node groups.
1490

    
1491
    dangling_nodes = set(node.name for node in self.all_node_info.values()
1492
                         if node.group not in self.all_group_info)
1493

    
1494
    dangling_instances = {}
1495
    no_node_instances = []
1496

    
1497
    for inst in self.all_inst_info.values():
1498
      if inst.primary_node in dangling_nodes:
1499
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1500
      elif inst.primary_node not in self.all_node_info:
1501
        no_node_instances.append(inst.name)
1502

    
1503
    pretty_dangling = [
1504
        "%s (%s)" %
1505
        (node.name,
1506
         utils.CommaJoin(dangling_instances.get(node.name,
1507
                                                ["no instances"])))
1508
        for node in dangling_nodes]
1509

    
1510
    self._ErrorIf(bool(dangling_nodes), self.ECLUSTERDANGLINGNODES, None,
1511
                  "the following nodes (and their instances) belong to a non"
1512
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1513

    
1514
    self._ErrorIf(bool(no_node_instances), self.ECLUSTERDANGLINGINST, None,
1515
                  "the following instances have a non-existing primary-node:"
1516
                  " %s", utils.CommaJoin(no_node_instances))
1517

    
1518
    return (not self.bad, [g.name for g in self.all_group_info.values()])
1519

    
1520

    
1521
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1522
  """Verifies the status of a node group.
1523

1524
  """
1525
  HPATH = "cluster-verify"
1526
  HTYPE = constants.HTYPE_CLUSTER
1527
  REQ_BGL = False
1528

    
1529
  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type name: string
    @ivar name: the node name to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a ghost node, i.e. one not known to the
        configuration (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances

    """
    def __init__(self, offline=False, name=None, vm_capable=True):
      self.name = name
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_names = self.cfg.GetNodeGroupInstances(self.group_uuid)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: inst_names,
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],
      }

    self.share_locks = _ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      all_inst_info = self.cfg.GetAllInstancesInfo()

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst in self.glm.list_owned(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
          nodes.update(all_inst_info[inst].secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    group_nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
    group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)

    unlocked_nodes = \
        group_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE))

    unlocked_instances = \
        group_instances.difference(self.glm.list_owned(locking.LEVEL_INSTANCE))

    if unlocked_nodes:
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
                                 utils.CommaJoin(unlocked_nodes))

    if unlocked_instances:
      raise errors.OpPrereqError("Missing lock for instances: %s" %
                                 utils.CommaJoin(unlocked_instances))

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_names = utils.NiceSort(group_nodes)
    self.my_inst_names = utils.NiceSort(group_instances)

    self.my_node_info = dict((name, self.all_node_info[name])
                             for name in self.my_node_names)

    self.my_inst_info = dict((name, self.all_inst_info[name])
                             for name in self.my_inst_names)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        group = self.my_node_info[inst.primary_node].group
        for nname in inst.secondary_nodes:
          if self.all_node_info[nname].group != group:
            extra_lv_nodes.add(nname)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing lock for LV check on nodes: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes))
    self.extra_lv_nodes = list(extra_lv_nodes)
  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    _ErrorIf(test, self.ENODERPC, node,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
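    # remote_version is expected to be a two-element sequence of
    # (protocol version, release version); both parts are checked below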
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, self.ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    _ErrorIf(test, self.ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  self.ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, self.ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        _ErrorIf(True, self.ENODEHV, node,
                 "hypervisor %s parameter verify failure (source %s): %s",
                 hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
             "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
      return
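    # The node's reported time must fall within the [start, end] window of the
    # verify RPC, widened by the maximum allowed clock skew on either side;
    # otherwise report by how much it diverges.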
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
             "Node time diverges by at least %s from master node time",
             ntime_diff)

  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
    """Check the node LVM results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)

    # check pv names
    pvlist = nresult.get(constants.NV_PVLIST, None)
    test = pvlist is None
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
    if not test:
      # check that ':' is not present in PV names, since it's a
      # special character for lvcreate (denotes the range of PEs to
      # use on the PV)
      for _, pvname, owner_vg in pvlist:
        test = ":" in pvname
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
                 " '%s' of VG '%s'", pvname, owner_vg)

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    _ErrorIf(test, self.ENODENET, node,
             "did not return valid bridge information")
    if not test:
      _ErrorIf(bool(missing), self.ENODENET, node, "missing bridges: %s" %
               utils.CommaJoin(sorted(missing)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    test = constants.NV_NODELIST not in nresult
    _ErrorIf(test, self.ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          _ErrorIf(True, self.ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, self.ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if node == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        _ErrorIf(True, self.ENODENET, node, msg)

  def _VerifyInstance(self, instance, instanceconfig, node_image,
                      diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_up:
      pri_img = node_image[node_current]
      test = instance not in pri_img.instances and not pri_img.offline
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]
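    # diskdata is a flat list of (node name, success, status, disk index)
    # tuples, one entry per disk per node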

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
               self.EINSTANCEFAULTYDISK, instance,
               "couldn't retrieve status for disk/%s on %s: %s",
               idx, nname, bdev_status)
      _ErrorIf((instanceconfig.admin_up and success and
                bdev_status.ldisk_status == constants.LDS_FAULTY),
               self.EINSTANCEFAULTYDISK, instance,
               "disk/%s on %s is faulty", idx, nname)

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node, n_img in node_image.items():
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node not in node_vol_should or
                volume not in node_vol_should[node]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, self.ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all the instances it is
      # supposed to take over, should a single other node in the
      # cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline:
        # we're skipping offline nodes from the N+1 warning, since
        # most likely we don't have good memory information from them;
        # we already list instances living on such nodes, and that's
        # enough warning
        continue
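      # For every primary node "prinode" that has instances secondary on this
      # node, sum the memory of its auto-balanced instances; this node needs
      # at least that much free memory to absorb a failover from prinode.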
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = cluster_info.FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, self.ENODEN1, node,
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      prinode, needed_mem, n_img.mfree)

  @classmethod
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
                   (files_all, files_all_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param errorif: Callback for reporting errors
    @param nodeinfo: List of L{objects.Node} objects
    @param master_node: Name of master node
    @param all_nvinfo: RPC results

    """
    node_names = frozenset(node.name for node in nodeinfo)

    assert master_node in node_names
    assert (len(files_all | files_all_opt | files_mc | files_vm) ==
            sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
           "Found file listed in more than one file list"

    # Define functions determining which nodes to consider for a file
    file2nodefn = dict([(filename, fn)
      for (files, fn) in [(files_all, None),
                          (files_all_opt, None),
                          (files_mc, lambda node: (node.master_candidate or
                                                   node.name == master_node)),
                          (files_vm, lambda node: node.vm_capable)]
      for filename in files])
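    # file2nodefn maps each filename to a predicate selecting the nodes on
    # which the file is expected; None means "expected on all nodes"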

    fileinfo = dict((filename, {}) for filename in file2nodefn.keys())

    for node in nodeinfo:
      nresult = all_nvinfo[node.name]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        node_files = nresult.payload.get(constants.NV_FILELIST, None)

      test = not (node_files and isinstance(node_files, dict))
      errorif(test, cls.ENODEFILECHECK, node.name,
              "Node did not return file checksum data")
      if test:
        continue

      for (filename, checksum) in node_files.items():
        # Check if the file should be considered for a node
        fn = file2nodefn[filename]
        if fn is None or fn(node):
          fileinfo[filename].setdefault(checksum, set()).add(node.name)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_name
                            for nodes in fileinfo[filename].values()
                            for node_name in nodes)

      # Nodes missing file
      missing_file = node_names - with_file

      if filename in files_all_opt:
        # All or no nodes
        errorif(missing_file and missing_file != node_names,
                cls.ECLUSTERFILECHECK, None,
                "File %s is optional, but it must exist on all or no"
                " nodes (not found on %s)",
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
      else:
        errorif(missing_file, cls.ECLUSTERFILECHECK, None,
                "File %s is missing from node(s) %s", filename,
                utils.CommaJoin(utils.NiceSort(missing_file)))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
                    for (idx, (checksum, nodes)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      errorif(test, cls.ECLUSTERFILECHECK, None,
              "File %s found with %s different checksums (%s)",
              filename, len(checksums), "; ".join(variants))

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      _ErrorIf(test, self.ENODEDRBDHELPER, node,
               "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
                 "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
                 "wrong drbd usermode helper: %s", payload)

    # compute the DRBD minors
    node_drbd = {}
    for minor, instance in drbd_map[node].items():
      test = instance not in instanceinfo
      _ErrorIf(test, self.ECLUSTERCFG, None,
               "ghost instance '%s' in temporary DRBD map", instance)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
      if test:
        node_drbd[minor] = (instance, False)
      else:
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.admin_up)
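    # node_drbd now maps each minor expected on this node to a tuple of
    # (instance name, whether the minor should be in use)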

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    _ErrorIf(test, self.ENODEDRBD, node,
             "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (iname, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      _ErrorIf(test, self.ENODEDRBD, node,
               "drbd minor %d of instance %s is not active", minor, iname)
    for minor in used_minors:
      test = minor not in node_drbd
      _ErrorIf(test, self.ENODEDRBD, node,
               "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    _ErrorIf(test, self.ENODEOS, node,
             "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))
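    # os_dict: OS name -> list of (path, status, diagnose message, variants,
    # parameters, API versions) tuples, one per provider of that OS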

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      _ErrorIf(not f_status, self.ENODEOS, node,
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
      _ErrorIf(len(os_data) > 1, self.ENODEOS, node,
               "OS '%s' has multiple entries (first one shadows the rest): %s",
               os_name, utils.CommaJoin([v[0] for v in os_data]))
      # this will be caught in the backend too
      _ErrorIf(compat.any(v >= constants.OS_API_V15 for v in f_api)
               and not f_var, self.ENODEOS, node,
               "OS %s with API at least %d does not declare any variant",
               os_name, constants.OS_API_V15)
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      _ErrorIf(test, self.ENODEOS, node,
               "Extra OS %s not present on reference node (%s)",
               os_name, base.name)
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        _ErrorIf(a != b, self.ENODEOS, node,
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
                 kind, os_name, base.name,
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    _ErrorIf(missing, self.ENODEOS, node,
             "OSes present on reference node %s but missing on this node: %s",
             base.name, utils.CommaJoin(missing))

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, self.ENODEOOBPATH, node, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
               utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = idata

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        _ErrorIf(True, self.ENODERPC, node,
                 "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      _ErrorIf(test, self.ENODELVM, node,
               "node didn't return data for the volume group '%s'"
               " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          _ErrorIf(True, self.ENODERPC, node,
                   "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type nodelist: list of strings
    @param nodelist: Node names
    @type node_image: dict of (name, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (name, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    node_disks = {}
    node_disks_devonly = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nname in nodelist:
      node_instances = list(itertools.chain(node_image[nname].pinst,
                                            node_image[nname].sinst))
      diskless_instances.update(inst for inst in node_instances
                                if instanceinfo[inst].disk_template == diskless)
      disks = [(inst, disk)
               for inst in node_instances
               for disk in instanceinfo[inst].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nname] = disks

      # Creating copies as SetDiskID below will modify the objects and that can
      # lead to incorrect data returned from nodes
      devonly = [dev.Copy() for (_, dev) in disks]

      for dev in devonly:
        self.cfg.SetDiskID(dev, nname)

      node_disks_devonly[nname] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nname, nres) in result.items():
      disks = node_disks[nname]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        _ErrorIf(msg, self.ENODERPC, nname,
                 "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              nname, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst, _), status) in zip(disks, data):
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
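        # instdisk: instance name -> node name -> list of per-disk
        # (success, payload) tuples, in disk index order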

    # Add empty entries for diskless instances.
    for inst in diskless_instances:
      assert inst not in instdisk
      instdisk[inst] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nnames in instdisk.items()
                      for nname, statuses in nnames.items())
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"

    return instdisk

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], self.my_node_names)

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable-msg=R0914

    if not self.my_node_names:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = _ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))

    # We will make nodes contact all nodes in their group, and one node from
    # every other group.
    # TODO: should it be a *random* node, different every time?
    online_nodes = [node.name for node in node_data_list if not node.offline]
    other_group_nodes = {}

    for name in sorted(self.all_node_info):
      node = self.all_node_info[name]
      if (node.group not in other_group_nodes
          and node.group != self.group_uuid
          and not node.offline):
        other_group_nodes[node.group] = node.name

    node_verify_param = {
      constants.NV_FILELIST:
        utils.UniqueSequence(filename
                             for files in filemap
                             for filename in files),
      constants.NV_NODELIST: online_nodes + other_group_nodes.values(),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (master_node, master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]
      node_verify_param[constants.NV_DRBDLIST] = None

    if drbd_helper:
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
                                                 name=node.name,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = _SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for instance in self.my_inst_names:
      inst_config = self.my_inst_info[instance]

      for nname in inst_config.all_nodes:
        if nname not in node_image:
          gnode = self.NodeImage(name=nname)
          gnode.ghost = (nname not in self.all_node_info)
          node_image[nname] = gnode
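          # nodes used by the instance but outside this group get a
          # placeholder image; it is marked as ghost if the node is not
          # known to the configuration at all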

      inst_config.MapLVsByNode(node_vol_should)

      pnode = inst_config.primary_node
      node_image[pnode].pinst.append(instance)

      for snode in inst_config.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance)

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
                                           node_verify_param,
                                           self.cfg.GetClusterName())
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName())
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_names))
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
    if absent_nodes:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_nodes = []
      if master_node not in self.my_node_info:
        additional_nodes.append(master_node)
        vf_node_info.append(self.all_node_info[master_node])
      # Add the first vm_capable node we find which is not included
      for node in absent_nodes:
        nodeinfo = self.all_node_info[node]
        if nodeinfo.vm_capable and not nodeinfo.offline:
          additional_nodes.append(node)
          vf_node_info.append(self.all_node_info[node])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
                                                 {key: node_verify_param[key]},
                                                 self.cfg.GetClusterName()))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      node = node_i.name
      nimg = node_image[node]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node,))
        n_offline += 1
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyOob(node_i, nresult)

      if nimg.vm_capable:
        self._VerifyNodeLVM(node_i, nresult, vg_name)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)

        for inst in non_primary_inst:
          test = inst in self.all_inst_info
          _ErrorIf(test, self.EINSTANCEWRONGNODE, inst,
                   "instance should not run on node %s", node_i.name)
          _ErrorIf(not test, self.ENODEORPHANINSTANCE, node_i.name,
                   "node is running unknown instance %s", inst)

    for node, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
                              node_image[node], vg_name)

    feedback_fn("* Verifying instance status")
    for instance in self.my_inst_names:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.my_inst_info[instance]
      self._VerifyInstance(instance, inst_config, node_image,
                           instdisk[instance])
      inst_nodes_offline = []

      pnode = inst_config.primary_node
      pnode_img = node_image[pnode]
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
               self.ENODERPC, pnode, "instance %s, connection to"
               " primary node failed", instance)

      _ErrorIf(inst_config.admin_up and pnode_img.offline,
               self.EINSTANCEBADNODE, instance,
               "instance is marked as running and lives on offline node %s",
               inst_config.primary_node)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if not inst_config.secondary_nodes:
        i_non_redundant.append(instance)

      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
               instance, "instance has multiple secondary nodes: %s",
               utils.CommaJoin(inst_config.secondary_nodes),
               code=self.ETYPE_WARNING)

      if inst_config.disk_template in constants.DTS_INT_MIRROR:
        pnode = inst_config.primary_node
        instance_nodes = utils.NiceSort(inst_config.all_nodes)
        instance_groups = {}

        for node in instance_nodes:
          instance_groups.setdefault(self.all_node_info[node].group,
                                     []).append(node)

        pretty_list = [
          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
          # Sort so that we always list the primary node first.
          for group, nodes in sorted(instance_groups.items(),
                                     key=lambda (_, nodes): pnode in nodes,
                                     reverse=True)]

        self._ErrorIf(len(instance_groups) > 1, self.EINSTANCESPLITGROUPS,
                      instance, "instance has primary and secondary nodes in"
                      " different groups: %s", utils.CommaJoin(pretty_list),
                      code=self.ETYPE_WARNING)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        s_img = node_image[snode]
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
                 "instance %s, connection to secondary node failed", instance)

        if s_img.offline:
          inst_nodes_offline.append(snode)

      # warn that the instance lives on offline nodes
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
               "instance has offline secondary node(s) %s",
               utils.CommaJoin(inst_nodes_offline))
      # ... or ghost/non-vm_capable nodes
      for node in inst_config.all_nodes:
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
                 "instance lives on ghost node %s", node)
        _ErrorIf(not node_image[node].vm_capable, self.EINSTANCEBADNODE,
                 instance, "instance lives on non-vm_capable node %s", node)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for inst in self.all_inst_info.values():
      for secondary in inst.secondary_nodes:
        if (secondary in self.my_node_info
            and inst.name not in self.my_inst_info):
          inst.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
2836
    # We only really run POST phase hooks, only for non-empty groups,
2837
    # and are only interested in their results
2838
    if not self.my_node_names:
2839
      # empty node group
2840
      pass
2841
    elif phase == constants.HOOKS_PHASE_POST:
2842
      # Used to change hooks' output to proper indentation
2843
      feedback_fn("* Hooks Results")
2844
      assert hooks_results, "invalid result from hooks"
2845

    
2846
      for node_name in hooks_results:
2847
        res = hooks_results[node_name]
2848
        msg = res.fail_msg
2849
        test = msg and not res.offline
2850
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
2851
                      "Communication failure in hooks execution: %s", msg)
2852
        if res.offline or msg:
2853
          # No need to investigate payload if node is offline or gave an error.
2854
          # override manually lu_result here as _ErrorIf only
2855
          # overrides self.bad
2856
          lu_result = 1
2857
          continue
2858
        for script, hkr, output in res.payload:
2859
          test = hkr == constants.HKR_FAIL
2860
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
2861
                        "Script %s failed, output:", script)
2862
          if test:
2863
            output = self._HOOKS_INDENT_RE.sub("      ", output)
2864
            feedback_fn("%s" % output)
2865
            lu_result = 0
2866

    
2867
    return lu_result
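
  # Illustrative note (not executed): the loop above assumes hooks_results
  # maps each node name to an RPC result object exposing C{fail_msg},
  # C{offline} and a C{payload} of (script, status, output) tuples, roughly:
  #
  #   hooks_results = {
  #     "node1.example.com": <result, payload=[
  #         ("10-check-foo", constants.HKR_SUCCESS, ""),
  #         ("20-check-bar", constants.HKR_FAIL, "bar is missing"),
  #         ]>,
  #     }
  #
  # The node and script names are hypothetical; any HKR_FAIL entry downgrades
  # lu_result and its output is re-indented via self._HOOKS_INDENT_RE before
  # being handed to feedback_fn.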


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = _ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.glm.list_owned(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])
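

# Illustrative sketch, not called anywhere: the Exec method above returns a
# ResultWithJobs whose C{jobs} attribute is a list of jobs, each job itself a
# list of opcodes -- here one single-opcode job per node group. The same
# construction as a standalone helper; the name and the group names passed to
# it are hypothetical:
def _ExampleBuildVerifyJobs(group_names):
  """Build one OpGroupVerifyDisks job per group, as Exec above does.

  @param group_names: e.g. ["default", "rack2"] (hypothetical names)

  """
  return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                         for group in group_names])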


class LUGroupVerifyDisks(NoHooksLU):
  """Verifies the status of all disks in a node group.

  """
  REQ_BGL = False

  def ExpandNames(self):
    # Raises errors.OpPrereqError on its own if group can't be found
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.share_locks = _ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetNodeGroupInstances(self.group_uuid)

    elif level == locking.LEVEL_NODEGROUP:
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        set([self.group_uuid] +
            # Lock all groups used by instances optimistically; this requires
            # going via the node before it's locked, requiring verification
            # later on
            [group_uuid
             for instance_name in
               self.glm.list_owned(locking.LEVEL_INSTANCE)
             for group_uuid in
               self.cfg.GetInstanceNodeGroups(instance_name)])

    elif level == locking.LEVEL_NODE:
      # This will only lock the nodes in the group to be verified which contain
      # actual instances
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
      self._LockInstancesNodes()

      # Lock all nodes in group to be verified
      assert self.group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
      member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)

  def CheckPrereq(self):
    owned_instances = frozenset(self.glm.list_owned(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(self.glm.list_owned(locking.LEVEL_NODEGROUP))
    owned_nodes = frozenset(self.glm.list_owned(locking.LEVEL_NODE))

    assert self.group_uuid in owned_groups

    # Check if locked instances are still correct
    wanted_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)
    if owned_instances != wanted_instances:
      raise errors.OpPrereqError("Instances in node group %s changed since"
                                 " locks were acquired, wanted %s, have %s;"
                                 " retry the operation" %
                                 (self.op.group_name,
                                  utils.CommaJoin(wanted_instances),
                                  utils.CommaJoin(owned_instances)),
                                 errors.ECODE_STATE)

    # Get instance information
    self.instances = dict((name, self.cfg.GetInstanceInfo(name))
                          for name in owned_instances)

    # Check if node groups for locked instances are still correct
    for (instance_name, inst) in self.instances.items():
      assert self.group_uuid in self.cfg.GetInstanceNodeGroups(instance_name), \
        "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
      assert owned_nodes.issuperset(inst.all_nodes), \
        "Instance %s's nodes changed while we kept the lock" % instance_name

      inst_groups = self.cfg.GetInstanceNodeGroups(instance_name)
      if not owned_groups.issuperset(inst_groups):
        raise errors.OpPrereqError("Instance %s's node groups changed since"
                                   " locks were acquired, current groups are"
                                   " '%s', owning groups '%s'; retry the"
                                   " operation" %
                                   (instance_name,
                                    utils.CommaJoin(inst_groups),
                                    utils.CommaJoin(owned_groups)),
                                   errors.ECODE_STATE)

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    res_nodes = {}
    res_instances = set()
    res_missing = {}

    nv_dict = _MapInstanceDisksToNodes([inst
                                        for inst in self.instances.values()
                                        if inst.admin_up])

    if nv_dict:
      nodes = utils.NiceSort(set(self.glm.list_owned(locking.LEVEL_NODE)) &
                             set(self.cfg.GetVmCapableNodeList()))

      node_lvs = self.rpc.call_lv_list(nodes, [])

      for (node, node_res) in node_lvs.items():
        if node_res.offline:
          continue

        msg = node_res.fail_msg
        if msg:
          logging.warning("Error enumerating LVs on node %s: %s", node, msg)
          res_nodes[node] = msg
          continue

        for lv_name, (_, _, lv_online) in node_res.payload.items():
          inst = nv_dict.pop((node, lv_name), None)
          if not (lv_online or inst is None):
            res_instances.add(inst)

      # any leftover items in nv_dict are missing LVs, let's arrange the data
      # better
      for key, inst in nv_dict.iteritems():
        res_missing.setdefault(inst, []).append(key)

    return (res_nodes, list(res_instances), res_missing)
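
  # Illustrative note (not executed): with hypothetical node, instance and
  # volume names, the tuple returned by Exec above looks roughly like:
  #
  #   ({"node1.example.com": "Error enumerating LVs: ..."},  # node errors
  #    ["instance2.example.com"],              # inactive LVs, need
  #                                            # activate-disks
  #    {"instance3.example.com": [("node2.example.com", "xenvg/disk0")]})
  #                                            # missing (node, volume) pairs
  #
  # Only the shape is taken from the code and docstring above; the names and
  # the volume path are made up for the example.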


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      self.wanted_names = _GetWantedInstances(self, self.op.instances)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
        }
    self.share_locks = _ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsize(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsize call to node"
                        " %s, ignoring", node)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node, len(dskl), result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))
    return changed
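

# Illustrative helper, not referenced by the LU above: Exec converts the byte
# count returned by call_blockdev_getsize to whole mebibytes with a 20-bit
# right shift before comparing it with the configured disk.size, which is
# recorded in MiB. A minimal standalone sketch of that conversion:
def _ExampleBytesToMebibytes(size_in_bytes):
  """Convert a size in bytes to whole mebibytes, as done in Exec above.

  >>> _ExampleBytesToMebibytes(1073741824)
  1024

  """
  # 1 MiB == 2**20 bytes; the shift truncates any partial mebibyte
  return size_in_bytes >> 20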


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
    finally:
      result = self.rpc.call_node_start_master(master, False, False)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)

    return clustername


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_list = self.glm.list_owned(locking.LEVEL_NODE)

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_list)
      for node in node_list:
        ninfo = self.cfg.GetNodeInfo(node)
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s", node)
          continue
        msg = helpers[node].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (node, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[node].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (node, node_helper), errors.ECODE_ENVIRON)

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors))

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
                                                  use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                         os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          _CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.drbd_helper is not None:
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master = self.cfg.GetMasterNode()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_stop_master(master, False)
      result.Raise("Could not disable the master ip")
      feedback_fn("Changing master_netdev from %s to %s" %
                  (self.cluster.master_netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      result = self.rpc.call_node_start_master(master, False, False)
      if result.fail_msg:
        self.LogWarning("Could not re-enable the master ip on"
                        " the master, please restart manually: %s",
                        result.fail_msg)
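

# Illustrative sketch, not used above: LUClusterSetParams builds the effective
# hypervisor/NIC/OS parameter dictionaries by layering the submitted overrides
# on top of the stored defaults via objects.FillDict. The helper below only
# mirrors that "defaults first, overrides win" behaviour for illustration; it
# is not the real objects.FillDict implementation, and the parameter names in
# the doctest are hypothetical.
def _ExampleLayerParams(defaults, overrides):
  """Return a copy of C{defaults} with C{overrides} applied on top.

  >>> _ExampleLayerParams({"kernel_path": "/boot/a", "acpi": True},
  ...                     {"acpi": False})["acpi"]
  False

  """
  filled = defaults.copy()
  filled.update(overrides)
  return filled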


def _UploadHelper(lu, nodes, fname):
  """Helper for uploading a file and showing warnings.

  """
  if os.path.exists(fname):
    result = lu.rpc.call_upload_file(nodes, fname)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, to_node, msg))
        lu.proc.LogWarning(msg)


def _ComputeAncillaryFiles(cluster, redist):
  """Compute files external to Ganeti which need to be consistent.

  @type redist: boolean
  @param redist: Whether to include files which need to be redistributed

  """
  # Compute files for all nodes
  files_all = set([
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  if not redist:
    files_all.update(constants.ALL_CERT_FILES)
    files_all.update(ssconf.SimpleStore().GetFileList())

  if cluster.modify_etc_hosts:
    files_all.add(constants.ETC_HOSTS)

  # Files which must either exist on all nodes or on none
  files_all_opt = set([
    constants.RAPI_USERS_FILE,
    ])

  # Files which should only be on master candidates
  files_mc = set()
  if not redist:
    files_mc.add(constants.CLUSTER_CONF_FILE)

  # Files which should only be on VM-capable nodes
  files_vm = set(filename
    for hv_name in cluster.enabled_hypervisors
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles())

  # Filenames must be unique
  assert (len(files_all | files_all_opt | files_mc | files_vm) ==
          sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
         "Found file listed in more than one file list"

  return (files_all, files_all_opt, files_mc, files_vm)
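

# Illustrative sketch, not called anywhere: a consumer interested only in the
# files every node must share could unpack the tuple returned above like
# this. The helper name and its use are hypothetical, added purely to show
# how the four file sets are meant to be read.
def _ExampleListCommonFiles(cluster, redist=True):
  """Return the sorted list of files required on every node.

  """
  (files_all, _, _, _) = _ComputeAncillaryFiles(cluster, redist)
  return utils.NiceSort(files_all)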


def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to
  @type additional_vm: boolean
  @param additional_vm: whether the additional nodes are vm-capable or not

  """
  # Gather target nodes
  cluster = lu.cfg.GetClusterInfo()
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())

  online_nodes = lu.cfg.GetOnlineNodeList()
  vm_nodes = lu.cfg.GetVmCapableNodeList()

  if additional_nodes is not None:
    online_nodes.extend(additional_nodes)
    if additional_vm:
      vm_nodes.extend(additional_nodes)

  # Never distribute to master node
  for nodelist in [online_nodes, vm_nodes]:
    if master_info.name in nodelist:
      nodelist.remove(master_info.name)

  # Gather file lists
  (files_all, files_all_opt, files_mc, files_vm) = \
    _ComputeAncillaryFiles(cluster, True)

  # Never re-distribute configuration file from here
  assert not (constants.CLUSTER_CONF_FILE in files_all or
              constants.CLUSTER_CONF_FILE in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

  filemap = [
    (online_nodes, files_all),
    (online_nodes, files_all_opt),
    (vm_nodes, files_vm),
    ]

  # Upload the files
  for (node_list, files) in filemap:
    for fname in files:
      _UploadHelper(lu, node_list, fname)


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = _ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (disks[i].iv_name, mstat.sync_percent, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
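

# Illustrative sketch, not called by the code above: _WaitForSync is a
# poll-and-sleep loop with two counters -- a hard limit on consecutive status
# failures and a small grace counter for "done but still degraded" results.
# The stripped-down, standalone version below keeps only that control flow;
# poll_fn and sleep_fn are hypothetical stand-ins for the RPC call and
# time.sleep.
def _ExamplePollUntilSynced(poll_fn, sleep_fn,
                            max_failures=10, degraded_grace=10):
  """Poll C{poll_fn} until it reports completion.

  @param poll_fn: callable returning (done, degraded, wait_hint) or raising
      an exception when the status query fails
  @param sleep_fn: callable taking a number of seconds, e.g. time.sleep
  @return: True if the mirrors ended up non-degraded

  """
  failures = 0
  while True:
    try:
      (done, degraded, wait_hint) = poll_fn()
    except EnvironmentError:
      failures += 1
      if failures >= max_failures:
        raise
      sleep_fn(6)
      continue
    failures = 0
    if done and degraded and degraded_grace > 0:
      # give a transiently degraded mirror a moment to settle
      degraded_grace -= 1
      sleep_fn(1)
      continue
    if done:
      return not degraded
    sleep_fn(min(60, wait_hint))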


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUOobCommand(NoHooksLU):
  """Logical unit for OOB handling.

  """
  REQ_BGL = False
  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)

  def ExpandNames(self):
    """Gather locks we need.

    """
    if self.op.node_names:
      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
      lock_names = self.op.node_names
    else:
      lock_names = locking.ALL_SET

    self.needed_locks = {
      locking.LEVEL_NODE: lock_names,
      }

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - OOB is supported

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.nodes = []
    self.master_node = self.cfg.GetMasterNode()

    assert self.op.power_delay >= 0.0

    if self.op.node_names:
      if (self.op.command in self._SKIP_MASTER and
          self.master_node in self.op.node_names):
        master_node_obj = self.cfg.GetNodeInfo(self.master_node)
        master_oob_handler = _SupportsOob(self.cfg, master_node_obj)

        if master_oob_handler:
          additional_text = ("run '%s %s %s' if you want to operate on the"
                             " master regardless") % (master_oob_handler,
                                                      self.op.command,
                                                      self.master_node)
        else:
          additional_text = "it does not support out-of-band operations"

        raise errors.OpPrereqError(("Operating on the master node %s is not"
                                    " allowed for %s; %s") %
                                   (self.master_node, self.op.command,
                                    additional_text), errors.ECODE_INVAL)
    else:
      self.op.node_names = self.cfg.GetNodeList()
      if self.op.command in self._SKIP_MASTER:
        self.op.node_names.remove(self.master_node)

    if self.op.command in self._SKIP_MASTER:
      assert self.master_node not in self.op.node_names

    for node_name in self.op.node_names:
      node = self.cfg.GetNodeInfo(node_name)

      if node is None:
        raise errors.OpPrereqError("Node %s not found" % node_name,
                                   errors.ECODE_NOENT)
      else:
        self.nodes.append(node)

      if (not self.op.ignore_status and
          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
        raise errors.OpPrereqError(("Cannot power off node %s because it is"
                                    " not marked offline") % node_name,
                                   errors.ECODE_STATE)

  def Exec(self, feedback_fn):
    """Execute OOB and return result if we expect any.

    """
    master_node = self.master_node
    ret = []

    for idx, node in enumerate(utils.NiceSort(self.nodes,
                                              key=lambda node: node.name)):
      node_entry = [(constants.RS_NORMAL, node.name)]
      ret.append(node_entry)

      oob_program = _SupportsOob(self.cfg, node)

      if not oob_program:
        node_entry.append((constants.RS_UNAVAIL, None))
        continue

      logging.info("Executing out-of-band command '%s' using '%s' on %s",
                   self.op.command, oob_program, node.name)
      result = self.rpc.call_run_oob(master_node, oob_program,
                                     self.op.command, node.name,
                                     self.op.timeout)

      if result.fail_msg:
        self.LogWarning("Out-of-band RPC failed on node '%s': %s",
                        node.name, result.fail_msg)
        node_entry.append((constants.RS_NODATA, None))
      else:
        try:
          self._CheckPayload(result)
        except errors.OpExecError, err:
          self.LogWarning("Payload returned by node '%s' is not valid: %s",
                          node.name, err)
          node_entry.append((constants.RS_NODATA, None))
        else:
          if self.op.command == constants.OOB_HEALTH:
            # For health we should log important events
            for item, status in result.payload:
              if status in [constants.OOB_STATUS_WARNING,
                            constants.OOB_STATUS_CRITICAL]:
                self.LogWarning("Item '%s' on node '%s' has status '%s'",
                                item, node.name, status)

          if self.op.command == constants.OOB_POWER_ON:
            node.powered = True
          elif self.op.command == constants.OOB_POWER_OFF:
            node.powered = False
          elif self.op.command == constants.OOB_POWER_STATUS:
            powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
            if powered != node.powered:
              logging.warning(("Recorded power state (%s) of node '%s' does not"
                               " match actual power state (%s)"), node.powered,
                              node.name, powered)

          # For configuration changing commands we should update the node
          if self.op.command in (constants.OOB_POWER_ON,
                                 constants.OOB_POWER_OFF):
            self.cfg.Update(node, feedback_fn)

          node_entry.append((constants.RS_NORMAL, result.payload))

          if (self.op.command == constants.OOB_POWER_ON and
              idx < len(self.nodes) - 1):
            time.sleep(self.op.power_delay)

    return ret

  def _CheckPayload(self, result):
    """Checks if the payload is valid.

    @param result: RPC result
    @raises errors.OpExecError: If payload is not valid

    """
    errs = []
    if self.op.command == constants.OOB_HEALTH:
      if not isinstance(result.payload, list):
        errs.append("command 'health' is expected to return a list but got %s" %
                    type(result.payload))
      else:
        for item, status in result.payload:
          if status not in constants.OOB_STATUSES:
            errs.append("health item '%s' has invalid status '%s'" %
                        (item, status))

    if self.op.command == constants.OOB_POWER_STATUS:
      if not isinstance(result.payload, dict):
        errs.append("power-status is expected to return a dict but got %s" %
                    type(result.payload))

    if self.op.command in [
        constants.OOB_POWER_ON,
        constants.OOB_POWER_OFF,
        constants.OOB_POWER_CYCLE,
        ]:
      if result.payload is not None:
        errs.append("%s is expected to not return payload but got '%s'" %
                    (self.op.command, result.payload))

    if errs:
      raise errors.OpExecError("Check of out-of-band payload failed due to %s" %
                               utils.CommaJoin(errs))
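
  # Illustrative note (not executed): _CheckPayload encodes the per-command
  # payload contract which the checks above enforce at runtime, roughly:
  #
  #   OOB_HEALTH        -> list of (item, status) pairs, each status taken
  #                        from constants.OOB_STATUSES, e.g.
  #                        [("PSU1", constants.OOB_STATUS_WARNING)]
  #   OOB_POWER_STATUS  -> dict, e.g.
  #                        {constants.OOB_POWER_STATUS_POWERED: True}
  #   OOB_POWER_ON/OFF/CYCLE -> no payload at all (None)
  #
  # The item name "PSU1" is hypothetical; only the shapes are taken from the
  # checks above.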


class _OsQuery(_QueryBase):
  FIELDS = query.OS_FIELDS

  def ExpandNames(self, lu):
    # Lock all nodes in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    lu.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

    # The following variables interact with _QueryBase._GetNames
    if self.names:
      self.wanted = self.names
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self.use_locking

  def DeclareLocks(self, lu, level):
    pass

  @staticmethod
  def _DiagnoseByOS(rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another
        map, with nodes as keys and tuples of (path, status, diagnose,
        variants, parameters, api_versions) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "", [], [])]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for (name, path, status, diagnose, variants,
           params, api_versions) in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        # convert params from [name, help] to (name, help)
        params = [tuple(v) for v in params]
        all_os[name][node_name].append((path, status, diagnose,
                                        variants, params, api_versions))
    return all_os

  def _GetQueryData(self, lu):
    """Computes the list of operating systems and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    valid_nodes = [node.name
                   for node in lu.cfg.GetAllNodesInfo().values()
                   if not node.offline and node.vm_capable]
    pol = self._DiagnoseByOS(lu.rpc.call_os_diagnose(valid_nodes))
    cluster = lu.cfg.GetClusterInfo()

    data = {}

    for (os_name, os_data) in pol.items():
      info = query.OsInfo(name=os_name, valid=True, node_status=os_data,
                          hidden=(os_name in