#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0201,C0302

# W0201 since most LU attributes are defined in CheckPrereq or similar
# functions

# C0302: since we have waaaay too many lines in this module

import os
import os.path
import time
import re
import platform
import logging
import copy
import OpenSSL
import socket
import tempfile
import shutil
import itertools
import operator

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf
from ganeti import uidpool
from ganeti import compat
from ganeti import masterd
from ganeti import netutils
from ganeti import query
from ganeti import qlang
from ganeti import opcodes
from ganeti import ht

import ganeti.masterd.instance # pylint: disable-msg=W0611


class ResultWithJobs:
  """Data container for LU results with jobs.

  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
  contained in the C{jobs} attribute and include the job IDs in the opcode
  result.

  """
  def __init__(self, jobs, **kwargs):
    """Initializes this class.

    Additional return values can be specified as keyword arguments.

    @type jobs: list of lists of L{opcode.OpCode}
    @param jobs: A list of lists of opcode objects

    """
    self.jobs = jobs
    self.other = kwargs
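

# Illustrative sketch, not part of the original module: an LU's Exec method can
# hand follow-up jobs back to the master daemon by returning ResultWithJobs.
# The opcode used below (opcodes.OpInstanceStartup) is assumed to exist in this
# tree; any keyword arguments end up in the "other" attribute of the result.
def _ExampleResultWithJobs(instance_name):
  """Example only: build a result submitting one follow-up job."""
  followup = [opcodes.OpInstanceStartup(instance_name=instance_name)]
  return ResultWithJobs([followup], example_note="submitted by example helper")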


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - implement BuildHooksNodes
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.glm = context.glm
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    # logging
    self.Log = processor.Log # pylint: disable-msg=C0103
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
    # support for dry-run
    self.dry_run_result = None
    # support for generic debug attribute
    if (not hasattr(self.op, "debug_level") or
        not isinstance(self.op.debug_level, int)):
      self.op.debug_level = 0

    # Tasklets
    self.tasklets = None

    # Validate opcode parameters and set defaults
    self.op.Validate(True)

    self.CheckArguments()

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods need no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      pass

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    @rtype: dict
    @return: Dictionary containing the environment that will be used for
      running the hooks for this LU. The keys of the dict must not be prefixed
      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
      will extend the environment with additional variables. If no environment
      should be defined, an empty dictionary should be returned (not C{None}).
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
      will not be called.

    """
    raise NotImplementedError

  def BuildHooksNodes(self):
    """Build list of nodes to run LU's hooks.

    @rtype: tuple; (list, list)
    @return: Tuple containing a list of node names on which the hook
      should run before the execution and a list of node names on which the
      hook should run after the execution. No nodes should be returned as an
      empty list (and not None).
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
      will not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # API must be kept, thus we ignore the unused argument and "could
    # be a function" warnings
    # pylint: disable-msg=W0613,R0201
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    self.op.instance_name = _ExpandInstanceName(self.cfg,
                                                self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Empty BuildHooksEnv for NoHooksLU.

    This just raises an error.

    """
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")

  def BuildHooksNodes(self):
    """Empty BuildHooksNodes for NoHooksLU.

    """
    raise AssertionError("BuildHooksNodes called for NoHooksLU")


class Tasklet:
  """Tasklet base class.

  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
  tasklets know nothing about locks.

  Subclasses must follow these rules:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    self.lu = lu

    # Shortcuts
    self.cfg = lu.cfg
    self.rpc = lu.rpc

  def CheckPrereq(self):
    """Check prerequisites for this tasklet.

    This method should check whether the prerequisites for the execution of
    this tasklet are fulfilled. It can do internode communication, but it
    should be idempotent - no cluster or system changes are allowed.

    The method should raise errors.OpPrereqError in case something is not
    fulfilled. Its return value is ignored.

    This method should also update all parameters to their canonical form if it
    hasn't been done before.

    """
    pass

  def Exec(self, feedback_fn):
    """Execute the tasklet.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in code, or
    expected.

    """
    raise NotImplementedError
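

# Illustrative sketch, not part of the original module: the minimal shape of a
# tasklet.  Locking stays in the owning LU; the tasklet only implements
# CheckPrereq and Exec as described above.
class _ExampleNoopTasklet(Tasklet):
  """Example only: a tasklet that verifies nothing and does nothing."""
  def CheckPrereq(self):
    # A real tasklet would check cluster state via self.cfg/self.rpc here
    pass

  def Exec(self, feedback_fn):
    feedback_fn("Example tasklet executed")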


class _QueryBase:
  """Base for query utility classes.

  """
  #: Attribute holding field definitions
  FIELDS = None

  def __init__(self, filter_, fields, use_locking):
    """Initializes this class.

    """
    self.use_locking = use_locking

    self.query = query.Query(self.FIELDS, fields, filter_=filter_,
                             namefield="name")
    self.requested_data = self.query.RequestedData()
    self.names = self.query.RequestedNames()

    # Sort only if no names were requested
    self.sort_by_name = not self.names

    self.do_locking = None
    self.wanted = None

  def _GetNames(self, lu, all_names, lock_level):
    """Helper function to determine names asked for in the query.

    """
    if self.do_locking:
      names = lu.glm.list_owned(lock_level)
    else:
      names = all_names

    if self.wanted == locking.ALL_SET:
      assert not self.names
      # caller didn't specify names, so ordering is not important
      return utils.NiceSort(names)

    # caller specified names and we must keep the same order
    assert self.names
    assert not self.do_locking or lu.glm.is_owned(lock_level)

    missing = set(self.wanted).difference(names)
    if missing:
      raise errors.OpExecError("Some items were removed before retrieving"
                               " their data: %s" % missing)

    # Return expanded names
    return self.wanted

  def ExpandNames(self, lu):
    """Expand names for this query.

    See L{LogicalUnit.ExpandNames}.

    """
    raise NotImplementedError()

  def DeclareLocks(self, lu, level):
    """Declare locks for this query.

    See L{LogicalUnit.DeclareLocks}.

    """
    raise NotImplementedError()

  def _GetQueryData(self, lu):
    """Collects all data for this query.

    @return: Query data object

    """
    raise NotImplementedError()

  def NewStyleQuery(self, lu):
    """Collect data and execute query.

    """
    return query.GetQueryResponse(self.query, self._GetQueryData(lu),
                                  sort_by_name=self.sort_by_name)

  def OldStyleQuery(self, lu):
    """Collect data and execute query.

    """
    return self.query.OldStyleQuery(self._GetQueryData(lu),
                                    sort_by_name=self.sort_by_name)


def _ShareAll():
  """Returns a dict declaring all lock levels shared.

  """
  return dict.fromkeys(locking.LEVELS, 1)


def _SupportsOob(cfg, node):
  """Tells if node supports OOB.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @return: The OOB script if supported or an empty string otherwise

  """
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if nodes:
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]

  return utils.NiceSort(lu.cfg.GetNodeList())


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if instances:
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _GetUpdatedParams(old_params, update_dict,
                      use_default=True, use_none=False):
  """Return the new version of a parameter dictionary.

  @type old_params: dict
  @param old_params: old parameters
  @type update_dict: dict
  @param update_dict: dict containing new parameter values, or
      constants.VALUE_DEFAULT to reset the parameter to its default
      value
  @type use_default: boolean
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
      values as 'to be deleted' values
  @type use_none: boolean
  @param use_none: whether to recognise C{None} values as 'to be
      deleted' values
  @rtype: dict
  @return: the new parameter dictionary

  """
  params_copy = copy.deepcopy(old_params)
  for key, val in update_dict.iteritems():
    if ((use_default and val == constants.VALUE_DEFAULT) or
        (use_none and val is None)):
      try:
        del params_copy[key]
      except KeyError:
        pass
    else:
      params_copy[key] = val
  return params_copy
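

# Illustrative example, not part of the original module: the merge semantics of
# _GetUpdatedParams.  A value of constants.VALUE_DEFAULT removes the key (so it
# falls back to the cluster default), anything else overrides or extends the
# old dictionary:
#
#   _GetUpdatedParams({"vcpus": 2, "memory": 512},
#                     {"memory": constants.VALUE_DEFAULT, "auto_balance": True})
#   --> {"vcpus": 2, "auto_balance": True}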


def _ReleaseLocks(lu, level, names=None, keep=None):
  """Releases locks owned by an LU.

  @type lu: L{LogicalUnit}
  @param level: Lock level
  @type names: list or None
  @param names: Names of locks to release
  @type keep: list or None
  @param keep: Names of locks to retain

  """
  assert not (keep is not None and names is not None), \
         "Only one of the 'names' and the 'keep' parameters can be given"

  if names is not None:
    should_release = names.__contains__
  elif keep:
    should_release = lambda name: name not in keep
  else:
    should_release = None

  if should_release:
    retain = []
    release = []

    # Determine which locks to release
    for name in lu.glm.list_owned(level):
      if should_release(name):
        release.append(name)
      else:
        retain.append(name)

    assert len(lu.glm.list_owned(level)) == (len(retain) + len(release))

    # Release just some locks
    lu.glm.release(level, names=release)

    assert frozenset(lu.glm.list_owned(level)) == frozenset(retain)
  else:
    # Release everything
    lu.glm.release(level)

    assert not lu.glm.is_owned(level), "No locks should be owned"


def _MapInstanceDisksToNodes(instances):
  """Creates a map from (node, volume) to instance name.

  @type instances: list of L{objects.Instance}
  @rtype: dict; tuple of (node name, volume name) as key, instance name as value

  """
  return dict(((node, vol), inst.name)
              for inst in instances
              for (node, vols) in inst.MapLVsByNode().items()
              for vol in vols)
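

# Illustrative example, not part of the original module: the mapping built by
# _MapInstanceDisksToNodes has one entry per (node, logical volume) pair, e.g.
#
#   {("node1.example.com", "xenvg/disk0"): "instance1.example.com",
#    ("node2.example.com", "xenvg/disk0"): "instance1.example.com"}
#
# (volume names are shown here only schematically).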


def _RunPostHook(lu, node_name):
  """Runs the post-hook for an opcode on a single node.

  """
  hm = lu.proc.hmclass(lu.rpc.call_hooks_runner, lu)
  try:
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
  except:
    # pylint: disable-msg=W0702
    lu.LogWarning("Errors occurred running hooks on %s" % node_name)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


def _CheckGlobalHvParams(params):
  """Validates that given hypervisor params are not global ones.

  This will ensure that instances don't get customised versions of
  global params.

  """
  used_globals = constants.HVC_GLOBALS.intersection(params)
  if used_globals:
    msg = ("The following hypervisor parameters are global and cannot"
           " be customized at instance level, please modify them at"
           " cluster level: %s" % utils.CommaJoin(used_globals))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)


def _CheckNodeOnline(lu, node, msg=None):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the node is offline

  """
  if msg is None:
    msg = "Can't use offline node"
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node,
                               errors.ECODE_STATE)


def _CheckNodeVmCapable(lu, node):
  """Ensure that a given node is vm capable.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is not vm capable

  """
  if not lu.cfg.GetNodeInfo(node).vm_capable:
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
                               errors.ECODE_STATE)


def _CheckNodeHasOS(lu, node, os_name, force_variant):
  """Ensure that a node supports a given OS.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param os_name: the OS to query about
  @param force_variant: whether to ignore variant errors
  @raise errors.OpPrereqError: if the node is not supporting the OS

  """
  result = lu.rpc.call_os_get(node, os_name)
  result.Raise("OS '%s' not in supported OS list for node %s" %
               (os_name, node),
               prereq=True, ecode=errors.ECODE_INVAL)
  if not force_variant:
    _CheckOSVariant(result.payload, os_name)


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: string
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


def _GetClusterDomainSecret():
  """Reads the cluster domain secret.

  """
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
                               strict=True)


def _CheckInstanceDown(lu, instance, reason):
  """Ensure that an instance is not running."""
  if instance.admin_up:
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
                               (instance.name, reason), errors.ECODE_STATE)

  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
              prereq=True, ecode=errors.ECODE_ENVIRON)

  if instance.name in ins_l.payload:
    raise errors.OpPrereqError("Instance %s is running, %s" %
                               (instance.name, reason), errors.ECODE_STATE)


def _ExpandItemName(fn, name, kind):
  """Expand an item name.

  @param fn: the function to use for expansion
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance')
  @return: the resolved (full) name
  @raise errors.OpPrereqError: if the item is not found

  """
  full_name = fn(name)
  if full_name is None:
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                               errors.ECODE_NOENT)
  return full_name


def _ExpandNodeName(cfg, name):
  """Wrapper over L{_ExpandItemName} for nodes."""
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")


def _ExpandInstanceName(cfg, name):
  """Wrapper over L{_ExpandItemName} for instance."""
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name, tags):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @type tags: list
  @param tags: list of instance tags as strings
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  if not tags:
    tags = []

  env["INSTANCE_TAGS"] = " ".join(tags)

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env
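

# Illustrative example, not part of the original module: for a one-NIC,
# one-disk instance the dictionary returned above contains entries such as
#
#   INSTANCE_NAME=instance1.example.com
#   INSTANCE_PRIMARY=node1.example.com
#   INSTANCE_NIC_COUNT=1
#   INSTANCE_NIC0_MODE=bridged
#   INSTANCE_DISK0_SIZE=10240
#
# (the hooks runner later prefixes every key with "GANETI_").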


def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUInstanceQueryData.

  @type lu:  L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  cluster = lu.cfg.GetClusterInfo()
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    "name": instance.name,
    "primary_node": instance.primary_node,
    "secondary_nodes": instance.secondary_nodes,
    "os_type": instance.os,
    "status": instance.admin_up,
    "memory": bep[constants.BE_MEMORY],
    "vcpus": bep[constants.BE_VCPUS],
    "nics": _NICListToTuple(lu, instance.nics),
    "disk_template": instance.disk_template,
    "disks": [(disk.size, disk.mode) for disk in instance.disks],
    "bep": bep,
    "hvp": hvp,
    "hypervisor_name": instance.hypervisor,
    "tags": instance.tags,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142


def _AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max with one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should


def _CheckNicsBridgesExist(lu, target_nics, target_node):
  """Check that the bridges needed by a list of nics exist.

  """
  cluster = lu.cfg.GetClusterInfo()
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _CheckOSVariant(os_obj, name):
  """Check whether an OS name conforms to the os variants specification.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type name: string
  @param name: OS name passed by the user, to check for validity

  """
  variant = objects.OS.GetVariant(name)
  if not os_obj.supported_variants:
    if variant:
      raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
                                 " passed)" % (os_obj.name, variant),
                                 errors.ECODE_INVAL)
    return
  if not variant:
    raise errors.OpPrereqError("OS name must include a variant",
                               errors.ECODE_INVAL)

  if variant not in os_obj.supported_variants:
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]

  return []


def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
  """Check the sanity of iallocator and node arguments and use the
  cluster-wide iallocator if appropriate.

  Check that at most one of (iallocator, node) is specified. If none is
  specified, then the LU's opcode's iallocator slot is filled with the
  cluster-wide default iallocator.

  @type iallocator_slot: string
  @param iallocator_slot: the name of the opcode iallocator slot
  @type node_slot: string
  @param node_slot: the name of the opcode target node slot

  """
  node = getattr(lu.op, node_slot, None)
  iallocator = getattr(lu.op, iallocator_slot, None)

  if node is not None and iallocator is not None:
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
                               errors.ECODE_INVAL)
  elif node is None and iallocator is None:
    default_iallocator = lu.cfg.GetDefaultIAllocator()
    if default_iallocator:
      setattr(lu.op, iallocator_slot, default_iallocator)
    else:
      raise errors.OpPrereqError("No iallocator or node given and no"
                                 " cluster-wide default iallocator found;"
                                 " please specify either an iallocator or a"
                                 " node, or set a cluster-wide default"
                                 " iallocator")


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()

    # Run post hooks on master node before it's removed
    _RunPostHook(self, master)

    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    return master


def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable-msg=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data
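

# Illustrative example, not part of the original module: the list returned by
# _GetAllHypervisorParameters contains one (origin, hypervisor, params) triple
# per parameter source, e.g.
#
#   [("cluster", "kvm", {... cluster-level defaults ...}),
#    ("os debian-image", "kvm", {... OS-specific overrides ...}),
#    ("instance instance1.example.com", "kvm", {... fully filled dict ...})]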


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """
  TCLUSTER = "cluster"
  TNODE = "node"
  TINSTANCE = "instance"

  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
  ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK")
  ECLUSTERDANGLINGNODES = (TNODE, "ECLUSTERDANGLINGNODES")
  ECLUSTERDANGLINGINST = (TNODE, "ECLUSTERDANGLINGINST")
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
  EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
  EINSTANCESPLITGROUPS = (TINSTANCE, "EINSTANCESPLITGROUPS")
  ENODEDRBD = (TNODE, "ENODEDRBD")
  ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
  ENODEHV = (TNODE, "ENODEHV")
  ENODELVM = (TNODE, "ENODELVM")
  ENODEN1 = (TNODE, "ENODEN1")
  ENODENET = (TNODE, "ENODENET")
  ENODEOS = (TNODE, "ENODEOS")
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
  ENODERPC = (TNODE, "ENODERPC")
  ENODESSH = (TNODE, "ENODESSH")
  ENODEVERSION = (TNODE, "ENODEVERSION")
  ENODESETUP = (TNODE, "ENODESETUP")
  ENODETIME = (TNODE, "ENODETIME")
  ENODEOOBPATH = (TNODE, "ENODEOOBPATH")

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt = ecode
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable-msg=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable-msg=E1101

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    cond = (bool(cond)
            or self.op.debug_simulate_errors) # pylint: disable-msg=E1101
    if cond:
      self._Error(*args, **kwargs)
    # do not mark the operation as failed for WARN cases only
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
      self.bad = self.bad or cond
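

# Illustrative example, not part of the original module: a call such as
#
#   self._ErrorIf(node_down, self.ENODERPC, node_name, "node is unreachable")
#
# is reported through feedback_fn either as
#
#   "  - ERROR: node node1.example.com: node is unreachable"
#
# or, when the opcode requests machine-parseable error codes, as
#
#   "  - ERROR:ENODERPC:node:node1.example.com:node is unreachable"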


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = True

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisor(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    # Information can be safely retrieved as the BGL is acquired in exclusive
    # mode
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, self.ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in constants.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node.name for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in dangling_nodes:
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst.name)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(dangling_instances.get(node.name,
                                                ["no instances"])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), self.ECLUSTERDANGLINGNODES, None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), self.ECLUSTERDANGLINGINST, None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(no_node_instances))

    return (not self.bad, [g.name for g in self.all_group_info.values()])


1525
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1526
  """Verifies the status of a node group.
1527

1528
  """
1529
  HPATH = "cluster-verify"
1530
  HTYPE = constants.HTYPE_CLUSTER
1531
  REQ_BGL = False
1532

    
1533
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1534

    
1535
  class NodeImage(object):
1536
    """A class representing the logical and physical status of a node.
1537

1538
    @type name: string
1539
    @ivar name: the node name to which this object refers
1540
    @ivar volumes: a structure as returned from
1541
        L{ganeti.backend.GetVolumeList} (runtime)
1542
    @ivar instances: a list of running instances (runtime)
1543
    @ivar pinst: list of configured primary instances (config)
1544
    @ivar sinst: list of configured secondary instances (config)
1545
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1546
        instances for which this node is secondary (config)
1547
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1548
    @ivar dfree: free disk, as reported by the node (runtime)
1549
    @ivar offline: the offline status (config)
1550
    @type rpc_fail: boolean
1551
    @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1552
        not whether the individual keys were correct) (runtime)
1553
    @type lvm_fail: boolean
1554
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1555
    @type hyp_fail: boolean
1556
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1557
    @type ghost: boolean
1558
    @ivar ghost: whether this is a known node or not (config)
1559
    @type os_fail: boolean
1560
    @ivar os_fail: whether the RPC call didn't return valid OS data
1561
    @type oslist: list
1562
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1563
    @type vm_capable: boolean
1564
    @ivar vm_capable: whether the node can host instances
1565

1566
    """
1567
    def __init__(self, offline=False, name=None, vm_capable=True):
1568
      self.name = name
1569
      self.volumes = {}
1570
      self.instances = []
1571
      self.pinst = []
1572
      self.sinst = []
1573
      self.sbp = {}
1574
      self.mfree = 0
1575
      self.dfree = 0
1576
      self.offline = offline
1577
      self.vm_capable = vm_capable
1578
      self.rpc_fail = False
1579
      self.lvm_fail = False
1580
      self.hyp_fail = False
1581
      self.ghost = False
1582
      self.os_fail = False
1583
      self.oslist = {}
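
  # Roughly how a NodeImage is used in Exec(): the config pass fills
  # pinst/sinst/sbp from the cluster configuration, the runtime pass fills
  # volumes/instances/mfree/dfree from the node_verify RPC, and the *_fail
  # flags record which parts of that RPC result were unusable.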

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_names = self.cfg.GetNodeGroupInstances(self.group_uuid)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: inst_names,
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],
      }

    self.share_locks = _ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      all_inst_info = self.cfg.GetAllInstancesInfo()

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst in self.glm.list_owned(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
          nodes.update(all_inst_info[inst].secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    group_nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
    group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)

    unlocked_nodes = \
        group_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE))

    unlocked_instances = \
        group_instances.difference(self.glm.list_owned(locking.LEVEL_INSTANCE))

    if unlocked_nodes:
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
                                 utils.CommaJoin(unlocked_nodes))

    if unlocked_instances:
      raise errors.OpPrereqError("Missing lock for instances: %s" %
                                 utils.CommaJoin(unlocked_instances))

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_names = utils.NiceSort(group_nodes)
    self.my_inst_names = utils.NiceSort(group_instances)

    self.my_node_info = dict((name, self.all_node_info[name])
                             for name in self.my_node_names)

    self.my_inst_info = dict((name, self.all_inst_info[name])
                             for name in self.my_inst_names)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        group = self.my_node_info[inst.primary_node].group
        for nname in inst.secondary_nodes:
          if self.all_node_info[nname].group != group:
            extra_lv_nodes.add(nname)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes))
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    _ErrorIf(test, self.ENODERPC, node,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, self.ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    _ErrorIf(test, self.ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  self.ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, self.ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        _ErrorIf(True, self.ENODEHV, node,
                 "hypervisor %s parameter verify failure (source %s): %s",
                 hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
             "; ".join(test))

    return True
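
  # The "version" value reported by a node is expected to be the pair
  # (PROTOCOL_VERSION, RELEASE_VERSION): a protocol mismatch aborts further
  # checks for that node, while a differing release version is only warned
  # about.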

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
             "Node time diverges by at least %s from master node time",
             ntime_diff)
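
  # Example of the skew check (numbers are illustrative): with a
  # NODE_MAX_CLOCK_SKEW of 120s and an RPC started at t=1000, a node clock
  # of t=870 falls outside the allowed window and is reported as diverging
  # by at least 130s (the distance from the RPC start time).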

  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
    """Check the node LVM results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)

    # check pv names
    pvlist = nresult.get(constants.NV_PVLIST, None)
    test = pvlist is None
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
    if not test:
      # check that ':' is not present in PV names, since it's a
      # special character for lvcreate (denotes the range of PEs to
      # use on the PV)
      for _, pvname, owner_vg in pvlist:
        test = ":" in pvname
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
                 " '%s' of VG '%s'", pvname, owner_vg)

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    _ErrorIf(test, self.ENODENET, node,
             "did not return valid bridge information")
    if not test:
      _ErrorIf(bool(missing), self.ENODENET, node, "missing bridges: %s" %
               utils.CommaJoin(sorted(missing)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    test = constants.NV_NODELIST not in nresult
    _ErrorIf(test, self.ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          _ErrorIf(True, self.ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, self.ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if node == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        _ErrorIf(True, self.ENODENET, node, msg)

  def _VerifyInstance(self, instance, instanceconfig, node_image,
                      diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_up:
      pri_img = node_image[node_current]
      test = instance not in pri_img.instances and not pri_img.offline
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
               self.EINSTANCEFAULTYDISK, instance,
               "couldn't retrieve status for disk/%s on %s: %s",
               idx, nname, bdev_status)
      _ErrorIf((instanceconfig.admin_up and success and
                bdev_status.ldisk_status == constants.LDS_FAULTY),
               self.EINSTANCEFAULTYDISK, instance,
               "disk/%s on %s is faulty", idx, nname)

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node, n_img in node_image.items():
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node not in node_vol_should or
                volume not in node_vol_should[node]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, self.ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline:
        # we're skipping offline nodes from the N+1 warning, since
        # most likely we don't have good memory information from them;
        # we already list instances living on such nodes, and that's
        # enough warning
        continue
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = cluster_info.FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, self.ENODEN1, node,
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      prinode, needed_mem, n_img.mfree)
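
  # Worked example (all figures made up): if this node is secondary for two
  # auto-balanced instances whose primary is "nodeB", needing 2048 and
  # 4096 MiB, then needed_mem is 6144 MiB; with mfree of 4096 MiB the node
  # is flagged because it could not absorb nodeB's instances on failover.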

  @classmethod
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
                   (files_all, files_all_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param errorif: Callback for reporting errors
    @param nodeinfo: List of L{objects.Node} objects
    @param master_node: Name of master node
    @param all_nvinfo: RPC results

    """
    node_names = frozenset(node.name for node in nodeinfo if not node.offline)

    assert master_node in node_names
    assert (len(files_all | files_all_opt | files_mc | files_vm) ==
            sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
           "Found file listed in more than one file list"

    # Define functions determining which nodes to consider for a file
    file2nodefn = dict([(filename, fn)
      for (files, fn) in [(files_all, None),
                          (files_all_opt, None),
                          (files_mc, lambda node: (node.master_candidate or
                                                   node.name == master_node)),
                          (files_vm, lambda node: node.vm_capable)]
      for filename in files])

    fileinfo = dict((filename, {}) for filename in file2nodefn.keys())

    for node in nodeinfo:
      if node.offline:
        continue

      nresult = all_nvinfo[node.name]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        node_files = nresult.payload.get(constants.NV_FILELIST, None)

      test = not (node_files and isinstance(node_files, dict))
      errorif(test, cls.ENODEFILECHECK, node.name,
              "Node did not return file checksum data")
      if test:
        continue

      for (filename, checksum) in node_files.items():
        # Check if the file should be considered for a node
        fn = file2nodefn[filename]
        if fn is None or fn(node):
          fileinfo[filename].setdefault(checksum, set()).add(node.name)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_name
                            for nodes in fileinfo[filename].values()
                            for node_name in nodes)

      # Nodes missing file
      missing_file = node_names - with_file

      if filename in files_all_opt:
        # All or no nodes
        errorif(missing_file and missing_file != node_names,
                cls.ECLUSTERFILECHECK, None,
                "File %s is optional, but it must exist on all or no"
                " nodes (not found on %s)",
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
      else:
        errorif(missing_file, cls.ECLUSTERFILECHECK, None,
                "File %s is missing from node(s) %s", filename,
                utils.CommaJoin(utils.NiceSort(missing_file)))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
                    for (idx, (checksum, nodes)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      errorif(test, cls.ECLUSTERFILECHECK, None,
              "File %s found with %s different checksums (%s)",
              filename, len(checksums), "; ".join(variants))
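
  # fileinfo ends up as a nested mapping, roughly
  # {filename: {checksum: set(node names)}}; a file is consistent when its
  # inner dict holds a single checksum covering every node that should have
  # that file.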

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      _ErrorIf(test, self.ENODEDRBDHELPER, node,
               "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
                 "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
                 "wrong drbd usermode helper: %s", payload)

    # compute the DRBD minors
    node_drbd = {}
    for minor, instance in drbd_map[node].items():
      test = instance not in instanceinfo
      _ErrorIf(test, self.ECLUSTERCFG, None,
               "ghost instance '%s' in temporary DRBD map", instance)
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
      if test:
        node_drbd[minor] = (instance, False)
      else:
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.admin_up)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    _ErrorIf(test, self.ENODEDRBD, node,
             "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (iname, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      _ErrorIf(test, self.ENODEDRBD, node,
               "drbd minor %d of instance %s is not active", minor, iname)
    for minor in used_minors:
      test = minor not in node_drbd
      _ErrorIf(test, self.ENODEDRBD, node,
               "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    _ErrorIf(test, self.ENODEOS, node,
             "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      _ErrorIf(not f_status, self.ENODEOS, node,
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
      _ErrorIf(len(os_data) > 1, self.ENODEOS, node,
               "OS '%s' has multiple entries (first one shadows the rest): %s",
               os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      _ErrorIf(test, self.ENODEOS, node,
               "Extra OS %s not present on reference node (%s)",
               os_name, base.name)
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        _ErrorIf(a != b, self.ENODEOS, node,
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
                 kind, os_name, base.name,
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    _ErrorIf(missing, self.ENODEOS, node,
             "OSes present on reference node %s but missing on this node: %s",
             base.name, utils.CommaJoin(missing))
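
  # The "base" argument is the first vm_capable node whose OS query
  # succeeded (refos_img in Exec()); every other node's OS list is compared
  # against it for API versions, variants and parameters.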

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, self.ENODEOOBPATH, node, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
               utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = idata

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        _ErrorIf(True, self.ENODERPC, node,
                 "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      _ErrorIf(test, self.ENODELVM, node,
               "node didn't return data for the volume group '%s'"
               " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          _ErrorIf(True, self.ENODERPC, node,
                   "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type nodelist: list of strings
    @param nodelist: Node names
    @type node_image: dict of (name, L{NodeImage})
    @param node_image: Node objects
    @type instanceinfo: dict of (name, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    node_disks = {}
    node_disks_devonly = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nname in nodelist:
      node_instances = list(itertools.chain(node_image[nname].pinst,
                                            node_image[nname].sinst))
      diskless_instances.update(inst for inst in node_instances
                                if instanceinfo[inst].disk_template == diskless)
      disks = [(inst, disk)
               for inst in node_instances
               for disk in instanceinfo[inst].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nname] = disks

      # Creating copies as SetDiskID below will modify the objects and that can
      # lead to incorrect data returned from nodes
      devonly = [dev.Copy() for (_, dev) in disks]

      for dev in devonly:
        self.cfg.SetDiskID(dev, nname)

      node_disks_devonly[nname] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nname, nres) in result.items():
      disks = node_disks[nname]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        _ErrorIf(msg, self.ENODERPC, nname,
                 "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              nname, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst, _), status) in zip(disks, data):
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)

    # Add empty entries for diskless instances.
    for inst in diskless_instances:
      assert inst not in instdisk
      instdisk[inst] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nnames in instdisk.items()
                      for nname, statuses in nnames.items())
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"

    return instdisk
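
  # Illustratively, a DRBD instance "inst1" with one disk mirrored across
  # "node1" and "node2" yields instdisk["inst1"] == {"node1": [(True, st)],
  # "node2": [(True, st)]}, while a diskless instance maps to {} (names are
  # made up for the example).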

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run in the post phase only; their failure is
    logged in the verify output and makes the verification fail.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], self.my_node_names)

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable-msg=R0914

    if not self.my_node_names:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = _ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))

    # We will make nodes contact all nodes in their group, and one node from
    # every other group.
    # TODO: should it be a *random* node, different every time?
    online_nodes = [node.name for node in node_data_list if not node.offline]
    other_group_nodes = {}

    for name in sorted(self.all_node_info):
      node = self.all_node_info[name]
      if (node.group not in other_group_nodes
          and node.group != self.group_uuid
          and not node.offline):
        other_group_nodes[node.group] = node.name

    node_verify_param = {
      constants.NV_FILELIST:
        utils.UniqueSequence(filename
                             for files in filemap
                             for filename in files),
      constants.NV_NODELIST: online_nodes + other_group_nodes.values(),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (master_node, master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      }
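
    # Each NV_* key asks the node_verify RPC for one class of data; the keys
    # added below (VG/LV/PV/DRBD lists, the DRBD helper, bridges and OOB
    # paths) are only requested when the cluster actually uses those
    # features.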

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]
      node_verify_param[constants.NV_DRBDLIST] = None

    if drbd_helper:
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
                                                 name=node.name,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = _SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for instance in self.my_inst_names:
      inst_config = self.my_inst_info[instance]

      for nname in inst_config.all_nodes:
        if nname not in node_image:
          gnode = self.NodeImage(name=nname)
          gnode.ghost = (nname not in self.all_node_info)
          node_image[nname] = gnode

      inst_config.MapLVsByNode(node_vol_should)

      pnode = inst_config.primary_node
      node_image[pnode].pinst.append(instance)

      for snode in inst_config.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance)
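
    # After this loop, nimg.sbp is keyed by primary node: for each node it
    # lists, per primary, the instances that use the node as secondary;
    # _VerifyNPlusOneMemory later walks this to size failover memory needs.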

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
                                           node_verify_param,
                                           self.cfg.GetClusterName())
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName())
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_names))
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
    if absent_nodes:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_nodes = []
      if master_node not in self.my_node_info:
        additional_nodes.append(master_node)
        vf_node_info.append(self.all_node_info[master_node])
      # Add the first vm_capable node we find which is not included
      for node in absent_nodes:
        nodeinfo = self.all_node_info[node]
        if nodeinfo.vm_capable and not nodeinfo.offline:
          additional_nodes.append(node)
          vf_node_info.append(self.all_node_info[node])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
                                                 {key: node_verify_param[key]},
                                                 self.cfg.GetClusterName()))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      node = node_i.name
      nimg = node_image[node]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node,))
        n_offline += 1
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyOob(node_i, nresult)

      if nimg.vm_capable:
        self._VerifyNodeLVM(node_i, nresult, vg_name)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)

        for inst in non_primary_inst:
          test = inst in self.all_inst_info
          _ErrorIf(test, self.EINSTANCEWRONGNODE, inst,
                   "instance should not run on node %s", node_i.name)
          _ErrorIf(not test, self.ENODEORPHANINSTANCE, node_i.name,
                   "node is running unknown instance %s", inst)

    for node, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
                              node_image[node], vg_name)

    feedback_fn("* Verifying instance status")
    for instance in self.my_inst_names:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.my_inst_info[instance]
      self._VerifyInstance(instance, inst_config, node_image,
                           instdisk[instance])
      inst_nodes_offline = []

      pnode = inst_config.primary_node
      pnode_img = node_image[pnode]
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
               self.ENODERPC, pnode, "instance %s, connection to"
               " primary node failed", instance)

      _ErrorIf(inst_config.admin_up and pnode_img.offline,
               self.EINSTANCEBADNODE, instance,
               "instance is marked as running and lives on offline node %s",
               inst_config.primary_node)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if not inst_config.secondary_nodes:
        i_non_redundant.append(instance)

      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
               instance, "instance has multiple secondary nodes: %s",
               utils.CommaJoin(inst_config.secondary_nodes),
               code=self.ETYPE_WARNING)

      if inst_config.disk_template in constants.DTS_INT_MIRROR:
        pnode = inst_config.primary_node
        instance_nodes = utils.NiceSort(inst_config.all_nodes)
        instance_groups = {}

        for node in instance_nodes:
          instance_groups.setdefault(self.all_node_info[node].group,
                                     []).append(node)

        pretty_list = [
          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
          # Sort so that we always list the primary node first.
          for group, nodes in sorted(instance_groups.items(),
                                     key=lambda (_, nodes): pnode in nodes,
                                     reverse=True)]

        self._ErrorIf(len(instance_groups) > 1, self.EINSTANCESPLITGROUPS,
                      instance, "instance has primary and secondary nodes in"
                      " different groups: %s", utils.CommaJoin(pretty_list),
                      code=self.ETYPE_WARNING)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        s_img = node_image[snode]
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
                 "instance %s, connection to secondary node failed", instance)

        if s_img.offline:
          inst_nodes_offline.append(snode)

      # warn that the instance lives on offline nodes
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
               "instance has offline secondary node(s) %s",
               utils.CommaJoin(inst_nodes_offline))
      # ... or ghost/non-vm_capable nodes
      for node in inst_config.all_nodes:
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
                 "instance lives on ghost node %s", node)
        _ErrorIf(not node_image[node].vm_capable, self.EINSTANCEBADNODE,
                 instance, "instance lives on non-vm_capable node %s", node)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for inst in self.all_inst_info.values():
      for secondary in inst.secondary_nodes:
        if (secondary in self.my_node_info
            and inst.name not in self.my_inst_info):
          inst.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad
2822

    
2823
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2824
    """Analyze the post-hooks' result
2825

2826
    This method analyses the hook result, handles it, and sends some
2827
    nicely-formatted feedback back to the user.
2828

2829
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
2830
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2831
    @param hooks_results: the results of the multi-node hooks rpc call
2832
    @param feedback_fn: function used send feedback back to the caller
2833
    @param lu_result: previous Exec result
2834
    @return: the new Exec result, based on the previous result
2835
        and hook results
2836

2837
    """
2838
    # We only really run POST phase hooks, only for non-empty groups,
2839
    # and are only interested in their results
2840
    if not self.my_node_names:
2841
      # empty node group
2842
      pass
2843
    elif phase == constants.HOOKS_PHASE_POST:
2844
      # Used to change hooks' output to proper indentation
2845
      feedback_fn("* Hooks Results")
2846
      assert hooks_results, "invalid result from hooks"
2847

    
2848
      for node_name in hooks_results:
2849
        res = hooks_results[node_name]
2850
        msg = res.fail_msg
2851
        test = msg and not res.offline
2852
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
2853
                      "Communication failure in hooks execution: %s", msg)
2854
        if res.offline or msg:
2855
          # No need to investigate payload if node is offline or gave an error.
2856
          # override manually lu_result here as _ErrorIf only
2857
          # overrides self.bad
2858
          lu_result = 1
2859
          continue
2860
        for script, hkr, output in res.payload:
2861
          test = hkr == constants.HKR_FAIL
2862
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
2863
                        "Script %s failed, output:", script)
2864
          if test:
2865
            output = self._HOOKS_INDENT_RE.sub("      ", output)
2866
            feedback_fn("%s" % output)
2867
            lu_result = 0
2868

    
2869
    return lu_result
2870

    
2871

    
2872
class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = _ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.glm.list_owned(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])


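# Note (added, illustrative): LUClusterVerifyDisks above does no disk checking
# of its own; its Exec() only fans out one child job per node group via
# ResultWithJobs. On a hypothetical cluster with groups "default" and
# "storage", the submitted jobs would look roughly like:
#   [[OpGroupVerifyDisks(group_name="default")],
#    [OpGroupVerifyDisks(group_name="storage")]]
# The per-group verification itself is implemented by LUGroupVerifyDisks below.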
class LUGroupVerifyDisks(NoHooksLU):
  """Verifies the status of all disks in a node group.

  """
  REQ_BGL = False

  def ExpandNames(self):
    # Raises errors.OpPrereqError on its own if group can't be found
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.share_locks = _ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetNodeGroupInstances(self.group_uuid)

    elif level == locking.LEVEL_NODEGROUP:
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        set([self.group_uuid] +
            # Lock all groups used by instances optimistically; this requires
            # going via the node before it's locked, requiring verification
            # later on
            [group_uuid
             for instance_name in
               self.glm.list_owned(locking.LEVEL_INSTANCE)
             for group_uuid in
               self.cfg.GetInstanceNodeGroups(instance_name)])

    elif level == locking.LEVEL_NODE:
      # This will only lock the nodes in the group to be verified which contain
      # actual instances
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
      self._LockInstancesNodes()

      # Lock all nodes in group to be verified
      assert self.group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
      member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)

  def CheckPrereq(self):
    owned_instances = frozenset(self.glm.list_owned(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(self.glm.list_owned(locking.LEVEL_NODEGROUP))
    owned_nodes = frozenset(self.glm.list_owned(locking.LEVEL_NODE))

    assert self.group_uuid in owned_groups

    # Check if locked instances are still correct
    wanted_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)
    if owned_instances != wanted_instances:
      raise errors.OpPrereqError("Instances in node group %s changed since"
                                 " locks were acquired, wanted %s, have %s;"
                                 " retry the operation" %
                                 (self.op.group_name,
                                  utils.CommaJoin(wanted_instances),
                                  utils.CommaJoin(owned_instances)),
                                 errors.ECODE_STATE)

    # Get instance information
    self.instances = dict((name, self.cfg.GetInstanceInfo(name))
                          for name in owned_instances)

    # Check if node groups for locked instances are still correct
    for (instance_name, inst) in self.instances.items():
      assert self.group_uuid in self.cfg.GetInstanceNodeGroups(instance_name), \
        "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
      assert owned_nodes.issuperset(inst.all_nodes), \
        "Instance %s's nodes changed while we kept the lock" % instance_name

      inst_groups = self.cfg.GetInstanceNodeGroups(instance_name)
      if not owned_groups.issuperset(inst_groups):
        raise errors.OpPrereqError("Instance %s's node groups changed since"
                                   " locks were acquired, current groups"
                                   " are '%s', owning groups '%s'; retry the"
                                   " operation" %
                                   (instance_name,
                                    utils.CommaJoin(inst_groups),
                                    utils.CommaJoin(owned_groups)),
                                   errors.ECODE_STATE)

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    res_nodes = {}
    res_instances = set()
    res_missing = {}

    nv_dict = _MapInstanceDisksToNodes([inst
                                        for inst in self.instances.values()
                                        if inst.admin_up])

    if nv_dict:
      nodes = utils.NiceSort(set(self.glm.list_owned(locking.LEVEL_NODE)) &
                             set(self.cfg.GetVmCapableNodeList()))

      node_lvs = self.rpc.call_lv_list(nodes, [])

      for (node, node_res) in node_lvs.items():
        if node_res.offline:
          continue

        msg = node_res.fail_msg
        if msg:
          logging.warning("Error enumerating LVs on node %s: %s", node, msg)
          res_nodes[node] = msg
          continue

        for lv_name, (_, _, lv_online) in node_res.payload.items():
          inst = nv_dict.pop((node, lv_name), None)
          if not (lv_online or inst is None):
            res_instances.add(inst)

      # any leftover items in nv_dict are missing LVs, let's arrange the data
      # better
      for key, inst in nv_dict.iteritems():
        res_missing.setdefault(inst, []).append(key)

    return (res_nodes, list(res_instances), res_missing)


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      self.wanted_names = _GetWantedInstances(self, self.op.instances)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
        }
    self.share_locks = _ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsize(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsize call to node"
                        " %s, ignoring", node)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node, len(dskl), result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))
    return changed


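# Note (added, illustrative): LUClusterRepairDiskSizes.Exec above returns the
# corrections it made as (instance_name, disk_index, new_size) tuples, with
# sizes in MiB (the byte count from blockdev_getsize is shifted right by 20
# bits before the comparison), e.g. [("instance1.example.com", 0, 10240)].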
class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
    finally:
      result = self.rpc.call_node_start_master(master, False, False)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)

    return clustername


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_list = self.glm.list_owned(locking.LEVEL_NODE)

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_list)
      for node in node_list:
        ninfo = self.cfg.GetNodeInfo(node)
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s", node)
          continue
        msg = helpers[node].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (node, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[node].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (node, node_helper), errors.ECODE_ENVIRON)

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors))

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
                                                  use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                         os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          _CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.drbd_helper is not None:
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master = self.cfg.GetMasterNode()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_stop_master(master, False)
      result.Raise("Could not disable the master ip")
      feedback_fn("Changing master_netdev from %s to %s" %
                  (self.cluster.master_netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      result = self.rpc.call_node_start_master(master, False, False)
      if result.fail_msg:
        self.LogWarning("Could not re-enable the master ip on"
                        " the master, please restart manually: %s",
                        result.fail_msg)


def _UploadHelper(lu, nodes, fname):
  """Helper for uploading a file and showing warnings.

  """
  if os.path.exists(fname):
    result = lu.rpc.call_upload_file(nodes, fname)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, to_node, msg))
        lu.proc.LogWarning(msg)


def _ComputeAncillaryFiles(cluster, redist):
  """Compute files external to Ganeti which need to be consistent.

  @type redist: boolean
  @param redist: Whether to include files which need to be redistributed

  """
  # Compute files for all nodes
  files_all = set([
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  if not redist:
    files_all.update(constants.ALL_CERT_FILES)
    files_all.update(ssconf.SimpleStore().GetFileList())

  if cluster.modify_etc_hosts:
    files_all.add(constants.ETC_HOSTS)

  # Files which must either exist on all nodes or on none
  files_all_opt = set([
    constants.RAPI_USERS_FILE,
    ])

  # Files which should only be on master candidates
  files_mc = set()
  if not redist:
    files_mc.add(constants.CLUSTER_CONF_FILE)

  # Files which should only be on VM-capable nodes
  files_vm = set(filename
    for hv_name in cluster.enabled_hypervisors
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles())

  # Filenames must be unique
  assert (len(files_all | files_all_opt | files_mc | files_vm) ==
          sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
         "Found file listed in more than one file list"

  return (files_all, files_all_opt, files_mc, files_vm)


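# Note (added): _ComputeAncillaryFiles above returns four sets of file names:
# (files_all, files_all_opt, files_mc, files_vm) -- files for every node,
# files that must exist either everywhere or nowhere, files restricted to
# master candidates, and hypervisor ancillary files for VM-capable nodes.
# _RedistributeAncillaryFiles below consumes this tuple with redist=True.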
def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to
  @type additional_vm: boolean
  @param additional_vm: whether the additional nodes are vm-capable or not

  """
  # Gather target nodes
  cluster = lu.cfg.GetClusterInfo()
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())

  online_nodes = lu.cfg.GetOnlineNodeList()
  vm_nodes = lu.cfg.GetVmCapableNodeList()

  if additional_nodes is not None:
    online_nodes.extend(additional_nodes)
    if additional_vm:
      vm_nodes.extend(additional_nodes)

  # Never distribute to master node
  for nodelist in [online_nodes, vm_nodes]:
    if master_info.name in nodelist:
      nodelist.remove(master_info.name)

  # Gather file lists
  (files_all, files_all_opt, files_mc, files_vm) = \
    _ComputeAncillaryFiles(cluster, True)

  # Never re-distribute configuration file from here
  assert not (constants.CLUSTER_CONF_FILE in files_all or
              constants.CLUSTER_CONF_FILE in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

  filemap = [
    (online_nodes, files_all),
    (online_nodes, files_all_opt),
    (vm_nodes, files_vm),
    ]

  # Upload the files
  for (node_list, files) in filemap:
    for fname in files:
      _UploadHelper(lu, node_list, fname)


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = _ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                           node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (disks[i].iv_name, mstat.sync_percent, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


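# Note (added): _WaitForSync above returns True when no polled disk was left
# degraded (it returns "not cumul_degraded", where a disk only counts as
# degraded if is_degraded is set and no sync_percent is being reported). With
# oneshot=True it stops after the first poll instead of waiting for
# completion, apart from a few short retries while the disks look degraded.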
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


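# Note (added): _CheckDiskConsistency above recurses into dev.children with
# the default ldisk=False, so the local-storage check requested via ldisk only
# applies to the top-level device that was passed in.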
class LUOobCommand(NoHooksLU):
  """Logical unit for OOB handling.

  """
  REQ_BGL = False
  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)

  def ExpandNames(self):
    """Gather locks we need.

    """
    if self.op.node_names:
      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
      lock_names = self.op.node_names
    else:
      lock_names = locking.ALL_SET

    self.needed_locks = {
      locking.LEVEL_NODE: lock_names,
      }

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - OOB is supported

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.nodes = []
    self.master_node = self.cfg.GetMasterNode()

    assert self.op.power_delay >= 0.0

    if self.op.node_names:
      if (self.op.command in self._SKIP_MASTER and
          self.master_node in self.op.node_names):
        master_node_obj = self.cfg.GetNodeInfo(self.master_node)
        master_oob_handler = _SupportsOob(self.cfg, master_node_obj)

        if master_oob_handler:
          additional_text = ("run '%s %s %s' if you want to operate on the"
                             " master regardless") % (master_oob_handler,
                                                      self.op.command,
                                                      self.master_node)
        else:
          additional_text = "it does not support out-of-band operations"

        raise errors.OpPrereqError(("Operating on the master node %s is not"
                                    " allowed for %s; %s") %
                                   (self.master_node, self.op.command,
                                    additional_text), errors.ECODE_INVAL)
    else:
      self.op.node_names = self.cfg.GetNodeList()
      if self.op.command in self._SKIP_MASTER:
        self.op.node_names.remove(self.master_node)

    if self.op.command in self._SKIP_MASTER:
      assert self.master_node not in self.op.node_names

    for node_name in self.op.node_names:
      node = self.cfg.GetNodeInfo(node_name)

      if node is None:
        raise errors.OpPrereqError("Node %s not found" % node_name,
                                   errors.ECODE_NOENT)
      else:
        self.nodes.append(node)

      if (not self.op.ignore_status and
          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
        raise errors.OpPrereqError(("Cannot power off node %s because it is"
                                    " not marked offline") % node_name,
                                   errors.ECODE_STATE)

  def Exec(self, feedback_fn):
    """Execute OOB and return result if we expect any.

    """
    master_node = self.master_node
    ret = []

    for idx, node in enumerate(utils.NiceSort(self.nodes,
                                              key=lambda node: node.name)):
      node_entry = [(constants.RS_NORMAL, node.name)]
      ret.append(node_entry)

      oob_program = _SupportsOob(self.cfg, node)

      if not oob_program:
        node_entry.append((constants.RS_UNAVAIL, None))
        continue

      logging.info("Executing out-of-band command '%s' using '%s' on %s",
                   self.op.command, oob_program, node.name)
      result = self.rpc.call_run_oob(master_node, oob_program,
                                     self.op.command, node.name,
                                     self.op.timeout)

      if result.fail_msg:
        self.LogWarning("Out-of-band RPC failed on node '%s': %s",
                        node.name, result.fail_msg)
        node_entry.append((constants.RS_NODATA, None))
      else:
        try:
          self._CheckPayload(result)
        except errors.OpExecError, err:
          self.LogWarning("Payload returned by node '%s' is not valid: %s",
                          node.name, err)
          node_entry.append((constants.RS_NODATA, None))
        else:
          if self.op.command == constants.OOB_HEALTH:
            # For health we should log important events
            for item, status in result.payload:
              if status in [constants.OOB_STATUS_WARNING,
                            constants.OOB_STATUS_CRITICAL]:
                self.LogWarning("Item '%s' on node '%s' has status '%s'",
                                item, node.name, status)

          if self.op.command == constants.OOB_POWER_ON:
            node.powered = True
          elif self.op.command == constants.OOB_POWER_OFF:
            node.powered = False
          elif self.op.command == constants.OOB_POWER_STATUS:
            powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
            if powered != node.powered:
              logging.warning(("Recorded power state (%s) of node '%s' does not"
                               " match actual power state (%s)"), node.powered,
                              node.name, powered)

          # For configuration changing commands we should update the node
          if self.op.command in (constants.OOB_POWER_ON,
                                 constants.OOB_POWER_OFF):
            self.cfg.Update(node, feedback_fn)

          node_entry.append((constants.RS_NORMAL, result.payload))

          if (self.op.command == constants.OOB_POWER_ON and
              idx < len(self.nodes) - 1):
            time.sleep(self.op.power_delay)

    return ret

  def _CheckPayload(self, result):
    """Checks if the payload is valid.

    @param result: RPC result
    @raises errors.OpExecError: If payload is not valid

    """
    errs = []
    if self.op.command == constants.OOB_HEALTH:
      if not isinstance(result.payload, list):
        errs.append("command 'health' is expected to return a list but got %s" %
                    type(result.payload))
      else:
        for item, status in result.payload:
          if status not in constants.OOB_STATUSES:
            errs.append("health item '%s' has invalid status '%s'" %
                        (item, status))

    if self.op.command == constants.OOB_POWER_STATUS:
      if not isinstance(result.payload, dict):
        errs.append("power-status is expected to return a dict but got %s" %
                    type(result.payload))

    if self.op.command in [
        constants.OOB_POWER_ON,
        constants.OOB_POWER_OFF,
        constants.OOB_POWER_CYCLE,
        ]:
      if result.payload is not None:
        errs.append("%s is expected to not return payload but got '%s'" %
                    (self.op.command, result.payload))

    if errs:
      raise errors.OpExecError("Check of out-of-band payload failed due to %s" %
                               utils.CommaJoin(errs))

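# Note (added, illustrative): each per-node entry built by LUOobCommand.Exec
# above is a list of (status, data) pairs that starts with the node name, e.g.
#   [(constants.RS_NORMAL, "node1.example.com"), (constants.RS_UNAVAIL, None)]
# for a node without an OOB program, or ending with (RS_NORMAL, payload) when
# the out-of-band command succeeded.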
class _OsQuery(_QueryBase):
  FIELDS = query.OS_FIELDS

  def ExpandNames(self, lu):
    # Lock all nodes in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    lu.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

    # The following variables interact with _QueryBase._GetNames
    if self.names:
      self.wanted = self.names
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self.use_locking

  def DeclareLocks(self, lu, level):
    pass

  @staticmethod
  def _DiagnoseByOS(rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another
        map, with nodes as keys and tuples of (path, status, diagnose,
        variants, parameters, api_versions) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "", [], [])]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for (name, path, status, diagnose, variants,
           params, api_versions) in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        # convert params from [name, help] to (name, help)
        params = [tuple(v) for v in params]
        all_os[name][node_name].append((path, status, diagnose,
                                        variants, params, api_versions))
    return all_os

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    valid_nodes = [node.name
                   for node in lu.cfg.GetAllNodesInfo().values()
                   if not node.offline and node.vm_capable]
    pol = self._DiagnoseByOS(lu.rpc.call_os_diagnose(valid_nodes))
    cluster = lu.cfg.GetClusterInfo()

    data = {}

    for (os_name, os_data) in pol.items():
      info = query.OsInfo(name=os_name, valid=True, node_status=os_data,
                          hidden=(os_name in cluster.hidden_os),
                          blacklisted=(os_name in cluster.blacklisted_os))

      variants = set()
      parameters = set()
      api_versions = set()

      for idx, osl in enumerate(os_data.values()):
        info.valid = bool(info.valid and osl and osl[0][1])
        if not info.valid:
          break

        (node_variants, node_params, node_api) = osl[0][3:6]
        if idx == 0:
          # First entry
          variants.update(node_variants)
          parameters.update(node_params)
          api_versions.update(node_api)
        else:
          # Filter out inconsistent values
          variants.intersection_update(node_variants)
          parameters.intersection_update(node_params)
          api_versions.intersection_update(node_api)
