root / lib / cmdlib.py @ f39b695a

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0201,C0302
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
# C0302: since we have waaaay too many lines in this module
30

    
31
import os
32
import os.path
33
import time
34
import re
35
import platform
36
import logging
37
import copy
38
import OpenSSL
39
import socket
40
import tempfile
41
import shutil
42
import itertools
43
import operator
44

    
45
from ganeti import ssh
46
from ganeti import utils
47
from ganeti import errors
48
from ganeti import hypervisor
49
from ganeti import locking
50
from ganeti import constants
51
from ganeti import objects
52
from ganeti import serializer
53
from ganeti import ssconf
54
from ganeti import uidpool
55
from ganeti import compat
56
from ganeti import masterd
57
from ganeti import netutils
58
from ganeti import query
59
from ganeti import qlang
60
from ganeti import opcodes
61
from ganeti import ht
62

    
63
import ganeti.masterd.instance # pylint: disable-msg=W0611
64

    
65

    
66
class ResultWithJobs:
67
  """Data container for LU results with jobs.
68

69
  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
70
  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
71
  contained in the C{jobs} attribute and include the job IDs in the opcode
72
  result.
73

74
  """
75
  def __init__(self, jobs, **kwargs):
76
    """Initializes this class.
77

78
    Additional return values can be specified as keyword arguments.
79

80
    @type jobs: list of lists of L{opcodes.OpCode}
81
    @param jobs: A list of lists of opcode objects
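
    Example (illustrative sketch only; C{OpTestDelay} and the extra C{info}
    keyword are just stand-ins)::

      # Submit one follow-up job made of a single opcode and attach an
      # additional value to the LU result
      return ResultWithJobs([[opcodes.OpTestDelay(duration=1)]],
                            info="follow-up job submitted")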
82

83
    """
84
    self.jobs = jobs
85
    self.other = kwargs
86

    
87

    
88
class LogicalUnit(object):
89
  """Logical Unit base class.
90

91
  Subclasses must follow these rules:
92
    - implement ExpandNames
93
    - implement CheckPrereq (except when tasklets are used)
94
    - implement Exec (except when tasklets are used)
95
    - implement BuildHooksEnv
96
    - implement BuildHooksNodes
97
    - redefine HPATH and HTYPE
98
    - optionally redefine their run requirements:
99
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
100

101
  Note that all commands require root permissions.
102

103
  @ivar dry_run_result: the value (if any) that will be returned to the caller
104
      in dry-run mode (signalled by opcode dry_run parameter)
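
  Minimal skeleton of a subclass (illustrative sketch only; the class name and
  its behaviour are hypothetical)::

    class LUHypotheticalNoop(NoHooksLU):
      REQ_BGL = False

      def ExpandNames(self):
        self.needed_locks = {}

      def Exec(self, feedback_fn):
        feedback_fn("doing nothing")
        return True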
105

106
  """
107
  HPATH = None
108
  HTYPE = None
109
  REQ_BGL = True
110

    
111
  def __init__(self, processor, op, context, rpc):
112
    """Constructor for LogicalUnit.
113

114
    This needs to be overridden in derived classes in order to check op
115
    validity.
116

117
    """
118
    self.proc = processor
119
    self.op = op
120
    self.cfg = context.cfg
121
    self.glm = context.glm
122
    self.context = context
123
    self.rpc = rpc
124
    # Dicts used to declare locking needs to mcpu
125
    self.needed_locks = None
126
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
127
    self.add_locks = {}
128
    self.remove_locks = {}
129
    # Used to force good behavior when calling helper functions
130
    self.recalculate_locks = {}
131
    # logging
132
    self.Log = processor.Log # pylint: disable-msg=C0103
133
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
134
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
135
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
136
    # support for dry-run
137
    self.dry_run_result = None
138
    # support for generic debug attribute
139
    if (not hasattr(self.op, "debug_level") or
140
        not isinstance(self.op.debug_level, int)):
141
      self.op.debug_level = 0
142

    
143
    # Tasklets
144
    self.tasklets = None
145

    
146
    # Validate opcode parameters and set defaults
147
    self.op.Validate(True)
148

    
149
    self.CheckArguments()
150

    
151
  def CheckArguments(self):
152
    """Check syntactic validity for the opcode arguments.
153

154
    This method is for doing a simple syntactic check and ensure
155
    validity of opcode parameters, without any cluster-related
156
    checks. While the same can be accomplished in ExpandNames and/or
157
    CheckPrereq, doing these separately is better because:
158

159
      - ExpandNames is left as purely a lock-related function
160
      - CheckPrereq is run after we have acquired locks (and possibly
161
        waited for them)
162

163
    The function is allowed to change the self.op attribute so that
164
    later methods need no longer worry about missing parameters.
165

166
    """
167
    pass
168

    
169
  def ExpandNames(self):
170
    """Expand names for this LU.
171

172
    This method is called before starting to execute the opcode, and it should
173
    update all the parameters of the opcode to their canonical form (e.g. a
174
    short node name must be fully expanded after this method has successfully
175
    completed). This way locking, hooks, logging, etc. can work correctly.
176

177
    LUs which implement this method must also populate the self.needed_locks
178
    member, as a dict with lock levels as keys, and a list of needed lock names
179
    as values. Rules:
180

181
      - use an empty dict if you don't need any lock
182
      - if you don't need any lock at a particular level omit that level
183
      - don't put anything for the BGL level
184
      - if you want all locks at a level use locking.ALL_SET as a value
185

186
    If you need to share locks (rather than acquire them exclusively) at one
187
    level you can modify self.share_locks, setting a true value (usually 1) for
188
    that level. By default locks are not shared.
189

190
    This function can also define a list of tasklets, which then will be
191
    executed in order instead of the usual LU-level CheckPrereq and Exec
192
    functions, if those are not defined by the LU.
193

194
    Examples::
195

196
      # Acquire all nodes and one instance
197
      self.needed_locks = {
198
        locking.LEVEL_NODE: locking.ALL_SET,
199
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
200
      }
201
      # Acquire just two nodes
202
      self.needed_locks = {
203
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
204
      }
205
      # Acquire no locks
206
      self.needed_locks = {} # No, you can't leave it to the default value None
207

208
    """
209
    # The implementation of this method is mandatory only if the new LU is
210
    # concurrent, so that old LUs don't need to be changed all at the same
211
    # time.
212
    if self.REQ_BGL:
213
      self.needed_locks = {} # Exclusive LUs don't need locks.
214
    else:
215
      raise NotImplementedError
216

    
217
  def DeclareLocks(self, level):
218
    """Declare LU locking needs for a level
219

220
    While most LUs can just declare their locking needs at ExpandNames time,
221
    sometimes there's the need to calculate some locks after having acquired
222
    the ones before. This function is called just before acquiring locks at a
223
    particular level, but after acquiring the ones at lower levels, and permits
224
    such calculations. It can be used to modify self.needed_locks, and by
225
    default it does nothing.
226

227
    This function is only called if you have something already set in
228
    self.needed_locks for the level.
229

230
    @param level: Locking level which is going to be locked
231
    @type level: member of ganeti.locking.LEVELS
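
    Example (illustrative), mirroring the snippet shown in
    L{_LockInstancesNodes}::

      def DeclareLocks(self, level):
        if level == locking.LEVEL_NODE:
          self._LockInstancesNodes()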
232

233
    """
234

    
235
  def CheckPrereq(self):
236
    """Check prerequisites for this LU.
237

238
    This method should check that the prerequisites for the execution
239
    of this LU are fulfilled. It can do internode communication, but
240
    it should be idempotent - no cluster or system changes are
241
    allowed.
242

243
    The method should raise errors.OpPrereqError in case something is
244
    not fulfilled. Its return value is ignored.
245

246
    This method should also update all the parameters of the opcode to
247
    their canonical form if it hasn't been done by ExpandNames before.
248

249
    """
250
    if self.tasklets is not None:
251
      for (idx, tl) in enumerate(self.tasklets):
252
        logging.debug("Checking prerequisites for tasklet %s/%s",
253
                      idx + 1, len(self.tasklets))
254
        tl.CheckPrereq()
255
    else:
256
      pass
257

    
258
  def Exec(self, feedback_fn):
259
    """Execute the LU.
260

261
    This method should implement the actual work. It should raise
262
    errors.OpExecError for failures that are somewhat dealt with in
263
    code, or expected.
264

265
    """
266
    if self.tasklets is not None:
267
      for (idx, tl) in enumerate(self.tasklets):
268
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
269
        tl.Exec(feedback_fn)
270
    else:
271
      raise NotImplementedError
272

    
273
  def BuildHooksEnv(self):
274
    """Build hooks environment for this LU.
275

276
    @rtype: dict
277
    @return: Dictionary containing the environment that will be used for
278
      running the hooks for this LU. The keys of the dict must not be prefixed
279
      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
280
      will extend the environment with additional variables. If no environment
281
      should be defined, an empty dictionary should be returned (not C{None}).
282
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
283
      will not be called.
284

285
    """
286
    raise NotImplementedError
287

    
288
  def BuildHooksNodes(self):
289
    """Build list of nodes to run LU's hooks.
290

291
    @rtype: tuple; (list, list)
292
    @return: Tuple containing a list of node names on which the hook
293
      should run before the execution and a list of node names on which the
294
      hook should run after the execution. No nodes should be returned as an
295
      empty list (and not None).
296
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
297
      will not be called.
298

299
    """
300
    raise NotImplementedError
301

    
302
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
303
    """Notify the LU about the results of its hooks.
304

305
    This method is called every time a hooks phase is executed, and notifies
306
    the Logical Unit about the hooks' result. The LU can then use it to alter
307
    its result based on the hooks.  By default the method does nothing and the
308
    previous result is passed back unchanged, but any LU can override it if it
309
    wants to use the local cluster hook-scripts somehow.
310

311
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
312
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
313
    @param hook_results: the results of the multi-node hooks rpc call
314
    @param feedback_fn: function used to send feedback back to the caller
315
    @param lu_result: the previous Exec result this LU had, or None
316
        in the PRE phase
317
    @return: the new Exec result, based on the previous result
318
        and hook results
319

320
    """
321
    # API must be kept, thus we ignore the unused-argument and
322
    # could-be-a-function warnings
323
    # pylint: disable-msg=W0613,R0201
324
    return lu_result
325

    
326
  def _ExpandAndLockInstance(self):
327
    """Helper function to expand and lock an instance.
328

329
    Many LUs that work on an instance take its name in self.op.instance_name
330
    and need to expand it and then declare the expanded name for locking. This
331
    function does it, and then updates self.op.instance_name to the expanded
332
    name. It also initializes needed_locks as a dict, if this hasn't been done
333
    before.
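
    Typical use in an LU's ExpandNames (illustrative)::

      def ExpandNames(self):
        self._ExpandAndLockInstance()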
334

335
    """
336
    if self.needed_locks is None:
337
      self.needed_locks = {}
338
    else:
339
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
340
        "_ExpandAndLockInstance called with instance-level locks set"
341
    self.op.instance_name = _ExpandInstanceName(self.cfg,
342
                                                self.op.instance_name)
343
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
344

    
345
  def _LockInstancesNodes(self, primary_only=False):
346
    """Helper function to declare instances' nodes for locking.
347

348
    This function should be called after locking one or more instances to lock
349
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
350
    with all primary or secondary nodes for instances already locked and
351
    present in self.needed_locks[locking.LEVEL_INSTANCE].
352

353
    It should be called from DeclareLocks, and for safety only works if
354
    self.recalculate_locks[locking.LEVEL_NODE] is set.
355

356
    In the future it may grow parameters to just lock some instances' nodes, or
357
    to just lock primaries or secondary nodes, if needed.
358

359
    It should be called in DeclareLocks in a way similar to::
360

361
      if level == locking.LEVEL_NODE:
362
        self._LockInstancesNodes()
363

364
    @type primary_only: boolean
365
    @param primary_only: only lock primary nodes of locked instances
366

367
    """
368
    assert locking.LEVEL_NODE in self.recalculate_locks, \
369
      "_LockInstancesNodes helper function called with no nodes to recalculate"
370

    
371
    # TODO: check if we've really been called with the instance locks held
372

    
373
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
374
    # future we might want to have different behaviors depending on the value
375
    # of self.recalculate_locks[locking.LEVEL_NODE]
376
    wanted_nodes = []
377
    for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
378
      instance = self.context.cfg.GetInstanceInfo(instance_name)
379
      wanted_nodes.append(instance.primary_node)
380
      if not primary_only:
381
        wanted_nodes.extend(instance.secondary_nodes)
382

    
383
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
384
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
385
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
386
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
387

    
388
    del self.recalculate_locks[locking.LEVEL_NODE]
389

    
390

    
391
class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
392
  """Simple LU which runs no hooks.
393

394
  This LU is intended as a parent for other LogicalUnits which will
395
  run no hooks, in order to reduce duplicate code.
396

397
  """
398
  HPATH = None
399
  HTYPE = None
400

    
401
  def BuildHooksEnv(self):
402
    """Empty BuildHooksEnv for NoHooksLu.
403

404
    This just raises an error.
405

406
    """
407
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")
408

    
409
  def BuildHooksNodes(self):
410
    """Empty BuildHooksNodes for NoHooksLU.
411

412
    """
413
    raise AssertionError("BuildHooksNodes called for NoHooksLU")
414

    
415

    
416
class Tasklet:
417
  """Tasklet base class.
418

419
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
420
  they can mix legacy code with tasklets. Locking needs to be done in the LU;
421
  tasklets know nothing about locks.
422

423
  Subclasses must follow these rules:
424
    - Implement CheckPrereq
425
    - Implement Exec
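
  Minimal skeleton (illustrative sketch; the class name is hypothetical)::

    class HypotheticalNoopTasklet(Tasklet):
      def CheckPrereq(self):
        pass

      def Exec(self, feedback_fn):
        feedback_fn("tasklet did nothing")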
426

427
  """
428
  def __init__(self, lu):
429
    self.lu = lu
430

    
431
    # Shortcuts
432
    self.cfg = lu.cfg
433
    self.rpc = lu.rpc
434

    
435
  def CheckPrereq(self):
436
    """Check prerequisites for this tasklets.
437

438
    This method should check whether the prerequisites for the execution of
439
    this tasklet are fulfilled. It can do internode communication, but it
440
    should be idempotent - no cluster or system changes are allowed.
441

442
    The method should raise errors.OpPrereqError in case something is not
443
    fulfilled. Its return value is ignored.
444

445
    This method should also update all parameters to their canonical form if it
446
    hasn't been done before.
447

448
    """
449
    pass
450

    
451
  def Exec(self, feedback_fn):
452
    """Execute the tasklet.
453

454
    This method should implement the actual work. It should raise
455
    errors.OpExecError for failures that are somewhat dealt with in code, or
456
    expected.
457

458
    """
459
    raise NotImplementedError
460

    
461

    
462
class _QueryBase:
463
  """Base for query utility classes.
464

465
  """
466
  #: Attribute holding field definitions
467
  FIELDS = None
468

    
469
  def __init__(self, filter_, fields, use_locking):
470
    """Initializes this class.
471

472
    """
473
    self.use_locking = use_locking
474

    
475
    self.query = query.Query(self.FIELDS, fields, filter_=filter_,
476
                             namefield="name")
477
    self.requested_data = self.query.RequestedData()
478
    self.names = self.query.RequestedNames()
479

    
480
    # Sort only if no names were requested
481
    self.sort_by_name = not self.names
482

    
483
    self.do_locking = None
484
    self.wanted = None
485

    
486
  def _GetNames(self, lu, all_names, lock_level):
487
    """Helper function to determine names asked for in the query.
488

489
    """
490
    if self.do_locking:
491
      names = lu.glm.list_owned(lock_level)
492
    else:
493
      names = all_names
494

    
495
    if self.wanted == locking.ALL_SET:
496
      assert not self.names
497
      # caller didn't specify names, so ordering is not important
498
      return utils.NiceSort(names)
499

    
500
    # caller specified names and we must keep the same order
501
    assert self.names
502
    assert not self.do_locking or lu.glm.is_owned(lock_level)
503

    
504
    missing = set(self.wanted).difference(names)
505
    if missing:
506
      raise errors.OpExecError("Some items were removed before retrieving"
507
                               " their data: %s" % missing)
508

    
509
    # Return expanded names
510
    return self.wanted
511

    
512
  def ExpandNames(self, lu):
513
    """Expand names for this query.
514

515
    See L{LogicalUnit.ExpandNames}.
516

517
    """
518
    raise NotImplementedError()
519

    
520
  def DeclareLocks(self, lu, level):
521
    """Declare locks for this query.
522

523
    See L{LogicalUnit.DeclareLocks}.
524

525
    """
526
    raise NotImplementedError()
527

    
528
  def _GetQueryData(self, lu):
529
    """Collects all data for this query.
530

531
    @return: Query data object
532

533
    """
534
    raise NotImplementedError()
535

    
536
  def NewStyleQuery(self, lu):
537
    """Collect data and execute query.
538

539
    """
540
    return query.GetQueryResponse(self.query, self._GetQueryData(lu),
541
                                  sort_by_name=self.sort_by_name)
542

    
543
  def OldStyleQuery(self, lu):
544
    """Collect data and execute query.
545

546
    """
547
    return self.query.OldStyleQuery(self._GetQueryData(lu),
548
                                    sort_by_name=self.sort_by_name)
549

    
550

    
551
def _ShareAll():
552
  """Returns a dict declaring all lock levels shared.
553

554
  """
555
  return dict.fromkeys(locking.LEVELS, 1)
556

    
557

    
558
def _SupportsOob(cfg, node):
559
  """Tells if node supports OOB.
560

561
  @type cfg: L{config.ConfigWriter}
562
  @param cfg: The cluster configuration
563
  @type node: L{objects.Node}
564
  @param node: The node
565
  @return: The OOB script if supported or an empty string otherwise
566

567
  """
568
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
569

    
570

    
571
def _GetWantedNodes(lu, nodes):
572
  """Returns list of checked and expanded node names.
573

574
  @type lu: L{LogicalUnit}
575
  @param lu: the logical unit on whose behalf we execute
576
  @type nodes: list
577
  @param nodes: list of node names or None for all nodes
578
  @rtype: list
579
  @return: the list of nodes, sorted
580
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
581

582
  """
583
  if nodes:
584
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
585

    
586
  return utils.NiceSort(lu.cfg.GetNodeList())
587

    
588

    
589
def _GetWantedInstances(lu, instances):
590
  """Returns list of checked and expanded instance names.
591

592
  @type lu: L{LogicalUnit}
593
  @param lu: the logical unit on whose behalf we execute
594
  @type instances: list
595
  @param instances: list of instance names or None for all instances
596
  @rtype: list
597
  @return: the list of instances, sorted
598
  @raise errors.OpPrereqError: if the instances parameter is wrong type
599
  @raise errors.OpPrereqError: if any of the passed instances is not found
600

601
  """
602
  if instances:
603
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
604
  else:
605
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
606
  return wanted
607

    
608

    
609
def _GetUpdatedParams(old_params, update_dict,
610
                      use_default=True, use_none=False):
611
  """Return the new version of a parameter dictionary.
612

613
  @type old_params: dict
614
  @param old_params: old parameters
615
  @type update_dict: dict
616
  @param update_dict: dict containing new parameter values, or
617
      constants.VALUE_DEFAULT to reset the parameter to its default
618
      value
619
  @type use_default: boolean
620
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
621
      values as 'to be deleted' values
622
  @type use_none: boolean
623
  @param use_none: whether to recognise C{None} values as 'to be
624
      deleted' values
625
  @rtype: dict
626
  @return: the new parameter dictionary
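
  Example (illustrative sketch)::

    # "a" is reset to its default (i.e. removed), "c" is added, "b" is kept
    new_params = _GetUpdatedParams({"a": 1, "b": 2},
                                   {"a": constants.VALUE_DEFAULT, "c": 3})
    # new_params == {"b": 2, "c": 3}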
627

628
  """
629
  params_copy = copy.deepcopy(old_params)
630
  for key, val in update_dict.iteritems():
631
    if ((use_default and val == constants.VALUE_DEFAULT) or
632
        (use_none and val is None)):
633
      try:
634
        del params_copy[key]
635
      except KeyError:
636
        pass
637
    else:
638
      params_copy[key] = val
639
  return params_copy
640

    
641

    
642
def _ReleaseLocks(lu, level, names=None, keep=None):
643
  """Releases locks owned by an LU.
644

645
  @type lu: L{LogicalUnit}
646
  @param level: Lock level
647
  @type names: list or None
648
  @param names: Names of locks to release
649
  @type keep: list or None
650
  @param keep: Names of locks to retain
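
  Example (illustrative sketch; C{wanted_nodes} is a hypothetical variable)::

    # Keep only the node locks that are still needed, release all others
    _ReleaseLocks(lu, locking.LEVEL_NODE, keep=wanted_nodes)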
651

652
  """
653
  assert not (keep is not None and names is not None), \
654
         "Only one of the 'names' and the 'keep' parameters can be given"
655

    
656
  if names is not None:
657
    should_release = names.__contains__
658
  elif keep:
659
    should_release = lambda name: name not in keep
660
  else:
661
    should_release = None
662

    
663
  if should_release:
664
    retain = []
665
    release = []
666

    
667
    # Determine which locks to release
668
    for name in lu.glm.list_owned(level):
669
      if should_release(name):
670
        release.append(name)
671
      else:
672
        retain.append(name)
673

    
674
    assert len(lu.glm.list_owned(level)) == (len(retain) + len(release))
675

    
676
    # Release just some locks
677
    lu.glm.release(level, names=release)
678

    
679
    assert frozenset(lu.glm.list_owned(level)) == frozenset(retain)
680
  else:
681
    # Release everything
682
    lu.glm.release(level)
683

    
684
    assert not lu.glm.is_owned(level), "No locks should be owned"
685

    
686

    
687
def _MapInstanceDisksToNodes(instances):
688
  """Creates a map from (node, volume) to instance name.
689

690
  @type instances: list of L{objects.Instance}
691
  @rtype: dict; tuple of (node name, volume name) as key, instance name as value
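
  Example of the resulting structure (illustrative sketch; the node names and
  volume identifiers are made up)::

    {
      ("node1.example.com", "xenvg/disk0"): "inst1.example.com",
      ("node2.example.com", "xenvg/disk0"): "inst1.example.com",
    }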
692

693
  """
694
  return dict(((node, vol), inst.name)
695
              for inst in instances
696
              for (node, vols) in inst.MapLVsByNode().items()
697
              for vol in vols)
698

    
699

    
700
def _RunPostHook(lu, node_name):
701
  """Runs the post-hook for an opcode on a single node.
702

703
  """
704
  hm = lu.proc.hmclass(lu.rpc.call_hooks_runner, lu)
705
  try:
706
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
707
  except:
708
    # pylint: disable-msg=W0702
709
    lu.LogWarning("Errors occurred running hooks on %s" % node_name)
710

    
711

    
712
def _CheckOutputFields(static, dynamic, selected):
713
  """Checks whether all selected fields are valid.
714

715
  @type static: L{utils.FieldSet}
716
  @param static: static fields set
717
  @type dynamic: L{utils.FieldSet}
718
  @param dynamic: dynamic fields set
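
  Example (illustrative; the field names are made up)::

    _CheckOutputFields(static=utils.FieldSet("name", "pinst_cnt"),
                       dynamic=utils.FieldSet("free_memory"),
                       selected=self.op.output_fields)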
719

720
  """
721
  f = utils.FieldSet()
722
  f.Extend(static)
723
  f.Extend(dynamic)
724

    
725
  delta = f.NonMatching(selected)
726
  if delta:
727
    raise errors.OpPrereqError("Unknown output fields selected: %s"
728
                               % ",".join(delta), errors.ECODE_INVAL)
729

    
730

    
731
def _CheckGlobalHvParams(params):
732
  """Validates that given hypervisor params are not global ones.
733

734
  This will ensure that instances don't get customised versions of
735
  global params.
736

737
  """
738
  used_globals = constants.HVC_GLOBALS.intersection(params)
739
  if used_globals:
740
    msg = ("The following hypervisor parameters are global and cannot"
741
           " be customized at instance level, please modify them at"
742
           " cluster level: %s" % utils.CommaJoin(used_globals))
743
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
744

    
745

    
746
def _CheckNodeOnline(lu, node, msg=None):
747
  """Ensure that a given node is online.
748

749
  @param lu: the LU on behalf of which we make the check
750
  @param node: the node to check
751
  @param msg: if passed, should be a message to replace the default one
752
  @raise errors.OpPrereqError: if the node is offline
753

754
  """
755
  if msg is None:
756
    msg = "Can't use offline node"
757
  if lu.cfg.GetNodeInfo(node).offline:
758
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
759

    
760

    
761
def _CheckNodeNotDrained(lu, node):
762
  """Ensure that a given node is not drained.
763

764
  @param lu: the LU on behalf of which we make the check
765
  @param node: the node to check
766
  @raise errors.OpPrereqError: if the node is drained
767

768
  """
769
  if lu.cfg.GetNodeInfo(node).drained:
770
    raise errors.OpPrereqError("Can't use drained node %s" % node,
771
                               errors.ECODE_STATE)
772

    
773

    
774
def _CheckNodeVmCapable(lu, node):
775
  """Ensure that a given node is vm capable.
776

777
  @param lu: the LU on behalf of which we make the check
778
  @param node: the node to check
779
  @raise errors.OpPrereqError: if the node is not vm capable
780

781
  """
782
  if not lu.cfg.GetNodeInfo(node).vm_capable:
783
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
784
                               errors.ECODE_STATE)
785

    
786

    
787
def _CheckNodeHasOS(lu, node, os_name, force_variant):
788
  """Ensure that a node supports a given OS.
789

790
  @param lu: the LU on behalf of which we make the check
791
  @param node: the node to check
792
  @param os_name: the OS to query about
793
  @param force_variant: whether to ignore variant errors
794
  @raise errors.OpPrereqError: if the node is not supporting the OS
795

796
  """
797
  result = lu.rpc.call_os_get(node, os_name)
798
  result.Raise("OS '%s' not in supported OS list for node %s" %
799
               (os_name, node),
800
               prereq=True, ecode=errors.ECODE_INVAL)
801
  if not force_variant:
802
    _CheckOSVariant(result.payload, os_name)
803

    
804

    
805
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
806
  """Ensure that a node has the given secondary ip.
807

808
  @type lu: L{LogicalUnit}
809
  @param lu: the LU on behalf of which we make the check
810
  @type node: string
811
  @param node: the node to check
812
  @type secondary_ip: string
813
  @param secondary_ip: the ip to check
814
  @type prereq: boolean
815
  @param prereq: whether to throw a prerequisite or an execute error
816
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
817
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
818

819
  """
820
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
821
  result.Raise("Failure checking secondary ip on node %s" % node,
822
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
823
  if not result.payload:
824
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
825
           " please fix and re-run this command" % secondary_ip)
826
    if prereq:
827
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
828
    else:
829
      raise errors.OpExecError(msg)
830

    
831

    
832
def _GetClusterDomainSecret():
833
  """Reads the cluster domain secret.
834

835
  """
836
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
837
                               strict=True)
838

    
839

    
840
def _CheckInstanceDown(lu, instance, reason):
841
  """Ensure that an instance is not running."""
842
  if instance.admin_up:
843
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
844
                               (instance.name, reason), errors.ECODE_STATE)
845

    
846
  pnode = instance.primary_node
847
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
848
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
849
              prereq=True, ecode=errors.ECODE_ENVIRON)
850

    
851
  if instance.name in ins_l.payload:
852
    raise errors.OpPrereqError("Instance %s is running, %s" %
853
                               (instance.name, reason), errors.ECODE_STATE)
854

    
855

    
856
def _ExpandItemName(fn, name, kind):
857
  """Expand an item name.
858

859
  @param fn: the function to use for expansion
860
  @param name: requested item name
861
  @param kind: text description ('Node' or 'Instance')
862
  @return: the resolved (full) name
863
  @raise errors.OpPrereqError: if the item is not found
864

865
  """
866
  full_name = fn(name)
867
  if full_name is None:
868
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
869
                               errors.ECODE_NOENT)
870
  return full_name
871

    
872

    
873
def _ExpandNodeName(cfg, name):
874
  """Wrapper over L{_ExpandItemName} for nodes."""
875
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
876

    
877

    
878
def _ExpandInstanceName(cfg, name):
879
  """Wrapper over L{_ExpandItemName} for instance."""
880
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
881

    
882

    
883
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
884
                          memory, vcpus, nics, disk_template, disks,
885
                          bep, hvp, hypervisor_name, tags):
886
  """Builds instance related env variables for hooks
887

888
  This builds the hook environment from individual variables.
889

890
  @type name: string
891
  @param name: the name of the instance
892
  @type primary_node: string
893
  @param primary_node: the name of the instance's primary node
894
  @type secondary_nodes: list
895
  @param secondary_nodes: list of secondary nodes as strings
896
  @type os_type: string
897
  @param os_type: the name of the instance's OS
898
  @type status: boolean
899
  @param status: the should_run status of the instance
900
  @type memory: string
901
  @param memory: the memory size of the instance
902
  @type vcpus: string
903
  @param vcpus: the count of VCPUs the instance has
904
  @type nics: list
905
  @param nics: list of tuples (ip, mac, mode, link) representing
906
      the NICs the instance has
907
  @type disk_template: string
908
  @param disk_template: the disk template of the instance
909
  @type disks: list
910
  @param disks: the list of (size, mode) pairs
911
  @type bep: dict
912
  @param bep: the backend parameters for the instance
913
  @type hvp: dict
914
  @param hvp: the hypervisor parameters for the instance
915
  @type hypervisor_name: string
916
  @param hypervisor_name: the hypervisor for the instance
917
  @type tags: list
918
  @param tags: list of instance tags as strings
919
  @rtype: dict
920
  @return: the hook environment for this instance
921

922
  """
923
  if status:
924
    str_status = "up"
925
  else:
926
    str_status = "down"
927
  env = {
928
    "OP_TARGET": name,
929
    "INSTANCE_NAME": name,
930
    "INSTANCE_PRIMARY": primary_node,
931
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
932
    "INSTANCE_OS_TYPE": os_type,
933
    "INSTANCE_STATUS": str_status,
934
    "INSTANCE_MEMORY": memory,
935
    "INSTANCE_VCPUS": vcpus,
936
    "INSTANCE_DISK_TEMPLATE": disk_template,
937
    "INSTANCE_HYPERVISOR": hypervisor_name,
938
  }
939

    
940
  if nics:
941
    nic_count = len(nics)
942
    for idx, (ip, mac, mode, link) in enumerate(nics):
943
      if ip is None:
944
        ip = ""
945
      env["INSTANCE_NIC%d_IP" % idx] = ip
946
      env["INSTANCE_NIC%d_MAC" % idx] = mac
947
      env["INSTANCE_NIC%d_MODE" % idx] = mode
948
      env["INSTANCE_NIC%d_LINK" % idx] = link
949
      if mode == constants.NIC_MODE_BRIDGED:
950
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
951
  else:
952
    nic_count = 0
953

    
954
  env["INSTANCE_NIC_COUNT"] = nic_count
955

    
956
  if disks:
957
    disk_count = len(disks)
958
    for idx, (size, mode) in enumerate(disks):
959
      env["INSTANCE_DISK%d_SIZE" % idx] = size
960
      env["INSTANCE_DISK%d_MODE" % idx] = mode
961
  else:
962
    disk_count = 0
963

    
964
  env["INSTANCE_DISK_COUNT"] = disk_count
965

    
966
  if not tags:
967
    tags = []
968

    
969
  env["INSTANCE_TAGS"] = " ".join(tags)
970

    
971
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
972
    for key, value in source.items():
973
      env["INSTANCE_%s_%s" % (kind, key)] = value
974

    
975
  return env
976

    
977

    
978
def _NICListToTuple(lu, nics):
979
  """Build a list of nic information tuples.
980

981
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
982
  value in LUInstanceQueryData.
983

984
  @type lu:  L{LogicalUnit}
985
  @param lu: the logical unit on whose behalf we execute
986
  @type nics: list of L{objects.NIC}
987
  @param nics: list of nics to convert to hooks tuples
988

989
  """
990
  hooks_nics = []
991
  cluster = lu.cfg.GetClusterInfo()
992
  for nic in nics:
993
    ip = nic.ip
994
    mac = nic.mac
995
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
996
    mode = filled_params[constants.NIC_MODE]
997
    link = filled_params[constants.NIC_LINK]
998
    hooks_nics.append((ip, mac, mode, link))
999
  return hooks_nics
1000

    
1001

    
1002
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
1003
  """Builds instance related env variables for hooks from an object.
1004

1005
  @type lu: L{LogicalUnit}
1006
  @param lu: the logical unit on whose behalf we execute
1007
  @type instance: L{objects.Instance}
1008
  @param instance: the instance for which we should build the
1009
      environment
1010
  @type override: dict
1011
  @param override: dictionary with key/values that will override
1012
      our values
1013
  @rtype: dict
1014
  @return: the hook environment dictionary
1015

1016
  """
1017
  cluster = lu.cfg.GetClusterInfo()
1018
  bep = cluster.FillBE(instance)
1019
  hvp = cluster.FillHV(instance)
1020
  args = {
1021
    "name": instance.name,
1022
    "primary_node": instance.primary_node,
1023
    "secondary_nodes": instance.secondary_nodes,
1024
    "os_type": instance.os,
1025
    "status": instance.admin_up,
1026
    "memory": bep[constants.BE_MEMORY],
1027
    "vcpus": bep[constants.BE_VCPUS],
1028
    "nics": _NICListToTuple(lu, instance.nics),
1029
    "disk_template": instance.disk_template,
1030
    "disks": [(disk.size, disk.mode) for disk in instance.disks],
1031
    "bep": bep,
1032
    "hvp": hvp,
1033
    "hypervisor_name": instance.hypervisor,
1034
    "tags": instance.tags,
1035
  }
1036
  if override:
1037
    args.update(override)
1038
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
1039

    
1040

    
1041
def _AdjustCandidatePool(lu, exceptions):
1042
  """Adjust the candidate pool after node operations.
1043

1044
  """
1045
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
1046
  if mod_list:
1047
    lu.LogInfo("Promoted nodes to master candidate role: %s",
1048
               utils.CommaJoin(node.name for node in mod_list))
1049
    for name in mod_list:
1050
      lu.context.ReaddNode(name)
1051
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1052
  if mc_now > mc_max:
1053
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
1054
               (mc_now, mc_max))
1055

    
1056

    
1057
def _DecideSelfPromotion(lu, exceptions=None):
1058
  """Decide whether I should promote myself as a master candidate.
1059

1060
  """
1061
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
1062
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1063
  # the new node will increase mc_max by one, so:
1064
  mc_should = min(mc_should + 1, cp_size)
1065
  return mc_now < mc_should
1066

    
1067

    
1068
def _CheckNicsBridgesExist(lu, target_nics, target_node):
1069
  """Check that the brigdes needed by a list of nics exist.
1070

1071
  """
1072
  cluster = lu.cfg.GetClusterInfo()
1073
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
1074
  brlist = [params[constants.NIC_LINK] for params in paramslist
1075
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
1076
  if brlist:
1077
    result = lu.rpc.call_bridges_exist(target_node, brlist)
1078
    result.Raise("Error checking bridges on destination node '%s'" %
1079
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1080

    
1081

    
1082
def _CheckInstanceBridgesExist(lu, instance, node=None):
1083
  """Check that the brigdes needed by an instance exist.
1084

1085
  """
1086
  if node is None:
1087
    node = instance.primary_node
1088
  _CheckNicsBridgesExist(lu, instance.nics, node)
1089

    
1090

    
1091
def _CheckOSVariant(os_obj, name):
1092
  """Check whether an OS name conforms to the os variants specification.
1093

1094
  @type os_obj: L{objects.OS}
1095
  @param os_obj: OS object to check
1096
  @type name: string
1097
  @param name: OS name passed by the user, to check for validity
1098

1099
  """
1100
  variant = objects.OS.GetVariant(name)
1101
  if not os_obj.supported_variants:
1102
    if variant:
1103
      raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
1104
                                 " passed)" % (os_obj.name, variant),
1105
                                 errors.ECODE_INVAL)
1106
    return
1107
  if not variant:
1108
    raise errors.OpPrereqError("OS name must include a variant",
1109
                               errors.ECODE_INVAL)
1110

    
1111
  if variant not in os_obj.supported_variants:
1112
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1113

    
1114

    
1115
def _GetNodeInstancesInner(cfg, fn):
1116
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
1117

    
1118

    
1119
def _GetNodeInstances(cfg, node_name):
1120
  """Returns a list of all primary and secondary instances on a node.
1121

1122
  """
1123

    
1124
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1125

    
1126

    
1127
def _GetNodePrimaryInstances(cfg, node_name):
1128
  """Returns primary instances on a node.
1129

1130
  """
1131
  return _GetNodeInstancesInner(cfg,
1132
                                lambda inst: node_name == inst.primary_node)
1133

    
1134

    
1135
def _GetNodeSecondaryInstances(cfg, node_name):
1136
  """Returns secondary instances on a node.
1137

1138
  """
1139
  return _GetNodeInstancesInner(cfg,
1140
                                lambda inst: node_name in inst.secondary_nodes)
1141

    
1142

    
1143
def _GetStorageTypeArgs(cfg, storage_type):
1144
  """Returns the arguments for a storage type.
1145

1146
  """
1147
  # Special case for file storage
1148
  if storage_type == constants.ST_FILE:
1149
    # storage.FileStorage wants a list of storage directories
1150
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1151

    
1152
  return []
1153

    
1154

    
1155
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
1156
  faulty = []
1157

    
1158
  for dev in instance.disks:
1159
    cfg.SetDiskID(dev, node_name)
1160

    
1161
  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
1162
  result.Raise("Failed to get disk status from node %s" % node_name,
1163
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
1164

    
1165
  for idx, bdev_status in enumerate(result.payload):
1166
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1167
      faulty.append(idx)
1168

    
1169
  return faulty
1170

    
1171

    
1172
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1173
  """Check the sanity of iallocator and node arguments and use the
1174
  cluster-wide iallocator if appropriate.
1175

1176
  Check that at most one of (iallocator, node) is specified. If none is
1177
  specified, then the LU's opcode's iallocator slot is filled with the
1178
  cluster-wide default iallocator.
1179

1180
  @type iallocator_slot: string
1181
  @param iallocator_slot: the name of the opcode iallocator slot
1182
  @type node_slot: string
1183
  @param node_slot: the name of the opcode target node slot
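
  Example (illustrative; the slot names depend on the opcode)::

    # typically called from an LU's CheckArguments
    _CheckIAllocatorOrNode(self, "iallocator", "pnode")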
1184

1185
  """
1186
  node = getattr(lu.op, node_slot, None)
1187
  iallocator = getattr(lu.op, iallocator_slot, None)
1188

    
1189
  if node is not None and iallocator is not None:
1190
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
1191
                               errors.ECODE_INVAL)
1192
  elif node is None and iallocator is None:
1193
    default_iallocator = lu.cfg.GetDefaultIAllocator()
1194
    if default_iallocator:
1195
      setattr(lu.op, iallocator_slot, default_iallocator)
1196
    else:
1197
      raise errors.OpPrereqError("No iallocator or node given and no"
1198
                                 " cluster-wide default iallocator found;"
1199
                                 " please specify either an iallocator or a"
1200
                                 " node, or set a cluster-wide default"
1201
                                 " iallocator")
1202

    
1203

    
1204
class LUClusterPostInit(LogicalUnit):
1205
  """Logical unit for running hooks after cluster initialization.
1206

1207
  """
1208
  HPATH = "cluster-init"
1209
  HTYPE = constants.HTYPE_CLUSTER
1210

    
1211
  def BuildHooksEnv(self):
1212
    """Build hooks env.
1213

1214
    """
1215
    return {
1216
      "OP_TARGET": self.cfg.GetClusterName(),
1217
      }
1218

    
1219
  def BuildHooksNodes(self):
1220
    """Build hooks nodes.
1221

1222
    """
1223
    return ([], [self.cfg.GetMasterNode()])
1224

    
1225
  def Exec(self, feedback_fn):
1226
    """Nothing to do.
1227

1228
    """
1229
    return True
1230

    
1231

    
1232
class LUClusterDestroy(LogicalUnit):
1233
  """Logical unit for destroying the cluster.
1234

1235
  """
1236
  HPATH = "cluster-destroy"
1237
  HTYPE = constants.HTYPE_CLUSTER
1238

    
1239
  def BuildHooksEnv(self):
1240
    """Build hooks env.
1241

1242
    """
1243
    return {
1244
      "OP_TARGET": self.cfg.GetClusterName(),
1245
      }
1246

    
1247
  def BuildHooksNodes(self):
1248
    """Build hooks nodes.
1249

1250
    """
1251
    return ([], [])
1252

    
1253
  def CheckPrereq(self):
1254
    """Check prerequisites.
1255

1256
    This checks whether the cluster is empty.
1257

1258
    Any errors are signaled by raising errors.OpPrereqError.
1259

1260
    """
1261
    master = self.cfg.GetMasterNode()
1262

    
1263
    nodelist = self.cfg.GetNodeList()
1264
    if len(nodelist) != 1 or nodelist[0] != master:
1265
      raise errors.OpPrereqError("There are still %d node(s) in"
1266
                                 " this cluster." % (len(nodelist) - 1),
1267
                                 errors.ECODE_INVAL)
1268
    instancelist = self.cfg.GetInstanceList()
1269
    if instancelist:
1270
      raise errors.OpPrereqError("There are still %d instance(s) in"
1271
                                 " this cluster." % len(instancelist),
1272
                                 errors.ECODE_INVAL)
1273

    
1274
  def Exec(self, feedback_fn):
1275
    """Destroys the cluster.
1276

1277
    """
1278
    master = self.cfg.GetMasterNode()
1279

    
1280
    # Run post hooks on master node before it's removed
1281
    _RunPostHook(self, master)
1282

    
1283
    result = self.rpc.call_node_stop_master(master, False)
1284
    result.Raise("Could not disable the master role")
1285

    
1286
    return master
1287

    
1288

    
1289
def _VerifyCertificate(filename):
1290
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1291

1292
  @type filename: string
1293
  @param filename: Path to PEM file
1294

1295
  """
1296
  try:
1297
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1298
                                           utils.ReadFile(filename))
1299
  except Exception, err: # pylint: disable-msg=W0703
1300
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1301
            "Failed to load X509 certificate %s: %s" % (filename, err))
1302

    
1303
  (errcode, msg) = \
1304
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1305
                                constants.SSL_CERT_EXPIRATION_ERROR)
1306

    
1307
  if msg:
1308
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1309
  else:
1310
    fnamemsg = None
1311

    
1312
  if errcode is None:
1313
    return (None, fnamemsg)
1314
  elif errcode == utils.CERT_WARNING:
1315
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1316
  elif errcode == utils.CERT_ERROR:
1317
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1318

    
1319
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1320

    
1321

    
1322
def _GetAllHypervisorParameters(cluster, instances):
1323
  """Compute the set of all hypervisor parameters.
1324

1325
  @type cluster: L{objects.Cluster}
1326
  @param cluster: the cluster object
1327
  @type instances: list of L{objects.Instance}
1328
  @param instances: additional instances from which to obtain parameters
1329
  @rtype: list of (origin, hypervisor, parameters)
1330
  @return: a list with all parameters found, indicating the hypervisor they
1331
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1332

1333
  """
1334
  hvp_data = []
1335

    
1336
  for hv_name in cluster.enabled_hypervisors:
1337
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1338

    
1339
  for os_name, os_hvp in cluster.os_hvp.items():
1340
    for hv_name, hv_params in os_hvp.items():
1341
      if hv_params:
1342
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1343
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1344

    
1345
  # TODO: collapse identical parameter values in a single one
1346
  for instance in instances:
1347
    if instance.hvparams:
1348
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1349
                       cluster.FillHV(instance)))
1350

    
1351
  return hvp_data
1352

    
1353

    
1354
class _VerifyErrors(object):
1355
  """Mix-in for cluster/group verify LUs.
1356

1357
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1358
  self.op and self._feedback_fn to be available.)
1359

1360
  """
1361
  TCLUSTER = "cluster"
1362
  TNODE = "node"
1363
  TINSTANCE = "instance"
1364

    
1365
  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
1366
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
1367
  ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK")
1368
  ECLUSTERDANGLINGNODES = (TNODE, "ECLUSTERDANGLINGNODES")
1369
  ECLUSTERDANGLINGINST = (TNODE, "ECLUSTERDANGLINGINST")
1370
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
1371
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
1372
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
1373
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
1374
  EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
1375
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
1376
  EINSTANCESPLITGROUPS = (TINSTANCE, "EINSTANCESPLITGROUPS")
1377
  ENODEDRBD = (TNODE, "ENODEDRBD")
1378
  ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
1379
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
1380
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
1381
  ENODEHV = (TNODE, "ENODEHV")
1382
  ENODELVM = (TNODE, "ENODELVM")
1383
  ENODEN1 = (TNODE, "ENODEN1")
1384
  ENODENET = (TNODE, "ENODENET")
1385
  ENODEOS = (TNODE, "ENODEOS")
1386
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
1387
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
1388
  ENODERPC = (TNODE, "ENODERPC")
1389
  ENODESSH = (TNODE, "ENODESSH")
1390
  ENODEVERSION = (TNODE, "ENODEVERSION")
1391
  ENODESETUP = (TNODE, "ENODESETUP")
1392
  ENODETIME = (TNODE, "ENODETIME")
1393
  ENODEOOBPATH = (TNODE, "ENODEOOBPATH")
1394

    
1395
  ETYPE_FIELD = "code"
1396
  ETYPE_ERROR = "ERROR"
1397
  ETYPE_WARNING = "WARNING"
1398

    
1399
  def _Error(self, ecode, item, msg, *args, **kwargs):
1400
    """Format an error message.
1401

1402
    Based on the opcode's error_codes parameter, either format a
1403
    parseable error code, or a simpler error string.
1404

1405
    This must be called only from Exec and functions called from Exec.
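
    Example of the two possible output forms for C{ecode=self.ENODEHOOKS},
    C{item="node1.example.com"} and C{msg="hook failed"} (illustrative)::

      ERROR:ENODEHOOKS:node:node1.example.com:hook failed   (error_codes set)
      ERROR: node node1.example.com: hook failed            (plain form)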
1406

1407
    """
1408
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1409
    itype, etxt = ecode
1410
    # first complete the msg
1411
    if args:
1412
      msg = msg % args
1413
    # then format the whole message
1414
    if self.op.error_codes: # This is a mix-in. pylint: disable-msg=E1101
1415
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1416
    else:
1417
      if item:
1418
        item = " " + item
1419
      else:
1420
        item = ""
1421
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1422
    # and finally report it via the feedback_fn
1423
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable-msg=E1101
1424

    
1425
  def _ErrorIf(self, cond, *args, **kwargs):
1426
    """Log an error message if the passed condition is True.
1427

1428
    """
1429
    cond = (bool(cond)
1430
            or self.op.debug_simulate_errors) # pylint: disable-msg=E1101
1431
    if cond:
1432
      self._Error(*args, **kwargs)
1433
    # do not mark the operation as failed for WARN cases only
1434
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1435
      self.bad = self.bad or cond
1436

    
1437

    
1438
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1439
  """Verifies the cluster config.
1440

1441
  """
1442
  REQ_BGL = True
1443

    
1444
  def _VerifyHVP(self, hvp_data):
1445
    """Verifies locally the syntax of the hypervisor parameters.
1446

1447
    """
1448
    for item, hv_name, hv_params in hvp_data:
1449
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1450
             (hv_name, item))
1451
      try:
1452
        hv_class = hypervisor.GetHypervisor(hv_name)
1453
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1454
        hv_class.CheckParameterSyntax(hv_params)
1455
      except errors.GenericError, err:
1456
        self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))
1457

    
1458
  def ExpandNames(self):
1459
    # Information can be safely retrieved as the BGL is acquired in exclusive
1460
    # mode
1461
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1462
    self.all_node_info = self.cfg.GetAllNodesInfo()
1463
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1464
    self.needed_locks = {}
1465

    
1466
  def Exec(self, feedback_fn):
1467
    """Verify integrity of cluster, performing various test on nodes.
1468

1469
    """
1470
    self.bad = False
1471
    self._feedback_fn = feedback_fn
1472

    
1473
    feedback_fn("* Verifying cluster config")
1474

    
1475
    for msg in self.cfg.VerifyConfig():
1476
      self._ErrorIf(True, self.ECLUSTERCFG, None, msg)
1477

    
1478
    feedback_fn("* Verifying cluster certificate files")
1479

    
1480
    for cert_filename in constants.ALL_CERT_FILES:
1481
      (errcode, msg) = _VerifyCertificate(cert_filename)
1482
      self._ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
1483

    
1484
    feedback_fn("* Verifying hypervisor parameters")
1485

    
1486
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1487
                                                self.all_inst_info.values()))
1488

    
1489
    feedback_fn("* Verifying all nodes belong to an existing group")
1490

    
1491
    # We do this verification here because, should this bogus circumstance
1492
    # occur, it would never be caught by VerifyGroup, which only acts on
1493
    # nodes/instances reachable from existing node groups.
1494

    
1495
    dangling_nodes = set(node.name for node in self.all_node_info.values()
1496
                         if node.group not in self.all_group_info)
1497

    
1498
    dangling_instances = {}
1499
    no_node_instances = []
1500

    
1501
    for inst in self.all_inst_info.values():
1502
      if inst.primary_node in dangling_nodes:
1503
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1504
      elif inst.primary_node not in self.all_node_info:
1505
        no_node_instances.append(inst.name)
1506

    
1507
    pretty_dangling = [
1508
        "%s (%s)" %
1509
        (node.name,
1510
         utils.CommaJoin(dangling_instances.get(node.name,
1511
                                                ["no instances"])))
1512
        for node in dangling_nodes]
1513

    
1514
    self._ErrorIf(bool(dangling_nodes), self.ECLUSTERDANGLINGNODES, None,
1515
                  "the following nodes (and their instances) belong to a non"
1516
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1517

    
1518
    self._ErrorIf(bool(no_node_instances), self.ECLUSTERDANGLINGINST, None,
1519
                  "the following instances have a non-existing primary-node:"
1520
                  " %s", utils.CommaJoin(no_node_instances))
1521

    
1522
    return (not self.bad, [g.name for g in self.all_group_info.values()])
1523

    
1524

    
1525
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1526
  """Verifies the status of a node group.
1527

1528
  """
1529
  HPATH = "cluster-verify"
1530
  HTYPE = constants.HTYPE_CLUSTER
1531
  REQ_BGL = False
1532

    
1533
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1534

    
1535
  class NodeImage(object):
1536
    """A class representing the logical and physical status of a node.
1537

1538
    @type name: string
1539
    @ivar name: the node name to which this object refers
1540
    @ivar volumes: a structure as returned from
1541
        L{ganeti.backend.GetVolumeList} (runtime)
1542
    @ivar instances: a list of running instances (runtime)
1543
    @ivar pinst: list of configured primary instances (config)
1544
    @ivar sinst: list of configured secondary instances (config)
1545
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1546
        instances for which this node is secondary (config)
1547
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1548
    @ivar dfree: free disk, as reported by the node (runtime)
1549
    @ivar offline: the offline status (config)
1550
    @type rpc_fail: boolean
1551
    @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1552
        not whether the individual keys were correct) (runtime)
1553
    @type lvm_fail: boolean
1554
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1555
    @type hyp_fail: boolean
1556
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1557
    @type ghost: boolean
1558
    @ivar ghost: whether this is a known node or not (config)
1559
    @type os_fail: boolean
1560
    @ivar os_fail: whether the RPC call didn't return valid OS data
1561
    @type oslist: list
1562
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1563
    @type vm_capable: boolean
1564
    @ivar vm_capable: whether the node can host instances
1565

1566
    """
1567
    def __init__(self, offline=False, name=None, vm_capable=True):
1568
      self.name = name
1569
      self.volumes = {}
1570
      self.instances = []
1571
      self.pinst = []
1572
      self.sinst = []
1573
      self.sbp = {}
1574
      self.mfree = 0
1575
      self.dfree = 0
1576
      self.offline = offline
1577
      self.vm_capable = vm_capable
1578
      self.rpc_fail = False
1579
      self.lvm_fail = False
1580
      self.hyp_fail = False
1581
      self.ghost = False
1582
      self.os_fail = False
1583
      self.oslist = {}
1584

    
1585
  def ExpandNames(self):
1586
    # This raises errors.OpPrereqError on its own:
1587
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1588

    
1589
    # Get instances in node group; this is unsafe and needs verification later
1590
    inst_names = self.cfg.GetNodeGroupInstances(self.group_uuid)
1591

    
1592
    self.needed_locks = {
1593
      locking.LEVEL_INSTANCE: inst_names,
1594
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1595
      locking.LEVEL_NODE: [],
1596
      }
1597

    
1598
    self.share_locks = _ShareAll()
1599

    
1600
  def DeclareLocks(self, level):
1601
    if level == locking.LEVEL_NODE:
1602
      # Get members of node group; this is unsafe and needs verification later
1603
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1604

    
1605
      all_inst_info = self.cfg.GetAllInstancesInfo()
1606

    
1607
      # In Exec(), we warn about mirrored instances that have primary and
1608
      # secondary living in separate node groups. To fully verify that
1609
      # volumes for these instances are healthy, we will need to do an
1610
      # extra call to their secondaries. We ensure here those nodes will
1611
      # be locked.
1612
      for inst in self.glm.list_owned(locking.LEVEL_INSTANCE):
1613
        # Important: access only the instances whose lock is owned
1614
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
1615
          nodes.update(all_inst_info[inst].secondary_nodes)
1616

    
1617
      self.needed_locks[locking.LEVEL_NODE] = nodes
1618

    
1619
  def CheckPrereq(self):
1620
    group_nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1621
    group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)
1622

    
1623
    unlocked_nodes = \
1624
        group_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE))
1625

    
1626
    unlocked_instances = \
1627
        group_instances.difference(self.glm.list_owned(locking.LEVEL_INSTANCE))
1628

    
1629
    if unlocked_nodes:
1630
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
1631
                                 utils.CommaJoin(unlocked_nodes))
1632

    
1633
    if unlocked_instances:
1634
      raise errors.OpPrereqError("Missing lock for instances: %s" %
1635
                                 utils.CommaJoin(unlocked_instances))
1636

    
1637
    self.all_node_info = self.cfg.GetAllNodesInfo()
1638
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1639

    
1640
    self.my_node_names = utils.NiceSort(group_nodes)
1641
    self.my_inst_names = utils.NiceSort(group_instances)
1642

    
1643
    self.my_node_info = dict((name, self.all_node_info[name])
1644
                             for name in self.my_node_names)
1645

    
1646
    self.my_inst_info = dict((name, self.all_inst_info[name])
1647
                             for name in self.my_inst_names)
1648

    
1649
    # We detect here the nodes that will need the extra RPC calls for verifying
1650
    # split LV volumes; they should be locked.
1651
    extra_lv_nodes = set()
1652

    
1653
    for inst in self.my_inst_info.values():
1654
      if inst.disk_template in constants.DTS_INT_MIRROR:
1655
        group = self.my_node_info[inst.primary_node].group
1656
        for nname in inst.secondary_nodes:
1657
          if self.all_node_info[nname].group != group:
1658
            extra_lv_nodes.add(nname)
1659

    
1660
    unlocked_lv_nodes = \
1661
        extra_lv_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE))
1662

    
1663
    if unlocked_lv_nodes:
1664
      raise errors.OpPrereqError("these nodes could be locked: %s" %
1665
                                 utils.CommaJoin(unlocked_lv_nodes))
1666
    self.extra_lv_nodes = list(extra_lv_nodes)
1667

    
1668
  def _VerifyNode(self, ninfo, nresult):
1669
    """Perform some basic validation on data returned from a node.
1670

1671
      - check the result data structure is well formed and has all the
1672
        mandatory fields
1673
      - check ganeti version
1674

1675
    @type ninfo: L{objects.Node}
1676
    @param ninfo: the node to check
1677
    @param nresult: the results from the node
1678
    @rtype: boolean
1679
    @return: whether overall this call was successful (and we can expect
1680
         reasonable values in the respose)
1681

1682
    """
1683
    node = ninfo.name
1684
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1685

    
1686
    # main result, nresult should be a non-empty dict
1687
    test = not nresult or not isinstance(nresult, dict)
1688
    _ErrorIf(test, self.ENODERPC, node,
1689
                  "unable to verify node: no data returned")
1690
    if test:
1691
      return False
1692

    
1693
    # compares ganeti version
1694
    local_version = constants.PROTOCOL_VERSION
1695
    remote_version = nresult.get("version", None)
1696
    test = not (remote_version and
1697
                isinstance(remote_version, (list, tuple)) and
1698
                len(remote_version) == 2)
1699
    _ErrorIf(test, self.ENODERPC, node,
1700
             "connection to node returned invalid data")
1701
    if test:
1702
      return False
1703

    
1704
    test = local_version != remote_version[0]
1705
    _ErrorIf(test, self.ENODEVERSION, node,
1706
             "incompatible protocol versions: master %s,"
1707
             " node %s", local_version, remote_version[0])
1708
    if test:
1709
      return False
1710

    
1711
    # node seems compatible, we can actually try to look into its results
1712

    
1713
    # full package version
1714
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1715
                  self.ENODEVERSION, node,
1716
                  "software version mismatch: master %s, node %s",
1717
                  constants.RELEASE_VERSION, remote_version[1],
1718
                  code=self.ETYPE_WARNING)
1719

    
1720
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1721
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1722
      for hv_name, hv_result in hyp_result.iteritems():
1723
        test = hv_result is not None
1724
        _ErrorIf(test, self.ENODEHV, node,
1725
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1726

    
1727
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1728
    if ninfo.vm_capable and isinstance(hvp_result, list):
1729
      for item, hv_name, hv_result in hvp_result:
1730
        _ErrorIf(True, self.ENODEHV, node,
1731
                 "hypervisor %s parameter verify failure (source %s): %s",
1732
                 hv_name, item, hv_result)
1733

    
1734
    test = nresult.get(constants.NV_NODESETUP,
1735
                       ["Missing NODESETUP results"])
1736
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1737
             "; ".join(test))
1738

    
1739
    return True
1740

    
1741
  def _VerifyNodeTime(self, ninfo, nresult,
1742
                      nvinfo_starttime, nvinfo_endtime):
1743
    """Check the node time.
1744

1745
    @type ninfo: L{objects.Node}
1746
    @param ninfo: the node to check
1747
    @param nresult: the remote results for the node
1748
    @param nvinfo_starttime: the start time of the RPC call
1749
    @param nvinfo_endtime: the end time of the RPC call
1750

1751
    """
1752
    node = ninfo.name
1753
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1754

    
1755
    ntime = nresult.get(constants.NV_TIME, None)
1756
    try:
1757
      ntime_merged = utils.MergeTime(ntime)
1758
    except (ValueError, TypeError):
1759
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
1760
      return
1761

    
1762
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1763
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1764
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1765
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1766
    else:
1767
      ntime_diff = None
1768

    
1769
    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1770
             "Node time diverges by at least %s from master node time",
1771
             ntime_diff)
1772

    
1773
  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
1774
    """Check the node LVM results.
1775

1776
    @type ninfo: L{objects.Node}
1777
    @param ninfo: the node to check
1778
    @param nresult: the remote results for the node
1779
    @param vg_name: the configured VG name
1780

1781
    """
1782
    if vg_name is None:
1783
      return
1784

    
1785
    node = ninfo.name
1786
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1787

    
1788
    # checks vg existence and size > 20G
1789
    vglist = nresult.get(constants.NV_VGLIST, None)
1790
    test = not vglist
1791
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1792
    if not test:
1793
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1794
                                            constants.MIN_VG_SIZE)
1795
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1796

    
1797
    # check pv names
1798
    pvlist = nresult.get(constants.NV_PVLIST, None)
1799
    test = pvlist is None
1800
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
1801
    if not test:
1802
      # check that ':' is not present in PV names, since it's a
1803
      # special character for lvcreate (denotes the range of PEs to
1804
      # use on the PV)
1805
      for _, pvname, owner_vg in pvlist:
1806
        test = ":" in pvname
1807
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
1808
                 " '%s' of VG '%s'", pvname, owner_vg)
1809

    
1810
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1811
    """Check the node bridges.
1812

1813
    @type ninfo: L{objects.Node}
1814
    @param ninfo: the node to check
1815
    @param nresult: the remote results for the node
1816
    @param bridges: the expected list of bridges
1817

1818
    """
1819
    if not bridges:
1820
      return
1821

    
1822
    node = ninfo.name
1823
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1824

    
1825
    missing = nresult.get(constants.NV_BRIDGES, None)
1826
    test = not isinstance(missing, list)
1827
    _ErrorIf(test, self.ENODENET, node,
1828
             "did not return valid bridge information")
1829
    if not test:
1830
      _ErrorIf(bool(missing), self.ENODENET, node, "missing bridges: %s" %
1831
               utils.CommaJoin(sorted(missing)))
1832

    
1833
  def _VerifyNodeNetwork(self, ninfo, nresult):
1834
    """Check the node network connectivity results.
1835

1836
    @type ninfo: L{objects.Node}
1837
    @param ninfo: the node to check
1838
    @param nresult: the remote results for the node
1839

1840
    """
1841
    node = ninfo.name
1842
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1843

    
1844
    test = constants.NV_NODELIST not in nresult
1845
    _ErrorIf(test, self.ENODESSH, node,
1846
             "node hasn't returned node ssh connectivity data")
1847
    if not test:
1848
      if nresult[constants.NV_NODELIST]:
1849
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1850
          _ErrorIf(True, self.ENODESSH, node,
1851
                   "ssh communication with node '%s': %s", a_node, a_msg)
1852

    
1853
    test = constants.NV_NODENETTEST not in nresult
1854
    _ErrorIf(test, self.ENODENET, node,
1855
             "node hasn't returned node tcp connectivity data")
1856
    if not test:
1857
      if nresult[constants.NV_NODENETTEST]:
1858
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1859
        for anode in nlist:
1860
          _ErrorIf(True, self.ENODENET, node,
1861
                   "tcp communication with node '%s': %s",
1862
                   anode, nresult[constants.NV_NODENETTEST][anode])
1863

    
1864
    test = constants.NV_MASTERIP not in nresult
1865
    _ErrorIf(test, self.ENODENET, node,
1866
             "node hasn't returned node master IP reachability data")
1867
    if not test:
1868
      if not nresult[constants.NV_MASTERIP]:
1869
        if node == self.master_node:
1870
          msg = "the master node cannot reach the master IP (not configured?)"
1871
        else:
1872
          msg = "cannot reach the master IP"
1873
        _ErrorIf(True, self.ENODENET, node, msg)
1874

    
1875
  def _VerifyInstance(self, instance, instanceconfig, node_image,
1876
                      diskstatus):
1877
    """Verify an instance.
1878

1879
    This function checks to see if the required block devices are
1880
    available on the instance's node.
1881

1882
    """
1883
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1884
    node_current = instanceconfig.primary_node
1885

    
1886
    node_vol_should = {}
1887
    instanceconfig.MapLVsByNode(node_vol_should)
1888

    
1889
    for node in node_vol_should:
1890
      n_img = node_image[node]
1891
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1892
        # ignore missing volumes on offline or broken nodes
1893
        continue
1894
      for volume in node_vol_should[node]:
1895
        test = volume not in n_img.volumes
1896
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1897
                 "volume %s missing on node %s", volume, node)
1898

    
1899
    if instanceconfig.admin_up:
1900
      pri_img = node_image[node_current]
1901
      test = instance not in pri_img.instances and not pri_img.offline
1902
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
1903
               "instance not running on its primary node %s",
1904
               node_current)
1905

    
1906
    diskdata = [(nname, success, status, idx)
1907
                for (nname, disks) in diskstatus.items()
1908
                for idx, (success, status) in enumerate(disks)]
1909

    
1910
    for nname, success, bdev_status, idx in diskdata:
1911
      # the 'ghost node' construction in Exec() ensures that we have a
1912
      # node here
1913
      snode = node_image[nname]
1914
      bad_snode = snode.ghost or snode.offline
1915
      _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
1916
               self.EINSTANCEFAULTYDISK, instance,
1917
               "couldn't retrieve status for disk/%s on %s: %s",
1918
               idx, nname, bdev_status)
1919
      _ErrorIf((instanceconfig.admin_up and success and
1920
                bdev_status.ldisk_status == constants.LDS_FAULTY),
1921
               self.EINSTANCEFAULTYDISK, instance,
1922
               "disk/%s on %s is faulty", idx, nname)
1923

    
1924
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1925
    """Verify if there are any unknown volumes in the cluster.
1926

1927
    The .os, .swap and backup volumes are ignored. All other volumes are
1928
    reported as unknown.
1929

1930
    @type reserved: L{ganeti.utils.FieldSet}
1931
    @param reserved: a FieldSet of reserved volume names
1932

1933
    """
1934
    for node, n_img in node_image.items():
1935
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1936
        # skip non-healthy nodes
1937
        continue
1938
      for volume in n_img.volumes:
1939
        test = ((node not in node_vol_should or
1940
                volume not in node_vol_should[node]) and
1941
                not reserved.Matches(volume))
1942
        self._ErrorIf(test, self.ENODEORPHANLV, node,
1943
                      "volume %s is unknown", volume)
1944

    
1945
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1946
    """Verify N+1 Memory Resilience.
1947

1948
    Check that if one single node dies we can still start all the
1949
    instances it was primary for.
1950

1951
    """
1952
    cluster_info = self.cfg.GetClusterInfo()
1953
    for node, n_img in node_image.items():
1954
      # This code checks that every node which is now listed as
1955
      # secondary has enough memory to host all instances it is
1956
      # supposed to should a single other node in the cluster fail.
1957
      # FIXME: not ready for failover to an arbitrary node
1958
      # FIXME: does not support file-backed instances
1959
      # WARNING: we currently take into account down instances as well
1960
      # as up ones, considering that even if they're down someone
1961
      # might want to start them even in the event of a node failure.
1962
      if n_img.offline:
1963
        # we're skipping offline nodes from the N+1 warning, since
1964
        # most likely we don't have good memory infromation from them;
1965
        # we already list instances living on such nodes, and that's
1966
        # enough warning
1967
        continue
1968
      for prinode, instances in n_img.sbp.items():
1969
        needed_mem = 0
1970
        for instance in instances:
1971
          bep = cluster_info.FillBE(instance_cfg[instance])
1972
          if bep[constants.BE_AUTO_BALANCE]:
1973
            needed_mem += bep[constants.BE_MEMORY]
1974
        test = n_img.mfree < needed_mem
1975
        self._ErrorIf(test, self.ENODEN1, node,
1976
                      "not enough memory to accomodate instance failovers"
1977
                      " should node %s fail (%dMiB needed, %dMiB available)",
1978
                      prinode, needed_mem, n_img.mfree)
1979

    
1980
  @classmethod
1981
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
1982
                   (files_all, files_all_opt, files_mc, files_vm)):
1983
    """Verifies file checksums collected from all nodes.
1984

1985
    @param errorif: Callback for reporting errors
1986
    @param nodeinfo: List of L{objects.Node} objects
1987
    @param master_node: Name of master node
1988
    @param all_nvinfo: RPC results
1989

1990
    """
1991
    node_names = frozenset(node.name for node in nodeinfo if not node.offline)
1992

    
1993
    assert master_node in node_names
1994
    assert (len(files_all | files_all_opt | files_mc | files_vm) ==
1995
            sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
1996
           "Found file listed in more than one file list"
1997

    
1998
    # Define functions determining which nodes to consider for a file
1999
    file2nodefn = dict([(filename, fn)
2000
      for (files, fn) in [(files_all, None),
2001
                          (files_all_opt, None),
2002
                          (files_mc, lambda node: (node.master_candidate or
2003
                                                   node.name == master_node)),
2004
                          (files_vm, lambda node: node.vm_capable)]
2005
      for filename in files])
2006

    
2007
    fileinfo = dict((filename, {}) for filename in file2nodefn.keys())
2008

    
2009
    for node in nodeinfo:
2010
      if node.offline:
2011
        continue
2012

    
2013
      nresult = all_nvinfo[node.name]
2014

    
2015
      if nresult.fail_msg or not nresult.payload:
2016
        node_files = None
2017
      else:
2018
        node_files = nresult.payload.get(constants.NV_FILELIST, None)
2019

    
2020
      test = not (node_files and isinstance(node_files, dict))
2021
      errorif(test, cls.ENODEFILECHECK, node.name,
2022
              "Node did not return file checksum data")
2023
      if test:
2024
        continue
2025

    
2026
      for (filename, checksum) in node_files.items():
2027
        # Check if the file should be considered for a node
2028
        fn = file2nodefn[filename]
2029
        if fn is None or fn(node):
2030
          fileinfo[filename].setdefault(checksum, set()).add(node.name)
2031

    
2032
    for (filename, checksums) in fileinfo.items():
2033
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2034

    
2035
      # Nodes having the file
2036
      with_file = frozenset(node_name
2037
                            for nodes in fileinfo[filename].values()
2038
                            for node_name in nodes)
2039

    
2040
      # Nodes missing file
2041
      missing_file = node_names - with_file
2042

    
2043
      if filename in files_all_opt:
2044
        # All or no nodes
2045
        errorif(missing_file and missing_file != node_names,
2046
                cls.ECLUSTERFILECHECK, None,
2047
                "File %s is optional, but it must exist on all or no"
2048
                " nodes (not found on %s)",
2049
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2050
      else:
2051
        errorif(missing_file, cls.ECLUSTERFILECHECK, None,
2052
                "File %s is missing from node(s) %s", filename,
2053
                utils.CommaJoin(utils.NiceSort(missing_file)))
2054

    
2055
      # See if there are multiple versions of the file
2056
      test = len(checksums) > 1
2057
      if test:
2058
        variants = ["variant %s on %s" %
2059
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2060
                    for (idx, (checksum, nodes)) in
2061
                      enumerate(sorted(checksums.items()))]
2062
      else:
2063
        variants = []
2064

    
2065
      errorif(test, cls.ECLUSTERFILECHECK, None,
2066
              "File %s found with %s different checksums (%s)",
2067
              filename, len(checksums), "; ".join(variants))
2068

    
2069
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2070
                      drbd_map):
2071
    """Verifies and the node DRBD status.
2072

2073
    @type ninfo: L{objects.Node}
2074
    @param ninfo: the node to check
2075
    @param nresult: the remote results for the node
2076
    @param instanceinfo: the dict of instances
2077
    @param drbd_helper: the configured DRBD usermode helper
2078
    @param drbd_map: the DRBD map as returned by
2079
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2080

2081
    """
2082
    node = ninfo.name
2083
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2084

    
2085
    if drbd_helper:
2086
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2087
      test = (helper_result == None)
2088
      _ErrorIf(test, self.ENODEDRBDHELPER, node,
2089
               "no drbd usermode helper returned")
2090
      if helper_result:
2091
        status, payload = helper_result
2092
        test = not status
2093
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
2094
                 "drbd usermode helper check unsuccessful: %s", payload)
2095
        test = status and (payload != drbd_helper)
2096
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
2097
                 "wrong drbd usermode helper: %s", payload)
2098

    
2099
    # compute the DRBD minors
2100
    node_drbd = {}
2101
    for minor, instance in drbd_map[node].items():
2102
      test = instance not in instanceinfo
2103
      _ErrorIf(test, self.ECLUSTERCFG, None,
2104
               "ghost instance '%s' in temporary DRBD map", instance)
2105
        # ghost instance should not be running, but otherwise we
2106
        # don't give double warnings (both ghost instance and
2107
        # unallocated minor in use)
2108
      if test:
2109
        node_drbd[minor] = (instance, False)
2110
      else:
2111
        instance = instanceinfo[instance]
2112
        node_drbd[minor] = (instance.name, instance.admin_up)
2113

    
2114
    # and now check them
2115
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2116
    test = not isinstance(used_minors, (tuple, list))
2117
    _ErrorIf(test, self.ENODEDRBD, node,
2118
             "cannot parse drbd status file: %s", str(used_minors))
2119
    if test:
2120
      # we cannot check drbd status
2121
      return
2122

    
2123
    for minor, (iname, must_exist) in node_drbd.items():
2124
      test = minor not in used_minors and must_exist
2125
      _ErrorIf(test, self.ENODEDRBD, node,
2126
               "drbd minor %d of instance %s is not active", minor, iname)
2127
    for minor in used_minors:
2128
      test = minor not in node_drbd
2129
      _ErrorIf(test, self.ENODEDRBD, node,
2130
               "unallocated drbd minor %d is in use", minor)
2131

    
2132
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2133
    """Builds the node OS structures.
2134

2135
    @type ninfo: L{objects.Node}
2136
    @param ninfo: the node to check
2137
    @param nresult: the remote results for the node
2138
    @param nimg: the node image object
2139

2140
    """
2141
    node = ninfo.name
2142
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2143

    
2144
    remote_os = nresult.get(constants.NV_OSLIST, None)
2145
    test = (not isinstance(remote_os, list) or
2146
            not compat.all(isinstance(v, list) and len(v) == 7
2147
                           for v in remote_os))
2148

    
2149
    _ErrorIf(test, self.ENODEOS, node,
2150
             "node hasn't returned valid OS data")
2151

    
2152
    nimg.os_fail = test
2153

    
2154
    if test:
2155
      return
2156

    
2157
    os_dict = {}
2158

    
2159
    for (name, os_path, status, diagnose,
2160
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2161

    
2162
      if name not in os_dict:
2163
        os_dict[name] = []
2164

    
2165
      # parameters is a list of lists instead of list of tuples due to
2166
      # JSON lacking a real tuple type, fix it:
2167
      parameters = [tuple(v) for v in parameters]
2168
      os_dict[name].append((os_path, status, diagnose,
2169
                            set(variants), set(parameters), set(api_ver)))
2170

    
2171
    nimg.oslist = os_dict
2172

    
2173
  def _VerifyNodeOS(self, ninfo, nimg, base):
2174
    """Verifies the node OS list.
2175

2176
    @type ninfo: L{objects.Node}
2177
    @param ninfo: the node to check
2178
    @param nimg: the node image object
2179
    @param base: the 'template' node we match against (e.g. from the master)
2180

2181
    """
2182
    node = ninfo.name
2183
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2184

    
2185
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2186

    
2187
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2188
    for os_name, os_data in nimg.oslist.items():
2189
      assert os_data, "Empty OS status for OS %s?!" % os_name
2190
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2191
      _ErrorIf(not f_status, self.ENODEOS, node,
2192
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2193
      _ErrorIf(len(os_data) > 1, self.ENODEOS, node,
2194
               "OS '%s' has multiple entries (first one shadows the rest): %s",
2195
               os_name, utils.CommaJoin([v[0] for v in os_data]))
2196
      # this will catched in backend too
2197
      _ErrorIf(compat.any(v >= constants.OS_API_V15 for v in f_api)
2198
               and not f_var, self.ENODEOS, node,
2199
               "OS %s with API at least %d does not declare any variant",
2200
               os_name, constants.OS_API_V15)
2201
      # comparisons with the 'base' image
2202
      test = os_name not in base.oslist
2203
      _ErrorIf(test, self.ENODEOS, node,
2204
               "Extra OS %s not present on reference node (%s)",
2205
               os_name, base.name)
2206
      if test:
2207
        continue
2208
      assert base.oslist[os_name], "Base node has empty OS status?"
2209
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2210
      if not b_status:
2211
        # base OS is invalid, skipping
2212
        continue
2213
      for kind, a, b in [("API version", f_api, b_api),
2214
                         ("variants list", f_var, b_var),
2215
                         ("parameters", beautify_params(f_param),
2216
                          beautify_params(b_param))]:
2217
        _ErrorIf(a != b, self.ENODEOS, node,
2218
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2219
                 kind, os_name, base.name,
2220
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2221

    
2222
    # check any missing OSes
2223
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2224
    _ErrorIf(missing, self.ENODEOS, node,
2225
             "OSes present on reference node %s but missing on this node: %s",
2226
             base.name, utils.CommaJoin(missing))
2227

    
2228
  def _VerifyOob(self, ninfo, nresult):
2229
    """Verifies out of band functionality of a node.
2230

2231
    @type ninfo: L{objects.Node}
2232
    @param ninfo: the node to check
2233
    @param nresult: the remote results for the node
2234

2235
    """
2236
    node = ninfo.name
2237
    # We just have to verify the paths on master and/or master candidates
2238
    # as the oob helper is invoked on the master
2239
    if ((ninfo.master_candidate or ninfo.master_capable) and
2240
        constants.NV_OOB_PATHS in nresult):
2241
      for path_result in nresult[constants.NV_OOB_PATHS]:
2242
        self._ErrorIf(path_result, self.ENODEOOBPATH, node, path_result)
2243

    
2244
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2245
    """Verifies and updates the node volume data.
2246

2247
    This function will update a L{NodeImage}'s internal structures
2248
    with data from the remote call.
2249

2250
    @type ninfo: L{objects.Node}
2251
    @param ninfo: the node to check
2252
    @param nresult: the remote results for the node
2253
    @param nimg: the node image object
2254
    @param vg_name: the configured VG name
2255

2256
    """
2257
    node = ninfo.name
2258
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2259

    
2260
    nimg.lvm_fail = True
2261
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2262
    if vg_name is None:
2263
      pass
2264
    elif isinstance(lvdata, basestring):
2265
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
2266
               utils.SafeEncode(lvdata))
2267
    elif not isinstance(lvdata, dict):
2268
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
2269
    else:
2270
      nimg.volumes = lvdata
2271
      nimg.lvm_fail = False
2272

    
2273
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2274
    """Verifies and updates the node instance list.
2275

2276
    If the listing was successful, then updates this node's instance
2277
    list. Otherwise, it marks the RPC call as failed for the instance
2278
    list key.
2279

2280
    @type ninfo: L{objects.Node}
2281
    @param ninfo: the node to check
2282
    @param nresult: the remote results for the node
2283
    @param nimg: the node image object
2284

2285
    """
2286
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2287
    test = not isinstance(idata, list)
2288
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
2289
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
2290
    if test:
2291
      nimg.hyp_fail = True
2292
    else:
2293
      nimg.instances = idata
2294

    
2295
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2296
    """Verifies and computes a node information map
2297

2298
    @type ninfo: L{objects.Node}
2299
    @param ninfo: the node to check
2300
    @param nresult: the remote results for the node
2301
    @param nimg: the node image object
2302
    @param vg_name: the configured VG name
2303

2304
    """
2305
    node = ninfo.name
2306
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2307

    
2308
    # try to read free memory (from the hypervisor)
2309
    hv_info = nresult.get(constants.NV_HVINFO, None)
2310
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2311
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
2312
    if not test:
2313
      try:
2314
        nimg.mfree = int(hv_info["memory_free"])
2315
      except (ValueError, TypeError):
2316
        _ErrorIf(True, self.ENODERPC, node,
2317
                 "node returned invalid nodeinfo, check hypervisor")
2318

    
2319
    # FIXME: devise a free space model for file based instances as well
2320
    if vg_name is not None:
2321
      test = (constants.NV_VGLIST not in nresult or
2322
              vg_name not in nresult[constants.NV_VGLIST])
2323
      _ErrorIf(test, self.ENODELVM, node,
2324
               "node didn't return data for the volume group '%s'"
2325
               " - it is either missing or broken", vg_name)
2326
      if not test:
2327
        try:
2328
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2329
        except (ValueError, TypeError):
2330
          _ErrorIf(True, self.ENODERPC, node,
2331
                   "node returned invalid LVM info, check LVM status")
2332

    
2333
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2334
    """Gets per-disk status information for all instances.
2335

2336
    @type nodelist: list of strings
2337
    @param nodelist: Node names
2338
    @type node_image: dict of (name, L{objects.Node})
2339
    @param node_image: Node objects
2340
    @type instanceinfo: dict of (name, L{objects.Instance})
2341
    @param instanceinfo: Instance objects
2342
    @rtype: {instance: {node: [(succes, payload)]}}
2343
    @return: a dictionary of per-instance dictionaries with nodes as
2344
        keys and disk information as values; the disk information is a
2345
        list of tuples (success, payload)
2346

2347
    """
2348
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2349

    
2350
    node_disks = {}
2351
    node_disks_devonly = {}
2352
    diskless_instances = set()
2353
    diskless = constants.DT_DISKLESS
2354

    
2355
    for nname in nodelist:
2356
      node_instances = list(itertools.chain(node_image[nname].pinst,
2357
                                            node_image[nname].sinst))
2358
      diskless_instances.update(inst for inst in node_instances
2359
                                if instanceinfo[inst].disk_template == diskless)
2360
      disks = [(inst, disk)
2361
               for inst in node_instances
2362
               for disk in instanceinfo[inst].disks]
2363

    
2364
      if not disks:
2365
        # No need to collect data
2366
        continue
2367

    
2368
      node_disks[nname] = disks
2369

    
2370
      # Creating copies as SetDiskID below will modify the objects and that can
2371
      # lead to incorrect data returned from nodes
2372
      devonly = [dev.Copy() for (_, dev) in disks]
2373

    
2374
      for dev in devonly:
2375
        self.cfg.SetDiskID(dev, nname)
2376

    
2377
      node_disks_devonly[nname] = devonly
2378

    
2379
    assert len(node_disks) == len(node_disks_devonly)
2380

    
2381
    # Collect data from all nodes with disks
2382
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2383
                                                          node_disks_devonly)
2384

    
2385
    assert len(result) == len(node_disks)
2386

    
2387
    instdisk = {}
2388

    
2389
    for (nname, nres) in result.items():
2390
      disks = node_disks[nname]
2391

    
2392
      if nres.offline:
2393
        # No data from this node
2394
        data = len(disks) * [(False, "node offline")]
2395
      else:
2396
        msg = nres.fail_msg
2397
        _ErrorIf(msg, self.ENODERPC, nname,
2398
                 "while getting disk information: %s", msg)
2399
        if msg:
2400
          # No data from this node
2401
          data = len(disks) * [(False, msg)]
2402
        else:
2403
          data = []
2404
          for idx, i in enumerate(nres.payload):
2405
            if isinstance(i, (tuple, list)) and len(i) == 2:
2406
              data.append(i)
2407
            else:
2408
              logging.warning("Invalid result from node %s, entry %d: %s",
2409
                              nname, idx, i)
2410
              data.append((False, "Invalid result from the remote node"))
2411

    
2412
      for ((inst, _), status) in zip(disks, data):
2413
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2414

    
2415
    # Add empty entries for diskless instances.
2416
    for inst in diskless_instances:
2417
      assert inst not in instdisk
2418
      instdisk[inst] = {}
2419

    
2420
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2421
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
2422
                      compat.all(isinstance(s, (tuple, list)) and
2423
                                 len(s) == 2 for s in statuses)
2424
                      for inst, nnames in instdisk.items()
2425
                      for nname, statuses in nnames.items())
2426
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"
2427

    
2428
    return instdisk
2429

    
2430
  def BuildHooksEnv(self):
2431
    """Build hooks env.
2432

2433
    Cluster-Verify hooks just ran in the post phase and their failure makes
2434
    the output be logged in the verify output and the verification to fail.
2435

2436
    """
2437
    env = {
2438
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
2439
      }
2440

    
2441
    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2442
               for node in self.my_node_info.values())
2443

    
2444
    return env
2445

    
2446
  def BuildHooksNodes(self):
2447
    """Build hooks nodes.
2448

2449
    """
2450
    return ([], self.my_node_names)
2451

    
2452
  def Exec(self, feedback_fn):
2453
    """Verify integrity of the node group, performing various test on nodes.
2454

2455
    """
2456
    # This method has too many local variables. pylint: disable-msg=R0914
2457

    
2458
    if not self.my_node_names:
2459
      # empty node group
2460
      feedback_fn("* Empty node group, skipping verification")
2461
      return True
2462

    
2463
    self.bad = False
2464
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2465
    verbose = self.op.verbose
2466
    self._feedback_fn = feedback_fn
2467

    
2468
    vg_name = self.cfg.GetVGName()
2469
    drbd_helper = self.cfg.GetDRBDHelper()
2470
    cluster = self.cfg.GetClusterInfo()
2471
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2472
    hypervisors = cluster.enabled_hypervisors
2473
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2474

    
2475
    i_non_redundant = [] # Non redundant instances
2476
    i_non_a_balanced = [] # Non auto-balanced instances
2477
    n_offline = 0 # Count of offline nodes
2478
    n_drained = 0 # Count of nodes being drained
2479
    node_vol_should = {}
2480

    
2481
    # FIXME: verify OS list
2482

    
2483
    # File verification
2484
    filemap = _ComputeAncillaryFiles(cluster, False)
2485

    
2486
    # do local checksums
2487
    master_node = self.master_node = self.cfg.GetMasterNode()
2488
    master_ip = self.cfg.GetMasterIP()
2489

    
2490
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2491

    
2492
    # We will make nodes contact all nodes in their group, and one node from
2493
    # every other group.
2494
    # TODO: should it be a *random* node, different every time?
2495
    online_nodes = [node.name for node in node_data_list if not node.offline]
2496
    other_group_nodes = {}
2497

    
2498
    for name in sorted(self.all_node_info):
2499
      node = self.all_node_info[name]
2500
      if (node.group not in other_group_nodes
2501
          and node.group != self.group_uuid
2502
          and not node.offline):
2503
        other_group_nodes[node.group] = node.name
2504

    
2505
    node_verify_param = {
2506
      constants.NV_FILELIST:
2507
        utils.UniqueSequence(filename
2508
                             for files in filemap
2509
                             for filename in files),
2510
      constants.NV_NODELIST: online_nodes + other_group_nodes.values(),
2511
      constants.NV_HYPERVISOR: hypervisors,
2512
      constants.NV_HVPARAMS:
2513
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2514
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2515
                                 for node in node_data_list
2516
                                 if not node.offline],
2517
      constants.NV_INSTANCELIST: hypervisors,
2518
      constants.NV_VERSION: None,
2519
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2520
      constants.NV_NODESETUP: None,
2521
      constants.NV_TIME: None,
2522
      constants.NV_MASTERIP: (master_node, master_ip),
2523
      constants.NV_OSLIST: None,
2524
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2525
      }
2526

    
2527
    if vg_name is not None:
2528
      node_verify_param[constants.NV_VGLIST] = None
2529
      node_verify_param[constants.NV_LVLIST] = vg_name
2530
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2531
      node_verify_param[constants.NV_DRBDLIST] = None
2532

    
2533
    if drbd_helper:
2534
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2535

    
2536
    # bridge checks
2537
    # FIXME: this needs to be changed per node-group, not cluster-wide
2538
    bridges = set()
2539
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2540
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2541
      bridges.add(default_nicpp[constants.NIC_LINK])
2542
    for instance in self.my_inst_info.values():
2543
      for nic in instance.nics:
2544
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
2545
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2546
          bridges.add(full_nic[constants.NIC_LINK])
2547

    
2548
    if bridges:
2549
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
2550

    
2551
    # Build our expected cluster state
2552
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2553
                                                 name=node.name,
2554
                                                 vm_capable=node.vm_capable))
2555
                      for node in node_data_list)
2556

    
2557
    # Gather OOB paths
2558
    oob_paths = []
2559
    for node in self.all_node_info.values():
2560
      path = _SupportsOob(self.cfg, node)
2561
      if path and path not in oob_paths:
2562
        oob_paths.append(path)
2563

    
2564
    if oob_paths:
2565
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2566

    
2567
    for instance in self.my_inst_names:
2568
      inst_config = self.my_inst_info[instance]
2569

    
2570
      for nname in inst_config.all_nodes:
2571
        if nname not in node_image:
2572
          gnode = self.NodeImage(name=nname)
2573
          gnode.ghost = (nname not in self.all_node_info)
2574
          node_image[nname] = gnode
2575

    
2576
      inst_config.MapLVsByNode(node_vol_should)
2577

    
2578
      pnode = inst_config.primary_node
2579
      node_image[pnode].pinst.append(instance)
2580

    
2581
      for snode in inst_config.secondary_nodes:
2582
        nimg = node_image[snode]
2583
        nimg.sinst.append(instance)
2584
        if pnode not in nimg.sbp:
2585
          nimg.sbp[pnode] = []
2586
        nimg.sbp[pnode].append(instance)
2587

    
2588
    # At this point, we have the in-memory data structures complete,
2589
    # except for the runtime information, which we'll gather next
2590

    
2591
    # Due to the way our RPC system works, exact response times cannot be
2592
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2593
    # time before and after executing the request, we can at least have a time
2594
    # window.
2595
    nvinfo_starttime = time.time()
2596
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2597
                                           node_verify_param,
2598
                                           self.cfg.GetClusterName())
2599
    nvinfo_endtime = time.time()
2600

    
2601
    if self.extra_lv_nodes and vg_name is not None:
2602
      extra_lv_nvinfo = \
2603
          self.rpc.call_node_verify(self.extra_lv_nodes,
2604
                                    {constants.NV_LVLIST: vg_name},
2605
                                    self.cfg.GetClusterName())
2606
    else:
2607
      extra_lv_nvinfo = {}
2608

    
2609
    all_drbd_map = self.cfg.ComputeDRBDMap()
2610

    
2611
    feedback_fn("* Gathering disk information (%s nodes)" %
2612
                len(self.my_node_names))
2613
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2614
                                     self.my_inst_info)
2615

    
2616
    feedback_fn("* Verifying configuration file consistency")
2617

    
2618
    # If not all nodes are being checked, we need to make sure the master node
2619
    # and a non-checked vm_capable node are in the list.
2620
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2621
    if absent_nodes:
2622
      vf_nvinfo = all_nvinfo.copy()
2623
      vf_node_info = list(self.my_node_info.values())
2624
      additional_nodes = []
2625
      if master_node not in self.my_node_info:
2626
        additional_nodes.append(master_node)
2627
        vf_node_info.append(self.all_node_info[master_node])
2628
      # Add the first vm_capable node we find which is not included
2629
      for node in absent_nodes:
2630
        nodeinfo = self.all_node_info[node]
2631
        if nodeinfo.vm_capable and not nodeinfo.offline:
2632
          additional_nodes.append(node)
2633
          vf_node_info.append(self.all_node_info[node])
2634
          break
2635
      key = constants.NV_FILELIST
2636
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2637
                                                 {key: node_verify_param[key]},
2638
                                                 self.cfg.GetClusterName()))
2639
    else:
2640
      vf_nvinfo = all_nvinfo
2641
      vf_node_info = self.my_node_info.values()
2642

    
2643
    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2644

    
2645
    feedback_fn("* Verifying node status")
2646

    
2647
    refos_img = None
2648

    
2649
    for node_i in node_data_list:
2650
      node = node_i.name
2651
      nimg = node_image[node]
2652

    
2653
      if node_i.offline:
2654
        if verbose:
2655
          feedback_fn("* Skipping offline node %s" % (node,))
2656
        n_offline += 1
2657
        continue
2658

    
2659
      if node == master_node:
2660
        ntype = "master"
2661
      elif node_i.master_candidate:
2662
        ntype = "master candidate"
2663
      elif node_i.drained:
2664
        ntype = "drained"
2665
        n_drained += 1
2666
      else:
2667
        ntype = "regular"
2668
      if verbose:
2669
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2670

    
2671
      msg = all_nvinfo[node].fail_msg
2672
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
2673
      if msg:
2674
        nimg.rpc_fail = True
2675
        continue
2676

    
2677
      nresult = all_nvinfo[node].payload
2678

    
2679
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2680
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2681
      self._VerifyNodeNetwork(node_i, nresult)
2682
      self._VerifyOob(node_i, nresult)
2683

    
2684
      if nimg.vm_capable:
2685
        self._VerifyNodeLVM(node_i, nresult, vg_name)
2686
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2687
                             all_drbd_map)
2688

    
2689
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2690
        self._UpdateNodeInstances(node_i, nresult, nimg)
2691
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2692
        self._UpdateNodeOS(node_i, nresult, nimg)
2693

    
2694
        if not nimg.os_fail:
2695
          if refos_img is None:
2696
            refos_img = nimg
2697
          self._VerifyNodeOS(node_i, nimg, refos_img)
2698
        self._VerifyNodeBridges(node_i, nresult, bridges)
2699

    
2700
        # Check whether all running instancies are primary for the node. (This
2701
        # can no longer be done from _VerifyInstance below, since some of the
2702
        # wrong instances could be from other node groups.)
2703
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2704

    
2705
        for inst in non_primary_inst:
2706
          test = inst in self.all_inst_info
2707
          _ErrorIf(test, self.EINSTANCEWRONGNODE, inst,
2708
                   "instance should not run on node %s", node_i.name)
2709
          _ErrorIf(not test, self.ENODEORPHANINSTANCE, node_i.name,
2710
                   "node is running unknown instance %s", inst)
2711

    
2712
    for node, result in extra_lv_nvinfo.items():
2713
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
2714
                              node_image[node], vg_name)
2715

    
2716
    feedback_fn("* Verifying instance status")
2717
    for instance in self.my_inst_names:
2718
      if verbose:
2719
        feedback_fn("* Verifying instance %s" % instance)
2720
      inst_config = self.my_inst_info[instance]
2721
      self._VerifyInstance(instance, inst_config, node_image,
2722
                           instdisk[instance])
2723
      inst_nodes_offline = []
2724

    
2725
      pnode = inst_config.primary_node
2726
      pnode_img = node_image[pnode]
2727
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2728
               self.ENODERPC, pnode, "instance %s, connection to"
2729
               " primary node failed", instance)
2730

    
2731
      _ErrorIf(inst_config.admin_up and pnode_img.offline,
2732
               self.EINSTANCEBADNODE, instance,
2733
               "instance is marked as running and lives on offline node %s",
2734
               inst_config.primary_node)
2735

    
2736
      # If the instance is non-redundant we cannot survive losing its primary
2737
      # node, so we are not N+1 compliant. On the other hand we have no disk
2738
      # templates with more than one secondary so that situation is not well
2739
      # supported either.
2740
      # FIXME: does not support file-backed instances
2741
      if not inst_config.secondary_nodes:
2742
        i_non_redundant.append(instance)
2743

    
2744
      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
2745
               instance, "instance has multiple secondary nodes: %s",
2746
               utils.CommaJoin(inst_config.secondary_nodes),
2747
               code=self.ETYPE_WARNING)
2748

    
2749
      if inst_config.disk_template in constants.DTS_INT_MIRROR:
2750
        pnode = inst_config.primary_node
2751
        instance_nodes = utils.NiceSort(inst_config.all_nodes)
2752
        instance_groups = {}
2753

    
2754
        for node in instance_nodes:
2755
          instance_groups.setdefault(self.all_node_info[node].group,
2756
                                     []).append(node)
2757

    
2758
        pretty_list = [
2759
          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
2760
          # Sort so that we always list the primary node first.
2761
          for group, nodes in sorted(instance_groups.items(),
2762
                                     key=lambda (_, nodes): pnode in nodes,
2763
                                     reverse=True)]
2764

    
2765
        self._ErrorIf(len(instance_groups) > 1, self.EINSTANCESPLITGROUPS,
2766
                      instance, "instance has primary and secondary nodes in"
2767
                      " different groups: %s", utils.CommaJoin(pretty_list),
2768
                      code=self.ETYPE_WARNING)
2769

    
2770
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2771
        i_non_a_balanced.append(instance)
2772

    
2773
      for snode in inst_config.secondary_nodes:
2774
        s_img = node_image[snode]
2775
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
2776
                 "instance %s, connection to secondary node failed", instance)
2777

    
2778
        if s_img.offline:
2779
          inst_nodes_offline.append(snode)
2780

    
2781
      # warn that the instance lives on offline nodes
2782
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
2783
               "instance has offline secondary node(s) %s",
2784
               utils.CommaJoin(inst_nodes_offline))
2785
      # ... or ghost/non-vm_capable nodes
2786
      for node in inst_config.all_nodes:
2787
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
2788
                 "instance lives on ghost node %s", node)
2789
        _ErrorIf(not node_image[node].vm_capable, self.EINSTANCEBADNODE,
2790
                 instance, "instance lives on non-vm_capable node %s", node)
2791

    
2792
    feedback_fn("* Verifying orphan volumes")
2793
    reserved = utils.FieldSet(*cluster.reserved_lvs)
2794

    
2795
    # We will get spurious "unknown volume" warnings if any node of this group
2796
    # is secondary for an instance whose primary is in another group. To avoid
2797
    # them, we find these instances and add their volumes to node_vol_should.
2798
    for inst in self.all_inst_info.values():
2799
      for secondary in inst.secondary_nodes:
2800
        if (secondary in self.my_node_info
2801
            and inst.name not in self.my_inst_info):
2802
          inst.MapLVsByNode(node_vol_should)
2803
          break
2804

    
2805
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2806

    
2807
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2808
      feedback_fn("* Verifying N+1 Memory redundancy")
2809
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2810

    
2811
    feedback_fn("* Other Notes")
2812
    if i_non_redundant:
2813
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2814
                  % len(i_non_redundant))
2815

    
2816
    if i_non_a_balanced:
2817
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2818
                  % len(i_non_a_balanced))
2819

    
2820
    if n_offline:
2821
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2822

    
2823
    if n_drained:
2824
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2825

    
2826
    return not self.bad
2827

    
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_names:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave an error.
          # Override lu_result manually here, as _ErrorIf only
          # overrides self.bad
          lu_result = 1
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = 0

    return lu_result

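# Illustrative sketch (hypothetical node and script names): HooksCallBack only
# relies on three attributes of each per-node hooks result -- fail_msg,
# offline and payload, the latter iterated as (script, hkr, output) tuples:
#
#   hooks_results = {"node1.example.com": rpc_result}
#   # rpc_result.payload, on success, resembles:
#   #   [("10-check-foo", constants.HKR_SUCCESS, "all good"),
#   #    ("20-check-bar", constants.HKR_FAIL, "bar is missing")]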

class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = _ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.glm.list_owned(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])

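# Illustrative sketch (hypothetical group names): for a cluster with two node
# groups, the value returned above wraps one single-opcode job per group,
# roughly:
#
#   ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name="group1")],
#                   [opcodes.OpGroupVerifyDisks(group_name="group2")]])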

class LUGroupVerifyDisks(NoHooksLU):
  """Verifies the status of all disks in a node group.

  """
  REQ_BGL = False

  def ExpandNames(self):
    # Raises errors.OpPrereqError on its own if group can't be found
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.share_locks = _ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetNodeGroupInstances(self.group_uuid)

    elif level == locking.LEVEL_NODEGROUP:
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        set([self.group_uuid] +
            # Lock all groups used by instances optimistically; this requires
            # going via the node before it's locked, requiring verification
            # later on
            [group_uuid
             for instance_name in
               self.glm.list_owned(locking.LEVEL_INSTANCE)
             for group_uuid in
               self.cfg.GetInstanceNodeGroups(instance_name)])

    elif level == locking.LEVEL_NODE:
      # This will only lock the nodes in the group to be verified which contain
      # actual instances
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
      self._LockInstancesNodes()

      # Lock all nodes in group to be verified
      assert self.group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
      member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)

  def CheckPrereq(self):
    owned_instances = frozenset(self.glm.list_owned(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(self.glm.list_owned(locking.LEVEL_NODEGROUP))
    owned_nodes = frozenset(self.glm.list_owned(locking.LEVEL_NODE))

    assert self.group_uuid in owned_groups

    # Check if locked instances are still correct
    wanted_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)
    if owned_instances != wanted_instances:
      raise errors.OpPrereqError("Instances in node group %s changed since"
                                 " locks were acquired, wanted %s, have %s;"
                                 " retry the operation" %
                                 (self.op.group_name,
                                  utils.CommaJoin(wanted_instances),
                                  utils.CommaJoin(owned_instances)),
                                 errors.ECODE_STATE)

    # Get instance information
    self.instances = dict((name, self.cfg.GetInstanceInfo(name))
                          for name in owned_instances)

    # Check if node groups for locked instances are still correct
    for (instance_name, inst) in self.instances.items():
      assert self.group_uuid in self.cfg.GetInstanceNodeGroups(instance_name), \
        "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
      assert owned_nodes.issuperset(inst.all_nodes), \
        "Instance %s's nodes changed while we kept the lock" % instance_name

      inst_groups = self.cfg.GetInstanceNodeGroups(instance_name)
      if not owned_groups.issuperset(inst_groups):
        raise errors.OpPrereqError("Instance %s's node groups changed since"
                                   " locks were acquired, current groups are"
                                   " '%s', owning groups '%s'; retry the"
                                   " operation" %
                                   (instance_name,
                                    utils.CommaJoin(inst_groups),
                                    utils.CommaJoin(owned_groups)),
                                   errors.ECODE_STATE)

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    res_nodes = {}
    res_instances = set()
    res_missing = {}

    nv_dict = _MapInstanceDisksToNodes([inst
                                        for inst in self.instances.values()
                                        if inst.admin_up])

    if nv_dict:
      nodes = utils.NiceSort(set(self.glm.list_owned(locking.LEVEL_NODE)) &
                             set(self.cfg.GetVmCapableNodeList()))

      node_lvs = self.rpc.call_lv_list(nodes, [])

      for (node, node_res) in node_lvs.items():
        if node_res.offline:
          continue

        msg = node_res.fail_msg
        if msg:
          logging.warning("Error enumerating LVs on node %s: %s", node, msg)
          res_nodes[node] = msg
          continue

        for lv_name, (_, _, lv_online) in node_res.payload.items():
          inst = nv_dict.pop((node, lv_name), None)
          if not (lv_online or inst is None):
            res_instances.add(inst)

      # any leftover items in nv_dict are missing LVs, let's arrange the data
      # better
      for key, inst in nv_dict.iteritems():
        res_missing.setdefault(inst, []).append(key)

    return (res_nodes, list(res_instances), res_missing)

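# Illustrative sketch (hypothetical names): the three-part result of
# LUGroupVerifyDisks.Exec, as described in its docstring, looks roughly like:
#
#   res_nodes     = {"node2": "Error enumerating LVs: ..."}  # per-node errors
#   res_instances = ["instance1"]         # instances needing activate-disks
#   res_missing   = {"instance2": [("node3", "xenvg/...")]}  # missing volumes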

class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      self.wanted_names = _GetWantedInstances(self, self.op.instances)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
        }
    self.share_locks = _ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

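  # Illustrative sketch: for a DRBD8 disk recorded as 10240 MiB whose first
  # child (the data LV) is recorded as 10200 MiB, _EnsureChildSizes() bumps
  # the child to 10240 and returns True so the caller knows the configuration
  # must be written out; non-DRBD8 disks simply return False.
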
  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsize(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsize call to node"
                        " %s, ignoring", node)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node, len(dskl), result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))
    return changed

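# Illustrative sketch: the blockdev_getsize payload appears to be in bytes
# while the configuration stores MiB, hence the "size >> 20" above, e.g.
#
#   10737418240 >> 20  ==  10240    # a 10 GiB report becomes 10240 MiB
#
# and every applied correction is reported as (instance_name, index, size).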

class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
    finally:
      result = self.rpc.call_node_start_master(master, False, False)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)

    return clustername


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_list = self.glm.list_owned(locking.LEVEL_NODE)

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_list)
      for node in node_list:
        ninfo = self.cfg.GetNodeInfo(node)
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s", node)
          continue
        msg = helpers[node].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (node, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[node].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (node, node_helper), errors.ECODE_ENVIRON)

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors))

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
                                                  use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                         os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          _CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

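  # Illustrative sketch (hypothetical values): the parameter handling above
  # leans on objects.FillDict(defaults, custom), which returns a copy of the
  # defaults updated with the custom values:
  #
  #   objects.FillDict({"kernel_path": "/boot/vmlinuz", "acpi": True},
  #                    {"acpi": False})
  #   # -> {"kernel_path": "/boot/vmlinuz", "acpi": False}
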
  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.drbd_helper is not None:
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master = self.cfg.GetMasterNode()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_stop_master(master, False)
      result.Raise("Could not disable the master ip")
      feedback_fn("Changing master_netdev from %s to %s" %
                  (self.cluster.master_netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      result = self.rpc.call_node_start_master(master, False, False)
      if result.fail_msg:
        self.LogWarning("Could not re-enable the master ip on"
                        " the master, please restart manually: %s",
                        result.fail_msg)

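# Illustrative sketch (hypothetical OS names): the hidden_os and
# blacklisted_os parameters handled by helper_os() above are lists of
# (action, os_name) pairs, e.g.
#
#   [(constants.DDM_ADD, "debian-image"), (constants.DDM_REMOVE, "old-image")]
#
# Any other action value raises ProgrammerError.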

def _UploadHelper(lu, nodes, fname):
  """Helper for uploading a file and showing warnings.

  """
  if os.path.exists(fname):
    result = lu.rpc.call_upload_file(nodes, fname)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, to_node, msg))
        lu.proc.LogWarning(msg)


def _ComputeAncillaryFiles(cluster, redist):
  """Compute files external to Ganeti which need to be consistent.

  @type redist: boolean
  @param redist: Whether to include files which need to be redistributed

  """
  # Compute files for all nodes
  files_all = set([
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  if not redist:
    files_all.update(constants.ALL_CERT_FILES)
    files_all.update(ssconf.SimpleStore().GetFileList())

  if cluster.modify_etc_hosts:
    files_all.add(constants.ETC_HOSTS)

  # Files which must either exist on all nodes or on none
  files_all_opt = set([
    constants.RAPI_USERS_FILE,
    ])

  # Files which should only be on master candidates
  files_mc = set()
  if not redist:
    files_mc.add(constants.CLUSTER_CONF_FILE)

  # Files which should only be on VM-capable nodes
  files_vm = set(filename
    for hv_name in cluster.enabled_hypervisors
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles())

  # Filenames must be unique
  assert (len(files_all | files_all_opt | files_mc | files_vm) ==
          sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
         "Found file listed in more than one file list"

  return (files_all, files_all_opt, files_mc, files_vm)

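# Worked example for the assertion in _ComputeAncillaryFiles(): the four sets
# are pairwise disjoint exactly when the size of their union equals the sum of
# their sizes, e.g.
#
#   len({1, 2} | {2, 3}) == 3   while   len({1, 2}) + len({2, 3}) == 4
#
# so a file listed in two categories trips the assert.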

def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to
  @type additional_vm: boolean
  @param additional_vm: whether the additional nodes are vm-capable or not

  """
  # Gather target nodes
  cluster = lu.cfg.GetClusterInfo()
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())

  online_nodes = lu.cfg.GetOnlineNodeList()
  vm_nodes = lu.cfg.GetVmCapableNodeList()

  if additional_nodes is not None:
    online_nodes.extend(additional_nodes)
    if additional_vm:
      vm_nodes.extend(additional_nodes)

  # Never distribute to master node
  for nodelist in [online_nodes, vm_nodes]:
    if master_info.name in nodelist:
      nodelist.remove(master_info.name)

  # Gather file lists
  (files_all, files_all_opt, files_mc, files_vm) = \
    _ComputeAncillaryFiles(cluster, True)

  # Never re-distribute configuration file from here
  assert not (constants.CLUSTER_CONF_FILE in files_all or
              constants.CLUSTER_CONF_FILE in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

  filemap = [
    (online_nodes, files_all),
    (online_nodes, files_all_opt),
    (vm_nodes, files_vm),
    ]

  # Upload the files
  for (node_list, files) in filemap:
    for fname in files:
      _UploadHelper(lu, node_list, fname)


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = _ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (disks[i].iv_name, mstat.sync_percent, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded

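# Illustrative sketch of the pacing in _WaitForSync(): between successful
# polls it sleeps min(60, max_time) seconds, where max_time is taken from the
# devices' estimated sync times in the last poll, so an estimate of 400s still
# means a 60s sleep while an estimate of 15s means a 15s one; RPC failures are
# retried up to 10 times with a fixed 6-second pause before giving up.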

def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result

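# Illustrative sketch (placeholder arguments): callers typically check a disk
# on its secondary node in two modes,
#
#   _CheckDiskConsistency(self, dev, snode, False)              # overall state
#   _CheckDiskConsistency(self, dev, snode, False, ldisk=True)  # local storage
#
# where dev is an objects.Disk and snode a node name known to the calling LU.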

class LUOobCommand(NoHooksLU):
  """Logical unit for OOB handling.

  """
  REQ_BGL = False
  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)

  def ExpandNames(self):
    """Gather locks we need.

    """
    if self.op.node_names:
      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
      lock_names = self.op.node_names
    else:
      lock_names = locking.ALL_SET

    self.needed_locks = {
      locking.LEVEL_NODE: lock_names,
      }

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - OOB is supported

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.nodes = []
    self.master_node = self.cfg.GetMasterNode()

    assert self.op.power_delay >= 0.0

    if self.op.node_names:
      if (self.op.command in self._SKIP_MASTER and
          self.master_node in self.op.node_names):
        master_node_obj = self.cfg.GetNodeInfo(self.master_node)
        master_oob_handler = _SupportsOob(self.cfg, master_node_obj)

        if master_oob_handler:
          additional_text = ("run '%s %s %s' if you want to operate on the"
                             " master regardless") % (master_oob_handler,
                                                      self.op.command,
                                                      self.master_node)
        else:
          additional_text = "it does not support out-of-band operations"

        raise errors.OpPrereqError(("Operating on the master node %s is not"
                                    " allowed for %s; %s") %
                                   (self.master_node, self.op.command,
                                    additional_text), errors.ECODE_INVAL)
    else:
      self.op.node_names = self.cfg.GetNodeList()
      if self.op.command in self._SKIP_MASTER:
        self.op.node_names.remove(self.master_node)

    if self.op.command in self._SKIP_MASTER:
      assert self.master_node not in self.op.node_names

    for node_name in self.op.node_names:
      node = self.cfg.GetNodeInfo(node_name)

      if node is None:
        raise errors.OpPrereqError("Node %s not found" % node_name,
                                   errors.ECODE_NOENT)
      else:
        self.nodes.append(node)

      if (not self.op.ignore_status and
          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
        raise errors.OpPrereqError(("Cannot power off node %s because it is"
                                    " not marked offline") % node_name,
                                   errors.ECODE_STATE)

  def Exec(self, feedback_fn):
    """Execute OOB and return result if we expect any.

    """
    master_node = self.master_node
    ret = []

    for idx, node in enumerate(utils.NiceSort(self.nodes,
                                              key=lambda node: node.name)):
      node_entry = [(constants.RS_NORMAL, node.name)]
      ret.append(node_entry)

      oob_program = _SupportsOob(self.cfg, node)

      if not oob_program:
        node_entry.append((constants.RS_UNAVAIL, None))
        continue

      logging.info("Executing out-of-band command '%s' using '%s' on %s",
                   self.op.command, oob_program, node.name)
      result = self.rpc.call_run_oob(master_node, oob_program,
                                     self.op.command, node.name,
                                     self.op.timeout)

      if result.fail_msg:
        self.LogWarning("Out-of-band RPC failed on node '%s': %s",
                        node.name, result.fail_msg)
        node_entry.append((constants.RS_NODATA, None))
      else:
        try:
          self._CheckPayload(result)
        except errors.OpExecError, err:
          self.LogWarning("Payload returned by node '%s' is not valid: %s",
                          node.name, err)
          node_entry.append((constants.RS_NODATA, None))
        else:
          if self.op.command == constants.OOB_HEALTH:
            # For health we should log important events
            for item, status in result.payload:
              if status in [constants.OOB_STATUS_WARNING,
                            constants.OOB_STATUS_CRITICAL]:
                self.LogWarning("Item '%s' on node '%s' has status '%s'",
                                item, node.name, status)

          if self.op.command == constants.OOB_POWER_ON:
            node.powered = True
          elif self.op.command == constants.OOB_POWER_OFF:
            node.powered = False
          elif self.op.command == constants.OOB_POWER_STATUS:
            powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
            if powered != node.powered:
              logging.warning(("Recorded power state (%s) of node '%s' does not"
                               " match actual power state (%s)"), node.powered,
                              node.name, powered)

          # For configuration changing commands we should update the node
          if self.op.command in (constants.OOB_POWER_ON,
                                 constants.OOB_POWER_OFF):
            self.cfg.Update(node, feedback_fn)

          node_entry.append((constants.RS_NORMAL, result.payload))

          if (self.op.command == constants.OOB_POWER_ON and
              idx < len(self.nodes) - 1):
            time.sleep(self.op.power_delay)

    return ret

  def _CheckPayload(self, result):
    """Checks if the payload is valid.

    @param result: RPC result
    @raises errors.OpExecError: If payload is not valid

    """
    errs = []
    if self.op.command == constants.OOB_HEALTH:
      if not isinstance(result.payload, list):
        errs.append("command 'health' is expected to return a list but got %s" %
                    type(result.payload))
      else:
        for item, status in result.payload:
          if status not in constants.OOB_STATUSES:
            errs.append("health item '%s' has invalid status '%s'" %
                        (item, status))

    if self.op.command == constants.OOB_POWER_STATUS:
      if not isinstance(result.payload, dict):
        errs.append("power-status is expected to return a dict but got %s" %
                    type(result.payload))

    if self.op.command in [
        constants.OOB_POWER_ON,
        constants.OOB_POWER_OFF,
        constants.OOB_POWER_CYCLE,
        ]:
      if result.payload is not None:
        errs.append("%s is expected to not return payload but got '%s'" %
                    (self.op.command, result.payload))

    if errs:
      raise errors.OpExecError("Check of out-of-band payload failed due to %s" %
                               utils.CommaJoin(errs))

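# Illustrative sketch (hypothetical item name) of the payload shapes accepted
# by _CheckPayload():
#
#   health              -> [("disk0", constants.OOB_STATUS_OK), ...]
#   power-status        -> {constants.OOB_POWER_STATUS_POWERED: True}
#   power-on/off/cycle  -> None
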
class _OsQuery(_QueryBase):
  FIELDS = query.OS_FIELDS

  def ExpandNames(self, lu):
    # Lock all nodes in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    lu.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

    # The following variables interact with _QueryBase._GetNames
    if self.names:
      self.wanted = self.names
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self.use_locking

  def DeclareLocks(self, lu, level):
    pass

  @staticmethod
  def _DiagnoseByOS(rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another
        map, with nodes as keys and tuples of (path, status, diagnose,
        variants, parameters, api_versions) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "", [], [])]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for (name, path, status, diagnose, variants,
           params, api_versions) in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        # convert params from [name, help] to (name, help)
        params = [tuple(v) for v in params]
        all_os[name][node_name].append((path, status, diagnose,
                                        variants, params, api_versions))
    return all_os

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    valid_nodes = [node.name
                   for node in lu.cfg.GetAllNodesInfo().values()
                   if not node.offline and node.vm_capable]
    pol = self._DiagnoseByOS(lu.rpc.call_os_diagnose(valid_nodes))
    cluster = lu.cfg.GetClusterInfo()

    data = {}

    for (os_name, os_data) in pol.items():
      info = query.OsInfo(name=os_name, valid=True, node_status=os_data,
                          hidden=(os_name in cluster.hidden_os),
                          blacklisted=(os_name in cluster.blacklisted_os))

      variants = set()
      parameters = set()
      api_versions = set()

      for idx, osl in enumerate(os_data.values()):
        info.valid = bool(info.valid and osl and osl[0][1])
        if not info.valid:
          break

        (node_variants, node_params, node_api) = osl[0][3:6]
        if idx == 0:
          # First entry
          variants.update(node_variants)
          parameters.update(node_params)