Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 3b61ee44

History | View | Annotate | Download (460.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0201,C0302
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
# C0302: since we have waaaay to many lines in this module
30

    
31
import os
32
import os.path
33
import time
34
import re
35
import platform
36
import logging
37
import copy
38
import OpenSSL
39
import socket
40
import tempfile
41
import shutil
42
import itertools
43
import operator
44

    
45
from ganeti import ssh
46
from ganeti import utils
47
from ganeti import errors
48
from ganeti import hypervisor
49
from ganeti import locking
50
from ganeti import constants
51
from ganeti import objects
52
from ganeti import serializer
53
from ganeti import ssconf
54
from ganeti import uidpool
55
from ganeti import compat
56
from ganeti import masterd
57
from ganeti import netutils
58
from ganeti import query
59
from ganeti import qlang
60
from ganeti import opcodes
61
from ganeti import ht
62

    
63
import ganeti.masterd.instance # pylint: disable-msg=W0611
64

    
65

    
66
def _SupportsOob(cfg, node):
67
  """Tells if node supports OOB.
68

69
  @type cfg: L{config.ConfigWriter}
70
  @param cfg: The cluster configuration
71
  @type node: L{objects.Node}
72
  @param node: The node
73
  @return: The OOB script if supported or an empty string otherwise
74

75
  """
76
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
77

    
78

    
79
class ResultWithJobs:
80
  """Data container for LU results with jobs.
81

82
  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
83
  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
84
  contained in the C{jobs} attribute and include the job IDs in the opcode
85
  result.
86

87
  """
88
  def __init__(self, jobs, **kwargs):
89
    """Initializes this class.
90

91
    Additional return values can be specified as keyword arguments.
92

93
    @type jobs: list of lists of L{opcode.OpCode}
94
    @param jobs: A list of lists of opcode objects
95

96
    """
97
    self.jobs = jobs
98
    self.other = kwargs
99

    
100

    
101
class LogicalUnit(object):
102
  """Logical Unit base class.
103

104
  Subclasses must follow these rules:
105
    - implement ExpandNames
106
    - implement CheckPrereq (except when tasklets are used)
107
    - implement Exec (except when tasklets are used)
108
    - implement BuildHooksEnv
109
    - implement BuildHooksNodes
110
    - redefine HPATH and HTYPE
111
    - optionally redefine their run requirements:
112
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
113

114
  Note that all commands require root permissions.
115

116
  @ivar dry_run_result: the value (if any) that will be returned to the caller
117
      in dry-run mode (signalled by opcode dry_run parameter)
118

119
  """
120
  HPATH = None
121
  HTYPE = None
122
  REQ_BGL = True
123

    
124
  def __init__(self, processor, op, context, rpc):
125
    """Constructor for LogicalUnit.
126

127
    This needs to be overridden in derived classes in order to check op
128
    validity.
129

130
    """
131
    self.proc = processor
132
    self.op = op
133
    self.cfg = context.cfg
134
    self.glm = context.glm
135
    self.context = context
136
    self.rpc = rpc
137
    # Dicts used to declare locking needs to mcpu
138
    self.needed_locks = None
139
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
140
    self.add_locks = {}
141
    self.remove_locks = {}
142
    # Used to force good behavior when calling helper functions
143
    self.recalculate_locks = {}
144
    # logging
145
    self.Log = processor.Log # pylint: disable-msg=C0103
146
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
147
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
148
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
149
    # support for dry-run
150
    self.dry_run_result = None
151
    # support for generic debug attribute
152
    if (not hasattr(self.op, "debug_level") or
153
        not isinstance(self.op.debug_level, int)):
154
      self.op.debug_level = 0
155

    
156
    # Tasklets
157
    self.tasklets = None
158

    
159
    # Validate opcode parameters and set defaults
160
    self.op.Validate(True)
161

    
162
    self.CheckArguments()
163

    
164
  def CheckArguments(self):
165
    """Check syntactic validity for the opcode arguments.
166

167
    This method is for doing a simple syntactic check and ensure
168
    validity of opcode parameters, without any cluster-related
169
    checks. While the same can be accomplished in ExpandNames and/or
170
    CheckPrereq, doing these separate is better because:
171

172
      - ExpandNames is left as as purely a lock-related function
173
      - CheckPrereq is run after we have acquired locks (and possible
174
        waited for them)
175

176
    The function is allowed to change the self.op attribute so that
177
    later methods can no longer worry about missing parameters.
178

179
    """
180
    pass
181

    
182
  def ExpandNames(self):
183
    """Expand names for this LU.
184

185
    This method is called before starting to execute the opcode, and it should
186
    update all the parameters of the opcode to their canonical form (e.g. a
187
    short node name must be fully expanded after this method has successfully
188
    completed). This way locking, hooks, logging, etc. can work correctly.
189

190
    LUs which implement this method must also populate the self.needed_locks
191
    member, as a dict with lock levels as keys, and a list of needed lock names
192
    as values. Rules:
193

194
      - use an empty dict if you don't need any lock
195
      - if you don't need any lock at a particular level omit that level
196
      - don't put anything for the BGL level
197
      - if you want all locks at a level use locking.ALL_SET as a value
198

199
    If you need to share locks (rather than acquire them exclusively) at one
200
    level you can modify self.share_locks, setting a true value (usually 1) for
201
    that level. By default locks are not shared.
202

203
    This function can also define a list of tasklets, which then will be
204
    executed in order instead of the usual LU-level CheckPrereq and Exec
205
    functions, if those are not defined by the LU.
206

207
    Examples::
208

209
      # Acquire all nodes and one instance
210
      self.needed_locks = {
211
        locking.LEVEL_NODE: locking.ALL_SET,
212
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
213
      }
214
      # Acquire just two nodes
215
      self.needed_locks = {
216
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
217
      }
218
      # Acquire no locks
219
      self.needed_locks = {} # No, you can't leave it to the default value None
220

221
    """
222
    # The implementation of this method is mandatory only if the new LU is
223
    # concurrent, so that old LUs don't need to be changed all at the same
224
    # time.
225
    if self.REQ_BGL:
226
      self.needed_locks = {} # Exclusive LUs don't need locks.
227
    else:
228
      raise NotImplementedError
229

    
230
  def DeclareLocks(self, level):
231
    """Declare LU locking needs for a level
232

233
    While most LUs can just declare their locking needs at ExpandNames time,
234
    sometimes there's the need to calculate some locks after having acquired
235
    the ones before. This function is called just before acquiring locks at a
236
    particular level, but after acquiring the ones at lower levels, and permits
237
    such calculations. It can be used to modify self.needed_locks, and by
238
    default it does nothing.
239

240
    This function is only called if you have something already set in
241
    self.needed_locks for the level.
242

243
    @param level: Locking level which is going to be locked
244
    @type level: member of ganeti.locking.LEVELS
245

246
    """
247

    
248
  def CheckPrereq(self):
249
    """Check prerequisites for this LU.
250

251
    This method should check that the prerequisites for the execution
252
    of this LU are fulfilled. It can do internode communication, but
253
    it should be idempotent - no cluster or system changes are
254
    allowed.
255

256
    The method should raise errors.OpPrereqError in case something is
257
    not fulfilled. Its return value is ignored.
258

259
    This method should also update all the parameters of the opcode to
260
    their canonical form if it hasn't been done by ExpandNames before.
261

262
    """
263
    if self.tasklets is not None:
264
      for (idx, tl) in enumerate(self.tasklets):
265
        logging.debug("Checking prerequisites for tasklet %s/%s",
266
                      idx + 1, len(self.tasklets))
267
        tl.CheckPrereq()
268
    else:
269
      pass
270

    
271
  def Exec(self, feedback_fn):
272
    """Execute the LU.
273

274
    This method should implement the actual work. It should raise
275
    errors.OpExecError for failures that are somewhat dealt with in
276
    code, or expected.
277

278
    """
279
    if self.tasklets is not None:
280
      for (idx, tl) in enumerate(self.tasklets):
281
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
282
        tl.Exec(feedback_fn)
283
    else:
284
      raise NotImplementedError
285

    
286
  def BuildHooksEnv(self):
287
    """Build hooks environment for this LU.
288

289
    @rtype: dict
290
    @return: Dictionary containing the environment that will be used for
291
      running the hooks for this LU. The keys of the dict must not be prefixed
292
      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
293
      will extend the environment with additional variables. If no environment
294
      should be defined, an empty dictionary should be returned (not C{None}).
295
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
296
      will not be called.
297

298
    """
299
    raise NotImplementedError
300

    
301
  def BuildHooksNodes(self):
302
    """Build list of nodes to run LU's hooks.
303

304
    @rtype: tuple; (list, list)
305
    @return: Tuple containing a list of node names on which the hook
306
      should run before the execution and a list of node names on which the
307
      hook should run after the execution. No nodes should be returned as an
308
      empty list (and not None).
309
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
310
      will not be called.
311

312
    """
313
    raise NotImplementedError
314

    
315
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
316
    """Notify the LU about the results of its hooks.
317

318
    This method is called every time a hooks phase is executed, and notifies
319
    the Logical Unit about the hooks' result. The LU can then use it to alter
320
    its result based on the hooks.  By default the method does nothing and the
321
    previous result is passed back unchanged but any LU can define it if it
322
    wants to use the local cluster hook-scripts somehow.
323

324
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
325
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
326
    @param hook_results: the results of the multi-node hooks rpc call
327
    @param feedback_fn: function used send feedback back to the caller
328
    @param lu_result: the previous Exec result this LU had, or None
329
        in the PRE phase
330
    @return: the new Exec result, based on the previous result
331
        and hook results
332

333
    """
334
    # API must be kept, thus we ignore the unused argument and could
335
    # be a function warnings
336
    # pylint: disable-msg=W0613,R0201
337
    return lu_result
338

    
339
  def _ExpandAndLockInstance(self):
340
    """Helper function to expand and lock an instance.
341

342
    Many LUs that work on an instance take its name in self.op.instance_name
343
    and need to expand it and then declare the expanded name for locking. This
344
    function does it, and then updates self.op.instance_name to the expanded
345
    name. It also initializes needed_locks as a dict, if this hasn't been done
346
    before.
347

348
    """
349
    if self.needed_locks is None:
350
      self.needed_locks = {}
351
    else:
352
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
353
        "_ExpandAndLockInstance called with instance-level locks set"
354
    self.op.instance_name = _ExpandInstanceName(self.cfg,
355
                                                self.op.instance_name)
356
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
357

    
358
  def _LockInstancesNodes(self, primary_only=False):
359
    """Helper function to declare instances' nodes for locking.
360

361
    This function should be called after locking one or more instances to lock
362
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
363
    with all primary or secondary nodes for instances already locked and
364
    present in self.needed_locks[locking.LEVEL_INSTANCE].
365

366
    It should be called from DeclareLocks, and for safety only works if
367
    self.recalculate_locks[locking.LEVEL_NODE] is set.
368

369
    In the future it may grow parameters to just lock some instance's nodes, or
370
    to just lock primaries or secondary nodes, if needed.
371

372
    If should be called in DeclareLocks in a way similar to::
373

374
      if level == locking.LEVEL_NODE:
375
        self._LockInstancesNodes()
376

377
    @type primary_only: boolean
378
    @param primary_only: only lock primary nodes of locked instances
379

380
    """
381
    assert locking.LEVEL_NODE in self.recalculate_locks, \
382
      "_LockInstancesNodes helper function called with no nodes to recalculate"
383

    
384
    # TODO: check if we're really been called with the instance locks held
385

    
386
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
387
    # future we might want to have different behaviors depending on the value
388
    # of self.recalculate_locks[locking.LEVEL_NODE]
389
    wanted_nodes = []
390
    for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
391
      instance = self.context.cfg.GetInstanceInfo(instance_name)
392
      wanted_nodes.append(instance.primary_node)
393
      if not primary_only:
394
        wanted_nodes.extend(instance.secondary_nodes)
395

    
396
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
397
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
398
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
399
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
400

    
401
    del self.recalculate_locks[locking.LEVEL_NODE]
402

    
403

    
404
class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
405
  """Simple LU which runs no hooks.
406

407
  This LU is intended as a parent for other LogicalUnits which will
408
  run no hooks, in order to reduce duplicate code.
409

410
  """
411
  HPATH = None
412
  HTYPE = None
413

    
414
  def BuildHooksEnv(self):
415
    """Empty BuildHooksEnv for NoHooksLu.
416

417
    This just raises an error.
418

419
    """
420
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")
421

    
422
  def BuildHooksNodes(self):
423
    """Empty BuildHooksNodes for NoHooksLU.
424

425
    """
426
    raise AssertionError("BuildHooksNodes called for NoHooksLU")
427

    
428

    
429
class Tasklet:
430
  """Tasklet base class.
431

432
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
433
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
434
  tasklets know nothing about locks.
435

436
  Subclasses must follow these rules:
437
    - Implement CheckPrereq
438
    - Implement Exec
439

440
  """
441
  def __init__(self, lu):
442
    self.lu = lu
443

    
444
    # Shortcuts
445
    self.cfg = lu.cfg
446
    self.rpc = lu.rpc
447

    
448
  def CheckPrereq(self):
449
    """Check prerequisites for this tasklets.
450

451
    This method should check whether the prerequisites for the execution of
452
    this tasklet are fulfilled. It can do internode communication, but it
453
    should be idempotent - no cluster or system changes are allowed.
454

455
    The method should raise errors.OpPrereqError in case something is not
456
    fulfilled. Its return value is ignored.
457

458
    This method should also update all parameters to their canonical form if it
459
    hasn't been done before.
460

461
    """
462
    pass
463

    
464
  def Exec(self, feedback_fn):
465
    """Execute the tasklet.
466

467
    This method should implement the actual work. It should raise
468
    errors.OpExecError for failures that are somewhat dealt with in code, or
469
    expected.
470

471
    """
472
    raise NotImplementedError
473

    
474

    
475
class _QueryBase:
476
  """Base for query utility classes.
477

478
  """
479
  #: Attribute holding field definitions
480
  FIELDS = None
481

    
482
  def __init__(self, filter_, fields, use_locking):
483
    """Initializes this class.
484

485
    """
486
    self.use_locking = use_locking
487

    
488
    self.query = query.Query(self.FIELDS, fields, filter_=filter_,
489
                             namefield="name")
490
    self.requested_data = self.query.RequestedData()
491
    self.names = self.query.RequestedNames()
492

    
493
    # Sort only if no names were requested
494
    self.sort_by_name = not self.names
495

    
496
    self.do_locking = None
497
    self.wanted = None
498

    
499
  def _GetNames(self, lu, all_names, lock_level):
500
    """Helper function to determine names asked for in the query.
501

502
    """
503
    if self.do_locking:
504
      names = lu.glm.list_owned(lock_level)
505
    else:
506
      names = all_names
507

    
508
    if self.wanted == locking.ALL_SET:
509
      assert not self.names
510
      # caller didn't specify names, so ordering is not important
511
      return utils.NiceSort(names)
512

    
513
    # caller specified names and we must keep the same order
514
    assert self.names
515
    assert not self.do_locking or lu.glm.is_owned(lock_level)
516

    
517
    missing = set(self.wanted).difference(names)
518
    if missing:
519
      raise errors.OpExecError("Some items were removed before retrieving"
520
                               " their data: %s" % missing)
521

    
522
    # Return expanded names
523
    return self.wanted
524

    
525
  def ExpandNames(self, lu):
526
    """Expand names for this query.
527

528
    See L{LogicalUnit.ExpandNames}.
529

530
    """
531
    raise NotImplementedError()
532

    
533
  def DeclareLocks(self, lu, level):
534
    """Declare locks for this query.
535

536
    See L{LogicalUnit.DeclareLocks}.
537

538
    """
539
    raise NotImplementedError()
540

    
541
  def _GetQueryData(self, lu):
542
    """Collects all data for this query.
543

544
    @return: Query data object
545

546
    """
547
    raise NotImplementedError()
548

    
549
  def NewStyleQuery(self, lu):
550
    """Collect data and execute query.
551

552
    """
553
    return query.GetQueryResponse(self.query, self._GetQueryData(lu),
554
                                  sort_by_name=self.sort_by_name)
555

    
556
  def OldStyleQuery(self, lu):
557
    """Collect data and execute query.
558

559
    """
560
    return self.query.OldStyleQuery(self._GetQueryData(lu),
561
                                    sort_by_name=self.sort_by_name)
562

    
563

    
564
def _GetWantedNodes(lu, nodes):
565
  """Returns list of checked and expanded node names.
566

567
  @type lu: L{LogicalUnit}
568
  @param lu: the logical unit on whose behalf we execute
569
  @type nodes: list
570
  @param nodes: list of node names or None for all nodes
571
  @rtype: list
572
  @return: the list of nodes, sorted
573
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
574

575
  """
576
  if nodes:
577
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
578

    
579
  return utils.NiceSort(lu.cfg.GetNodeList())
580

    
581

    
582
def _GetWantedInstances(lu, instances):
583
  """Returns list of checked and expanded instance names.
584

585
  @type lu: L{LogicalUnit}
586
  @param lu: the logical unit on whose behalf we execute
587
  @type instances: list
588
  @param instances: list of instance names or None for all instances
589
  @rtype: list
590
  @return: the list of instances, sorted
591
  @raise errors.OpPrereqError: if the instances parameter is wrong type
592
  @raise errors.OpPrereqError: if any of the passed instances is not found
593

594
  """
595
  if instances:
596
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
597
  else:
598
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
599
  return wanted
600

    
601

    
602
def _GetUpdatedParams(old_params, update_dict,
603
                      use_default=True, use_none=False):
604
  """Return the new version of a parameter dictionary.
605

606
  @type old_params: dict
607
  @param old_params: old parameters
608
  @type update_dict: dict
609
  @param update_dict: dict containing new parameter values, or
610
      constants.VALUE_DEFAULT to reset the parameter to its default
611
      value
612
  @param use_default: boolean
613
  @type use_default: whether to recognise L{constants.VALUE_DEFAULT}
614
      values as 'to be deleted' values
615
  @param use_none: boolean
616
  @type use_none: whether to recognise C{None} values as 'to be
617
      deleted' values
618
  @rtype: dict
619
  @return: the new parameter dictionary
620

621
  """
622
  params_copy = copy.deepcopy(old_params)
623
  for key, val in update_dict.iteritems():
624
    if ((use_default and val == constants.VALUE_DEFAULT) or
625
        (use_none and val is None)):
626
      try:
627
        del params_copy[key]
628
      except KeyError:
629
        pass
630
    else:
631
      params_copy[key] = val
632
  return params_copy
633

    
634

    
635
def _ReleaseLocks(lu, level, names=None, keep=None):
636
  """Releases locks owned by an LU.
637

638
  @type lu: L{LogicalUnit}
639
  @param level: Lock level
640
  @type names: list or None
641
  @param names: Names of locks to release
642
  @type keep: list or None
643
  @param keep: Names of locks to retain
644

645
  """
646
  assert not (keep is not None and names is not None), \
647
         "Only one of the 'names' and the 'keep' parameters can be given"
648

    
649
  if names is not None:
650
    should_release = names.__contains__
651
  elif keep:
652
    should_release = lambda name: name not in keep
653
  else:
654
    should_release = None
655

    
656
  if should_release:
657
    retain = []
658
    release = []
659

    
660
    # Determine which locks to release
661
    for name in lu.glm.list_owned(level):
662
      if should_release(name):
663
        release.append(name)
664
      else:
665
        retain.append(name)
666

    
667
    assert len(lu.glm.list_owned(level)) == (len(retain) + len(release))
668

    
669
    # Release just some locks
670
    lu.glm.release(level, names=release)
671

    
672
    assert frozenset(lu.glm.list_owned(level)) == frozenset(retain)
673
  else:
674
    # Release everything
675
    lu.glm.release(level)
676

    
677
    assert not lu.glm.is_owned(level), "No locks should be owned"
678

    
679

    
680
def _RunPostHook(lu, node_name):
681
  """Runs the post-hook for an opcode on a single node.
682

683
  """
684
  hm = lu.proc.hmclass(lu.rpc.call_hooks_runner, lu)
685
  try:
686
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
687
  except:
688
    # pylint: disable-msg=W0702
689
    lu.LogWarning("Errors occurred running hooks on %s" % node_name)
690

    
691

    
692
def _CheckOutputFields(static, dynamic, selected):
693
  """Checks whether all selected fields are valid.
694

695
  @type static: L{utils.FieldSet}
696
  @param static: static fields set
697
  @type dynamic: L{utils.FieldSet}
698
  @param dynamic: dynamic fields set
699

700
  """
701
  f = utils.FieldSet()
702
  f.Extend(static)
703
  f.Extend(dynamic)
704

    
705
  delta = f.NonMatching(selected)
706
  if delta:
707
    raise errors.OpPrereqError("Unknown output fields selected: %s"
708
                               % ",".join(delta), errors.ECODE_INVAL)
709

    
710

    
711
def _CheckGlobalHvParams(params):
712
  """Validates that given hypervisor params are not global ones.
713

714
  This will ensure that instances don't get customised versions of
715
  global params.
716

717
  """
718
  used_globals = constants.HVC_GLOBALS.intersection(params)
719
  if used_globals:
720
    msg = ("The following hypervisor parameters are global and cannot"
721
           " be customized at instance level, please modify them at"
722
           " cluster level: %s" % utils.CommaJoin(used_globals))
723
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
724

    
725

    
726
def _CheckNodeOnline(lu, node, msg=None):
727
  """Ensure that a given node is online.
728

729
  @param lu: the LU on behalf of which we make the check
730
  @param node: the node to check
731
  @param msg: if passed, should be a message to replace the default one
732
  @raise errors.OpPrereqError: if the node is offline
733

734
  """
735
  if msg is None:
736
    msg = "Can't use offline node"
737
  if lu.cfg.GetNodeInfo(node).offline:
738
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
739

    
740

    
741
def _CheckNodeNotDrained(lu, node):
742
  """Ensure that a given node is not drained.
743

744
  @param lu: the LU on behalf of which we make the check
745
  @param node: the node to check
746
  @raise errors.OpPrereqError: if the node is drained
747

748
  """
749
  if lu.cfg.GetNodeInfo(node).drained:
750
    raise errors.OpPrereqError("Can't use drained node %s" % node,
751
                               errors.ECODE_STATE)
752

    
753

    
754
def _CheckNodeVmCapable(lu, node):
755
  """Ensure that a given node is vm capable.
756

757
  @param lu: the LU on behalf of which we make the check
758
  @param node: the node to check
759
  @raise errors.OpPrereqError: if the node is not vm capable
760

761
  """
762
  if not lu.cfg.GetNodeInfo(node).vm_capable:
763
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
764
                               errors.ECODE_STATE)
765

    
766

    
767
def _CheckNodeHasOS(lu, node, os_name, force_variant):
768
  """Ensure that a node supports a given OS.
769

770
  @param lu: the LU on behalf of which we make the check
771
  @param node: the node to check
772
  @param os_name: the OS to query about
773
  @param force_variant: whether to ignore variant errors
774
  @raise errors.OpPrereqError: if the node is not supporting the OS
775

776
  """
777
  result = lu.rpc.call_os_get(node, os_name)
778
  result.Raise("OS '%s' not in supported OS list for node %s" %
779
               (os_name, node),
780
               prereq=True, ecode=errors.ECODE_INVAL)
781
  if not force_variant:
782
    _CheckOSVariant(result.payload, os_name)
783

    
784

    
785
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
786
  """Ensure that a node has the given secondary ip.
787

788
  @type lu: L{LogicalUnit}
789
  @param lu: the LU on behalf of which we make the check
790
  @type node: string
791
  @param node: the node to check
792
  @type secondary_ip: string
793
  @param secondary_ip: the ip to check
794
  @type prereq: boolean
795
  @param prereq: whether to throw a prerequisite or an execute error
796
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
797
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
798

799
  """
800
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
801
  result.Raise("Failure checking secondary ip on node %s" % node,
802
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
803
  if not result.payload:
804
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
805
           " please fix and re-run this command" % secondary_ip)
806
    if prereq:
807
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
808
    else:
809
      raise errors.OpExecError(msg)
810

    
811

    
812
def _GetClusterDomainSecret():
813
  """Reads the cluster domain secret.
814

815
  """
816
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
817
                               strict=True)
818

    
819

    
820
def _CheckInstanceDown(lu, instance, reason):
821
  """Ensure that an instance is not running."""
822
  if instance.admin_up:
823
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
824
                               (instance.name, reason), errors.ECODE_STATE)
825

    
826
  pnode = instance.primary_node
827
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
828
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
829
              prereq=True, ecode=errors.ECODE_ENVIRON)
830

    
831
  if instance.name in ins_l.payload:
832
    raise errors.OpPrereqError("Instance %s is running, %s" %
833
                               (instance.name, reason), errors.ECODE_STATE)
834

    
835

    
836
def _ExpandItemName(fn, name, kind):
837
  """Expand an item name.
838

839
  @param fn: the function to use for expansion
840
  @param name: requested item name
841
  @param kind: text description ('Node' or 'Instance')
842
  @return: the resolved (full) name
843
  @raise errors.OpPrereqError: if the item is not found
844

845
  """
846
  full_name = fn(name)
847
  if full_name is None:
848
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
849
                               errors.ECODE_NOENT)
850
  return full_name
851

    
852

    
853
def _ExpandNodeName(cfg, name):
854
  """Wrapper over L{_ExpandItemName} for nodes."""
855
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
856

    
857

    
858
def _ExpandInstanceName(cfg, name):
859
  """Wrapper over L{_ExpandItemName} for instance."""
860
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
861

    
862

    
863
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
864
                          memory, vcpus, nics, disk_template, disks,
865
                          bep, hvp, hypervisor_name, tags):
866
  """Builds instance related env variables for hooks
867

868
  This builds the hook environment from individual variables.
869

870
  @type name: string
871
  @param name: the name of the instance
872
  @type primary_node: string
873
  @param primary_node: the name of the instance's primary node
874
  @type secondary_nodes: list
875
  @param secondary_nodes: list of secondary nodes as strings
876
  @type os_type: string
877
  @param os_type: the name of the instance's OS
878
  @type status: boolean
879
  @param status: the should_run status of the instance
880
  @type memory: string
881
  @param memory: the memory size of the instance
882
  @type vcpus: string
883
  @param vcpus: the count of VCPUs the instance has
884
  @type nics: list
885
  @param nics: list of tuples (ip, mac, mode, link) representing
886
      the NICs the instance has
887
  @type disk_template: string
888
  @param disk_template: the disk template of the instance
889
  @type disks: list
890
  @param disks: the list of (size, mode) pairs
891
  @type bep: dict
892
  @param bep: the backend parameters for the instance
893
  @type hvp: dict
894
  @param hvp: the hypervisor parameters for the instance
895
  @type hypervisor_name: string
896
  @param hypervisor_name: the hypervisor for the instance
897
  @type tags: list
898
  @param tags: list of instance tags as strings
899
  @rtype: dict
900
  @return: the hook environment for this instance
901

902
  """
903
  if status:
904
    str_status = "up"
905
  else:
906
    str_status = "down"
907
  env = {
908
    "OP_TARGET": name,
909
    "INSTANCE_NAME": name,
910
    "INSTANCE_PRIMARY": primary_node,
911
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
912
    "INSTANCE_OS_TYPE": os_type,
913
    "INSTANCE_STATUS": str_status,
914
    "INSTANCE_MEMORY": memory,
915
    "INSTANCE_VCPUS": vcpus,
916
    "INSTANCE_DISK_TEMPLATE": disk_template,
917
    "INSTANCE_HYPERVISOR": hypervisor_name,
918
  }
919

    
920
  if nics:
921
    nic_count = len(nics)
922
    for idx, (ip, mac, mode, link) in enumerate(nics):
923
      if ip is None:
924
        ip = ""
925
      env["INSTANCE_NIC%d_IP" % idx] = ip
926
      env["INSTANCE_NIC%d_MAC" % idx] = mac
927
      env["INSTANCE_NIC%d_MODE" % idx] = mode
928
      env["INSTANCE_NIC%d_LINK" % idx] = link
929
      if mode == constants.NIC_MODE_BRIDGED:
930
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
931
  else:
932
    nic_count = 0
933

    
934
  env["INSTANCE_NIC_COUNT"] = nic_count
935

    
936
  if disks:
937
    disk_count = len(disks)
938
    for idx, (size, mode) in enumerate(disks):
939
      env["INSTANCE_DISK%d_SIZE" % idx] = size
940
      env["INSTANCE_DISK%d_MODE" % idx] = mode
941
  else:
942
    disk_count = 0
943

    
944
  env["INSTANCE_DISK_COUNT"] = disk_count
945

    
946
  if not tags:
947
    tags = []
948

    
949
  env["INSTANCE_TAGS"] = " ".join(tags)
950

    
951
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
952
    for key, value in source.items():
953
      env["INSTANCE_%s_%s" % (kind, key)] = value
954

    
955
  return env
956

    
957

    
958
def _NICListToTuple(lu, nics):
959
  """Build a list of nic information tuples.
960

961
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
962
  value in LUInstanceQueryData.
963

964
  @type lu:  L{LogicalUnit}
965
  @param lu: the logical unit on whose behalf we execute
966
  @type nics: list of L{objects.NIC}
967
  @param nics: list of nics to convert to hooks tuples
968

969
  """
970
  hooks_nics = []
971
  cluster = lu.cfg.GetClusterInfo()
972
  for nic in nics:
973
    ip = nic.ip
974
    mac = nic.mac
975
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
976
    mode = filled_params[constants.NIC_MODE]
977
    link = filled_params[constants.NIC_LINK]
978
    hooks_nics.append((ip, mac, mode, link))
979
  return hooks_nics
980

    
981

    
982
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
983
  """Builds instance related env variables for hooks from an object.
984

985
  @type lu: L{LogicalUnit}
986
  @param lu: the logical unit on whose behalf we execute
987
  @type instance: L{objects.Instance}
988
  @param instance: the instance for which we should build the
989
      environment
990
  @type override: dict
991
  @param override: dictionary with key/values that will override
992
      our values
993
  @rtype: dict
994
  @return: the hook environment dictionary
995

996
  """
997
  cluster = lu.cfg.GetClusterInfo()
998
  bep = cluster.FillBE(instance)
999
  hvp = cluster.FillHV(instance)
1000
  args = {
1001
    'name': instance.name,
1002
    'primary_node': instance.primary_node,
1003
    'secondary_nodes': instance.secondary_nodes,
1004
    'os_type': instance.os,
1005
    'status': instance.admin_up,
1006
    'memory': bep[constants.BE_MEMORY],
1007
    'vcpus': bep[constants.BE_VCPUS],
1008
    'nics': _NICListToTuple(lu, instance.nics),
1009
    'disk_template': instance.disk_template,
1010
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
1011
    'bep': bep,
1012
    'hvp': hvp,
1013
    'hypervisor_name': instance.hypervisor,
1014
    'tags': instance.tags,
1015
  }
1016
  if override:
1017
    args.update(override)
1018
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
1019

    
1020

    
1021
def _AdjustCandidatePool(lu, exceptions):
1022
  """Adjust the candidate pool after node operations.
1023

1024
  """
1025
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
1026
  if mod_list:
1027
    lu.LogInfo("Promoted nodes to master candidate role: %s",
1028
               utils.CommaJoin(node.name for node in mod_list))
1029
    for name in mod_list:
1030
      lu.context.ReaddNode(name)
1031
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1032
  if mc_now > mc_max:
1033
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
1034
               (mc_now, mc_max))
1035

    
1036

    
1037
def _DecideSelfPromotion(lu, exceptions=None):
1038
  """Decide whether I should promote myself as a master candidate.
1039

1040
  """
1041
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
1042
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1043
  # the new node will increase mc_max with one, so:
1044
  mc_should = min(mc_should + 1, cp_size)
1045
  return mc_now < mc_should
1046

    
1047

    
1048
def _CheckNicsBridgesExist(lu, target_nics, target_node):
1049
  """Check that the brigdes needed by a list of nics exist.
1050

1051
  """
1052
  cluster = lu.cfg.GetClusterInfo()
1053
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
1054
  brlist = [params[constants.NIC_LINK] for params in paramslist
1055
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
1056
  if brlist:
1057
    result = lu.rpc.call_bridges_exist(target_node, brlist)
1058
    result.Raise("Error checking bridges on destination node '%s'" %
1059
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1060

    
1061

    
1062
def _CheckInstanceBridgesExist(lu, instance, node=None):
1063
  """Check that the brigdes needed by an instance exist.
1064

1065
  """
1066
  if node is None:
1067
    node = instance.primary_node
1068
  _CheckNicsBridgesExist(lu, instance.nics, node)
1069

    
1070

    
1071
def _CheckOSVariant(os_obj, name):
1072
  """Check whether an OS name conforms to the os variants specification.
1073

1074
  @type os_obj: L{objects.OS}
1075
  @param os_obj: OS object to check
1076
  @type name: string
1077
  @param name: OS name passed by the user, to check for validity
1078

1079
  """
1080
  if not os_obj.supported_variants:
1081
    return
1082
  variant = objects.OS.GetVariant(name)
1083
  if not variant:
1084
    raise errors.OpPrereqError("OS name must include a variant",
1085
                               errors.ECODE_INVAL)
1086

    
1087
  if variant not in os_obj.supported_variants:
1088
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1089

    
1090

    
1091
def _GetNodeInstancesInner(cfg, fn):
1092
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
1093

    
1094

    
1095
def _GetNodeInstances(cfg, node_name):
1096
  """Returns a list of all primary and secondary instances on a node.
1097

1098
  """
1099

    
1100
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1101

    
1102

    
1103
def _GetNodePrimaryInstances(cfg, node_name):
1104
  """Returns primary instances on a node.
1105

1106
  """
1107
  return _GetNodeInstancesInner(cfg,
1108
                                lambda inst: node_name == inst.primary_node)
1109

    
1110

    
1111
def _GetNodeSecondaryInstances(cfg, node_name):
1112
  """Returns secondary instances on a node.
1113

1114
  """
1115
  return _GetNodeInstancesInner(cfg,
1116
                                lambda inst: node_name in inst.secondary_nodes)
1117

    
1118

    
1119
def _GetStorageTypeArgs(cfg, storage_type):
1120
  """Returns the arguments for a storage type.
1121

1122
  """
1123
  # Special case for file storage
1124
  if storage_type == constants.ST_FILE:
1125
    # storage.FileStorage wants a list of storage directories
1126
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1127

    
1128
  return []
1129

    
1130

    
1131
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
1132
  faulty = []
1133

    
1134
  for dev in instance.disks:
1135
    cfg.SetDiskID(dev, node_name)
1136

    
1137
  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
1138
  result.Raise("Failed to get disk status from node %s" % node_name,
1139
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
1140

    
1141
  for idx, bdev_status in enumerate(result.payload):
1142
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1143
      faulty.append(idx)
1144

    
1145
  return faulty
1146

    
1147

    
1148
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1149
  """Check the sanity of iallocator and node arguments and use the
1150
  cluster-wide iallocator if appropriate.
1151

1152
  Check that at most one of (iallocator, node) is specified. If none is
1153
  specified, then the LU's opcode's iallocator slot is filled with the
1154
  cluster-wide default iallocator.
1155

1156
  @type iallocator_slot: string
1157
  @param iallocator_slot: the name of the opcode iallocator slot
1158
  @type node_slot: string
1159
  @param node_slot: the name of the opcode target node slot
1160

1161
  """
1162
  node = getattr(lu.op, node_slot, None)
1163
  iallocator = getattr(lu.op, iallocator_slot, None)
1164

    
1165
  if node is not None and iallocator is not None:
1166
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
1167
                               errors.ECODE_INVAL)
1168
  elif node is None and iallocator is None:
1169
    default_iallocator = lu.cfg.GetDefaultIAllocator()
1170
    if default_iallocator:
1171
      setattr(lu.op, iallocator_slot, default_iallocator)
1172
    else:
1173
      raise errors.OpPrereqError("No iallocator or node given and no"
1174
                                 " cluster-wide default iallocator found;"
1175
                                 " please specify either an iallocator or a"
1176
                                 " node, or set a cluster-wide default"
1177
                                 " iallocator")
1178

    
1179

    
1180
class LUClusterPostInit(LogicalUnit):
1181
  """Logical unit for running hooks after cluster initialization.
1182

1183
  """
1184
  HPATH = "cluster-init"
1185
  HTYPE = constants.HTYPE_CLUSTER
1186

    
1187
  def BuildHooksEnv(self):
1188
    """Build hooks env.
1189

1190
    """
1191
    return {
1192
      "OP_TARGET": self.cfg.GetClusterName(),
1193
      }
1194

    
1195
  def BuildHooksNodes(self):
1196
    """Build hooks nodes.
1197

1198
    """
1199
    return ([], [self.cfg.GetMasterNode()])
1200

    
1201
  def Exec(self, feedback_fn):
1202
    """Nothing to do.
1203

1204
    """
1205
    return True
1206

    
1207

    
1208
class LUClusterDestroy(LogicalUnit):
1209
  """Logical unit for destroying the cluster.
1210

1211
  """
1212
  HPATH = "cluster-destroy"
1213
  HTYPE = constants.HTYPE_CLUSTER
1214

    
1215
  def BuildHooksEnv(self):
1216
    """Build hooks env.
1217

1218
    """
1219
    return {
1220
      "OP_TARGET": self.cfg.GetClusterName(),
1221
      }
1222

    
1223
  def BuildHooksNodes(self):
1224
    """Build hooks nodes.
1225

1226
    """
1227
    return ([], [])
1228

    
1229
  def CheckPrereq(self):
1230
    """Check prerequisites.
1231

1232
    This checks whether the cluster is empty.
1233

1234
    Any errors are signaled by raising errors.OpPrereqError.
1235

1236
    """
1237
    master = self.cfg.GetMasterNode()
1238

    
1239
    nodelist = self.cfg.GetNodeList()
1240
    if len(nodelist) != 1 or nodelist[0] != master:
1241
      raise errors.OpPrereqError("There are still %d node(s) in"
1242
                                 " this cluster." % (len(nodelist) - 1),
1243
                                 errors.ECODE_INVAL)
1244
    instancelist = self.cfg.GetInstanceList()
1245
    if instancelist:
1246
      raise errors.OpPrereqError("There are still %d instance(s) in"
1247
                                 " this cluster." % len(instancelist),
1248
                                 errors.ECODE_INVAL)
1249

    
1250
  def Exec(self, feedback_fn):
1251
    """Destroys the cluster.
1252

1253
    """
1254
    master = self.cfg.GetMasterNode()
1255

    
1256
    # Run post hooks on master node before it's removed
1257
    _RunPostHook(self, master)
1258

    
1259
    result = self.rpc.call_node_stop_master(master, False)
1260
    result.Raise("Could not disable the master role")
1261

    
1262
    return master
1263

    
1264

    
1265
def _VerifyCertificate(filename):
1266
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1267

1268
  @type filename: string
1269
  @param filename: Path to PEM file
1270

1271
  """
1272
  try:
1273
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1274
                                           utils.ReadFile(filename))
1275
  except Exception, err: # pylint: disable-msg=W0703
1276
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1277
            "Failed to load X509 certificate %s: %s" % (filename, err))
1278

    
1279
  (errcode, msg) = \
1280
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1281
                                constants.SSL_CERT_EXPIRATION_ERROR)
1282

    
1283
  if msg:
1284
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1285
  else:
1286
    fnamemsg = None
1287

    
1288
  if errcode is None:
1289
    return (None, fnamemsg)
1290
  elif errcode == utils.CERT_WARNING:
1291
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1292
  elif errcode == utils.CERT_ERROR:
1293
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1294

    
1295
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1296

    
1297

    
1298
def _GetAllHypervisorParameters(cluster, instances):
1299
  """Compute the set of all hypervisor parameters.
1300

1301
  @type cluster: L{objects.Cluster}
1302
  @param cluster: the cluster object
1303
  @param instances: list of L{objects.Instance}
1304
  @param instances: additional instances from which to obtain parameters
1305
  @rtype: list of (origin, hypervisor, parameters)
1306
  @return: a list with all parameters found, indicating the hypervisor they
1307
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1308

1309
  """
1310
  hvp_data = []
1311

    
1312
  for hv_name in cluster.enabled_hypervisors:
1313
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1314

    
1315
  for os_name, os_hvp in cluster.os_hvp.items():
1316
    for hv_name, hv_params in os_hvp.items():
1317
      if hv_params:
1318
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1319
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1320

    
1321
  # TODO: collapse identical parameter values in a single one
1322
  for instance in instances:
1323
    if instance.hvparams:
1324
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1325
                       cluster.FillHV(instance)))
1326

    
1327
  return hvp_data
1328

    
1329

    
1330
class _VerifyErrors(object):
1331
  """Mix-in for cluster/group verify LUs.
1332

1333
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1334
  self.op and self._feedback_fn to be available.)
1335

1336
  """
1337
  TCLUSTER = "cluster"
1338
  TNODE = "node"
1339
  TINSTANCE = "instance"
1340

    
1341
  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
1342
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
1343
  ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK")
1344
  ECLUSTERDANGLINGNODES = (TNODE, "ECLUSTERDANGLINGNODES")
1345
  ECLUSTERDANGLINGINST = (TNODE, "ECLUSTERDANGLINGINST")
1346
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
1347
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
1348
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
1349
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
1350
  EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
1351
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
1352
  EINSTANCESPLITGROUPS = (TINSTANCE, "EINSTANCESPLITGROUPS")
1353
  ENODEDRBD = (TNODE, "ENODEDRBD")
1354
  ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
1355
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
1356
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
1357
  ENODEHV = (TNODE, "ENODEHV")
1358
  ENODELVM = (TNODE, "ENODELVM")
1359
  ENODEN1 = (TNODE, "ENODEN1")
1360
  ENODENET = (TNODE, "ENODENET")
1361
  ENODEOS = (TNODE, "ENODEOS")
1362
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
1363
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
1364
  ENODERPC = (TNODE, "ENODERPC")
1365
  ENODESSH = (TNODE, "ENODESSH")
1366
  ENODEVERSION = (TNODE, "ENODEVERSION")
1367
  ENODESETUP = (TNODE, "ENODESETUP")
1368
  ENODETIME = (TNODE, "ENODETIME")
1369
  ENODEOOBPATH = (TNODE, "ENODEOOBPATH")
1370

    
1371
  ETYPE_FIELD = "code"
1372
  ETYPE_ERROR = "ERROR"
1373
  ETYPE_WARNING = "WARNING"
1374

    
1375
  def _Error(self, ecode, item, msg, *args, **kwargs):
1376
    """Format an error message.
1377

1378
    Based on the opcode's error_codes parameter, either format a
1379
    parseable error code, or a simpler error string.
1380

1381
    This must be called only from Exec and functions called from Exec.
1382

1383
    """
1384
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1385
    itype, etxt = ecode
1386
    # first complete the msg
1387
    if args:
1388
      msg = msg % args
1389
    # then format the whole message
1390
    if self.op.error_codes: # This is a mix-in. pylint: disable-msg=E1101
1391
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1392
    else:
1393
      if item:
1394
        item = " " + item
1395
      else:
1396
        item = ""
1397
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1398
    # and finally report it via the feedback_fn
1399
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable-msg=E1101
1400

    
1401
  def _ErrorIf(self, cond, *args, **kwargs):
1402
    """Log an error message if the passed condition is True.
1403

1404
    """
1405
    cond = (bool(cond)
1406
            or self.op.debug_simulate_errors) # pylint: disable-msg=E1101
1407
    if cond:
1408
      self._Error(*args, **kwargs)
1409
    # do not mark the operation as failed for WARN cases only
1410
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1411
      self.bad = self.bad or cond
1412

    
1413

    
1414
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1415
  """Verifies the cluster config.
1416

1417
  """
1418
  REQ_BGL = True
1419

    
1420
  def _VerifyHVP(self, hvp_data):
1421
    """Verifies locally the syntax of the hypervisor parameters.
1422

1423
    """
1424
    for item, hv_name, hv_params in hvp_data:
1425
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1426
             (item, hv_name))
1427
      try:
1428
        hv_class = hypervisor.GetHypervisor(hv_name)
1429
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1430
        hv_class.CheckParameterSyntax(hv_params)
1431
      except errors.GenericError, err:
1432
        self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))
1433

    
1434
  def ExpandNames(self):
1435
    # Information can be safely retrieved as the BGL is acquired in exclusive
1436
    # mode
1437
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1438
    self.all_node_info = self.cfg.GetAllNodesInfo()
1439
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1440
    self.needed_locks = {}
1441

    
1442
  def Exec(self, feedback_fn):
1443
    """Verify integrity of cluster, performing various test on nodes.
1444

1445
    """
1446
    self.bad = False
1447
    self._feedback_fn = feedback_fn
1448

    
1449
    feedback_fn("* Verifying cluster config")
1450

    
1451
    for msg in self.cfg.VerifyConfig():
1452
      self._ErrorIf(True, self.ECLUSTERCFG, None, msg)
1453

    
1454
    feedback_fn("* Verifying cluster certificate files")
1455

    
1456
    for cert_filename in constants.ALL_CERT_FILES:
1457
      (errcode, msg) = _VerifyCertificate(cert_filename)
1458
      self._ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
1459

    
1460
    feedback_fn("* Verifying hypervisor parameters")
1461

    
1462
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1463
                                                self.all_inst_info.values()))
1464

    
1465
    feedback_fn("* Verifying all nodes belong to an existing group")
1466

    
1467
    # We do this verification here because, should this bogus circumstance
1468
    # occur, it would never be caught by VerifyGroup, which only acts on
1469
    # nodes/instances reachable from existing node groups.
1470

    
1471
    dangling_nodes = set(node.name for node in self.all_node_info.values()
1472
                         if node.group not in self.all_group_info)
1473

    
1474
    dangling_instances = {}
1475
    no_node_instances = []
1476

    
1477
    for inst in self.all_inst_info.values():
1478
      if inst.primary_node in dangling_nodes:
1479
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1480
      elif inst.primary_node not in self.all_node_info:
1481
        no_node_instances.append(inst.name)
1482

    
1483
    pretty_dangling = [
1484
        "%s (%s)" %
1485
        (node.name,
1486
         utils.CommaJoin(dangling_instances.get(node.name,
1487
                                                ["no instances"])))
1488
        for node in dangling_nodes]
1489

    
1490
    self._ErrorIf(bool(dangling_nodes), self.ECLUSTERDANGLINGNODES, None,
1491
                  "the following nodes (and their instances) belong to a non"
1492
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1493

    
1494
    self._ErrorIf(bool(no_node_instances), self.ECLUSTERDANGLINGINST, None,
1495
                  "the following instances have a non-existing primary-node:"
1496
                  " %s", utils.CommaJoin(no_node_instances))
1497

    
1498
    return (not self.bad, [g.name for g in self.all_group_info.values()])
1499

    
1500

    
1501
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type name: string
    @ivar name: the node name to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances

    """
    def __init__(self, offline=False, name=None, vm_capable=True):
      self.name = name
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
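
    # Roughly speaking, the fields above are filled in two passes: the
    # "config" ones from the cluster configuration while building the
    # expected state in Exec(), and the "runtime" ones from the verify RPC
    # results. As a purely hypothetical illustration, a healthy vm_capable
    # node could end up represented as:
    #   name="node1.example.com", offline=False, vm_capable=True,
    #   pinst=["inst1"], sinst=["inst2"],
    #   sbp={"node2.example.com": ["inst2"]},
    #   mfree=2048, dfree=102400,
    #   instances=["inst1"], volumes={"xenvg/disk0": ...}
    # with all the *_fail/ghost flags left at False.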

    
  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_names = self.cfg.GetNodeGroupInstances(self.group_uuid)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: inst_names,
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],
      }

    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      all_inst_info = self.cfg.GetAllInstancesInfo()

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst in self.glm.list_owned(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
          nodes.update(all_inst_info[inst].secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    group_nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
    group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)

    unlocked_nodes = \
        group_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE))

    unlocked_instances = \
        group_instances.difference(self.glm.list_owned(locking.LEVEL_INSTANCE))

    if unlocked_nodes:
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
                                 utils.CommaJoin(unlocked_nodes))

    if unlocked_instances:
      raise errors.OpPrereqError("Missing lock for instances: %s" %
                                 utils.CommaJoin(unlocked_instances))

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_names = utils.NiceSort(group_nodes)
    self.my_inst_names = utils.NiceSort(group_instances)

    self.my_node_info = dict((name, self.all_node_info[name])
                             for name in self.my_node_names)

    self.my_inst_info = dict((name, self.all_inst_info[name])
                             for name in self.my_inst_names)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        group = self.my_node_info[inst.primary_node].group
        for nname in inst.secondary_nodes:
          if self.all_node_info[nname].group != group:
            extra_lv_nodes.add(nname)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("these nodes could be locked: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes))
    self.extra_lv_nodes = list(extra_lv_nodes)
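
    # Illustration of the check above (names are hypothetical): if instance
    # "inst3" uses an internally mirrored template (e.g. DRBD) with its
    # primary node in this group but its secondary "node9" in another
    # group, then "node9" ends up in extra_lv_nodes and must already be
    # locked (DeclareLocks added such nodes), so that Exec() can query its
    # LVs as well.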

    
  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    _ErrorIf(test, self.ENODERPC, node,
             "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, self.ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    _ErrorIf(test, self.ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  self.ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, self.ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        _ErrorIf(True, self.ENODEHV, node,
                 "hypervisor %s parameter verify failure (source %s): %s",
                 hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
             "; ".join(test))

    return True
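
    # For reference, the fields inspected above are expected to look
    # roughly like this (hypothetical values):
    #   nresult["version"]               -> (40, "2.6.0")  # (protocol, release)
    #   nresult[constants.NV_HYPERVISOR] -> {"xen-pvm": None}  # None == OK
    #   nresult[constants.NV_HVPARAMS]   -> []  # list of (source, hv, error)
    #   nresult[constants.NV_NODESETUP]  -> []  # error strings, empty == OK
    # Anything deviating from these shapes is reported via _ErrorIf.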

    
  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
             "Node time diverges by at least %s from master node time",
             ntime_diff)
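
    # Worked example (hypothetical numbers): with nvinfo_starttime=1000.0,
    # nvinfo_endtime=1002.0 and a skew allowance of, say, 150 seconds, any
    # merged node time within [850.0, 1152.0] is accepted. A node reporting
    # 1200.0 would be flagged as diverging by at least
    # abs(1200.0 - 1002.0) = "198.0s"; the "at least" is because the node's
    # clock was sampled at some unknown point inside the RPC window.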

    
  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
    """Check the node LVM results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)

    # check pv names
    pvlist = nresult.get(constants.NV_PVLIST, None)
    test = pvlist is None
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
    if not test:
      # check that ':' is not present in PV names, since it's a
      # special character for lvcreate (denotes the range of PEs to
      # use on the PV)
      for _, pvname, owner_vg in pvlist:
        test = ":" in pvname
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
                 " '%s' of VG '%s'", pvname, owner_vg)
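
    # The ':' restriction above exists because the LVM tools treat a colon
    # in a PV argument as the start of a physical-extent range; e.g. (made-up
    # name) "lvcreate ... xenvg /dev/sdb1:0-1000" would allocate only from
    # extents 0-1000, so a PV whose name itself contains ':' could not be
    # passed through safely.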

    
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    _ErrorIf(test, self.ENODENET, node,
             "did not return valid bridge information")
    if not test:
      _ErrorIf(bool(missing), self.ENODENET, node, "missing bridges: %s" %
               utils.CommaJoin(sorted(missing)))

    
  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    test = constants.NV_NODELIST not in nresult
    _ErrorIf(test, self.ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          _ErrorIf(True, self.ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, self.ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if node == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        _ErrorIf(True, self.ENODENET, node, msg)
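
    # The three payloads checked above only carry failures (hypothetical
    # shapes):
    #   NV_NODELIST    -> {"node3.example.com": "ssh: connection refused"}
    #   NV_NODENETTEST -> {"node4.example.com": "tcp port unreachable"}
    #   NV_MASTERIP    -> True/False (could the master IP be reached?)
    # Empty dicts therefore mean the SSH and TCP connectivity checks all
    # passed for this node.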

    
1851
  def _VerifyInstance(self, instance, instanceconfig, node_image,
1852
                      diskstatus):
1853
    """Verify an instance.
1854

1855
    This function checks to see if the required block devices are
1856
    available on the instance's node.
1857

1858
    """
1859
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1860
    node_current = instanceconfig.primary_node
1861

    
1862
    node_vol_should = {}
1863
    instanceconfig.MapLVsByNode(node_vol_should)
1864

    
1865
    for node in node_vol_should:
1866
      n_img = node_image[node]
1867
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1868
        # ignore missing volumes on offline or broken nodes
1869
        continue
1870
      for volume in node_vol_should[node]:
1871
        test = volume not in n_img.volumes
1872
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1873
                 "volume %s missing on node %s", volume, node)
1874

    
1875
    if instanceconfig.admin_up:
1876
      pri_img = node_image[node_current]
1877
      test = instance not in pri_img.instances and not pri_img.offline
1878
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
1879
               "instance not running on its primary node %s",
1880
               node_current)
1881

    
1882
    diskdata = [(nname, success, status, idx)
1883
                for (nname, disks) in diskstatus.items()
1884
                for idx, (success, status) in enumerate(disks)]
1885

    
1886
    for nname, success, bdev_status, idx in diskdata:
1887
      # the 'ghost node' construction in Exec() ensures that we have a
1888
      # node here
1889
      snode = node_image[nname]
1890
      bad_snode = snode.ghost or snode.offline
1891
      _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
1892
               self.EINSTANCEFAULTYDISK, instance,
1893
               "couldn't retrieve status for disk/%s on %s: %s",
1894
               idx, nname, bdev_status)
1895
      _ErrorIf((instanceconfig.admin_up and success and
1896
                bdev_status.ldisk_status == constants.LDS_FAULTY),
1897
               self.EINSTANCEFAULTYDISK, instance,
1898
               "disk/%s on %s is faulty", idx, nname)
1899

    
1900
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1901
    """Verify if there are any unknown volumes in the cluster.
1902

1903
    The .os, .swap and backup volumes are ignored. All other volumes are
1904
    reported as unknown.
1905

1906
    @type reserved: L{ganeti.utils.FieldSet}
1907
    @param reserved: a FieldSet of reserved volume names
1908

1909
    """
1910
    for node, n_img in node_image.items():
1911
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1912
        # skip non-healthy nodes
1913
        continue
1914
      for volume in n_img.volumes:
1915
        test = ((node not in node_vol_should or
1916
                volume not in node_vol_should[node]) and
1917
                not reserved.Matches(volume))
1918
        self._ErrorIf(test, self.ENODEORPHANLV, node,
1919
                      "volume %s is unknown", volume)
1920

    
1921
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1922
    """Verify N+1 Memory Resilience.
1923

1924
    Check that if one single node dies we can still start all the
1925
    instances it was primary for.
1926

1927
    """
1928
    cluster_info = self.cfg.GetClusterInfo()
1929
    for node, n_img in node_image.items():
1930
      # This code checks that every node which is now listed as
1931
      # secondary has enough memory to host all instances it is
1932
      # supposed to should a single other node in the cluster fail.
1933
      # FIXME: not ready for failover to an arbitrary node
1934
      # FIXME: does not support file-backed instances
1935
      # WARNING: we currently take into account down instances as well
1936
      # as up ones, considering that even if they're down someone
1937
      # might want to start them even in the event of a node failure.
1938
      if n_img.offline:
1939
        # we're skipping offline nodes from the N+1 warning, since
1940
        # most likely we don't have good memory information from them;
1941
        # we already list instances living on such nodes, and that's
1942
        # enough warning
1943
        continue
1944
      for prinode, instances in n_img.sbp.items():
1945
        needed_mem = 0
1946
        for instance in instances:
1947
          bep = cluster_info.FillBE(instance_cfg[instance])
1948
          if bep[constants.BE_AUTO_BALANCE]:
1949
            needed_mem += bep[constants.BE_MEMORY]
1950
        test = n_img.mfree < needed_mem
1951
        self._ErrorIf(test, self.ENODEN1, node,
1952
                      "not enough memory to accomodate instance failovers"
1953
                      " should node %s fail (%dMiB needed, %dMiB available)",
1954
                      prinode, needed_mem, n_img.mfree)
1955
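    # Worked example for the N+1 check in _VerifyNPlusOneMemory above (all
    # values hypothetical): node "nodeA" reports mfree=4096 MiB and is
    # secondary for "inst1" (BE_MEMORY=2048) and "inst2" (BE_MEMORY=4096),
    # both with primary "nodeB" and auto_balance enabled. Should "nodeB"
    # fail, "nodeA" would need 6144 MiB to start both instances, so ENODEN1
    # is reported for "nodeA" with prinode="nodeB".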

    
1956
  @classmethod
1957
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
1958
                   (files_all, files_all_opt, files_mc, files_vm)):
1959
    """Verifies file checksums collected from all nodes.
1960

1961
    @param errorif: Callback for reporting errors
1962
    @param nodeinfo: List of L{objects.Node} objects
1963
    @param master_node: Name of master node
1964
    @param all_nvinfo: RPC results
1965

1966
    """
1967
    node_names = frozenset(node.name for node in nodeinfo)
1968

    
1969
    assert master_node in node_names
1970
    assert (len(files_all | files_all_opt | files_mc | files_vm) ==
1971
            sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
1972
           "Found file listed in more than one file list"
1973

    
1974
    # Define functions determining which nodes to consider for a file
1975
    file2nodefn = dict([(filename, fn)
1976
      for (files, fn) in [(files_all, None),
1977
                          (files_all_opt, None),
1978
                          (files_mc, lambda node: (node.master_candidate or
1979
                                                   node.name == master_node)),
1980
                          (files_vm, lambda node: node.vm_capable)]
1981
      for filename in files])
1982

    
1983
    fileinfo = dict((filename, {}) for filename in file2nodefn.keys())
1984

    
1985
    for node in nodeinfo:
1986
      nresult = all_nvinfo[node.name]
1987

    
1988
      if nresult.fail_msg or not nresult.payload:
1989
        node_files = None
1990
      else:
1991
        node_files = nresult.payload.get(constants.NV_FILELIST, None)
1992

    
1993
      test = not (node_files and isinstance(node_files, dict))
1994
      errorif(test, cls.ENODEFILECHECK, node.name,
1995
              "Node did not return file checksum data")
1996
      if test:
1997
        continue
1998

    
1999
      for (filename, checksum) in node_files.items():
2000
        # Check if the file should be considered for a node
2001
        fn = file2nodefn[filename]
2002
        if fn is None or fn(node):
2003
          fileinfo[filename].setdefault(checksum, set()).add(node.name)
2004

    
2005
    for (filename, checksums) in fileinfo.items():
2006
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2007

    
2008
      # Nodes having the file
2009
      with_file = frozenset(node_name
2010
                            for nodes in fileinfo[filename].values()
2011
                            for node_name in nodes)
2012

    
2013
      # Nodes missing file
2014
      missing_file = node_names - with_file
2015

    
2016
      if filename in files_all_opt:
2017
        # All or no nodes
2018
        errorif(missing_file and missing_file != node_names,
2019
                cls.ECLUSTERFILECHECK, None,
2020
                "File %s is optional, but it must exist on all or no"
2021
                " nodes (not found on %s)",
2022
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2023
      else:
2024
        errorif(missing_file, cls.ECLUSTERFILECHECK, None,
2025
                "File %s is missing from node(s) %s", filename,
2026
                utils.CommaJoin(utils.NiceSort(missing_file)))
2027

    
2028
      # See if there are multiple versions of the file
2029
      test = len(checksums) > 1
2030
      if test:
2031
        variants = ["variant %s on %s" %
2032
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2033
                    for (idx, (checksum, nodes)) in
2034
                      enumerate(sorted(checksums.items()))]
2035
      else:
2036
        variants = []
2037

    
2038
      errorif(test, cls.ECLUSTERFILECHECK, None,
2039
              "File %s found with %s different checksums (%s)",
2040
              filename, len(checksums), "; ".join(variants))
2041
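    # Data shapes used by _VerifyFiles above (hypothetical example values):
    #   file2nodefn: {filename: predicate-or-None}; None means the file is
    #     checked on every node, otherwise only on nodes for which the
    #     predicate returns True (master candidates resp. vm_capable nodes).
    #   fileinfo:    {filename: {checksum: set(node names)}}, e.g.
    #     {"/var/lib/ganeti/ssconf_cluster_name":
    #        {"0123456789abcdef": set(["node1", "node2"])}}
    # A file listed in files_all_opt may be missing, but then it has to be
    # missing from every node at once.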

    
2042
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2043
                      drbd_map):
2044
    """Verifies and the node DRBD status.
2045

2046
    @type ninfo: L{objects.Node}
2047
    @param ninfo: the node to check
2048
    @param nresult: the remote results for the node
2049
    @param instanceinfo: the dict of instances
2050
    @param drbd_helper: the configured DRBD usermode helper
2051
    @param drbd_map: the DRBD map as returned by
2052
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2053

2054
    """
2055
    node = ninfo.name
2056
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2057

    
2058
    if drbd_helper:
2059
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2060
      test = (helper_result == None)
2061
      _ErrorIf(test, self.ENODEDRBDHELPER, node,
2062
               "no drbd usermode helper returned")
2063
      if helper_result:
2064
        status, payload = helper_result
2065
        test = not status
2066
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
2067
                 "drbd usermode helper check unsuccessful: %s", payload)
2068
        test = status and (payload != drbd_helper)
2069
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
2070
                 "wrong drbd usermode helper: %s", payload)
2071

    
2072
    # compute the DRBD minors
2073
    node_drbd = {}
2074
    for minor, instance in drbd_map[node].items():
2075
      test = instance not in instanceinfo
2076
      _ErrorIf(test, self.ECLUSTERCFG, None,
2077
               "ghost instance '%s' in temporary DRBD map", instance)
2078
        # ghost instance should not be running, but otherwise we
2079
        # don't give double warnings (both ghost instance and
2080
        # unallocated minor in use)
2081
      if test:
2082
        node_drbd[minor] = (instance, False)
2083
      else:
2084
        instance = instanceinfo[instance]
2085
        node_drbd[minor] = (instance.name, instance.admin_up)
2086

    
2087
    # and now check them
2088
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2089
    test = not isinstance(used_minors, (tuple, list))
2090
    _ErrorIf(test, self.ENODEDRBD, node,
2091
             "cannot parse drbd status file: %s", str(used_minors))
2092
    if test:
2093
      # we cannot check drbd status
2094
      return
2095

    
2096
    for minor, (iname, must_exist) in node_drbd.items():
2097
      test = minor not in used_minors and must_exist
2098
      _ErrorIf(test, self.ENODEDRBD, node,
2099
               "drbd minor %d of instance %s is not active", minor, iname)
2100
    for minor in used_minors:
2101
      test = minor not in node_drbd
2102
      _ErrorIf(test, self.ENODEDRBD, node,
2103
               "unallocated drbd minor %d is in use", minor)
2104
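    # Shapes involved in the DRBD check above (hypothetical values):
    #   drbd_map (cfg.ComputeDRBDMap): {node: {minor: instance name}}
    #   node_drbd (built here):        {minor: (instance name, must_exist)}
    #   used_minors (from the node):   e.g. [0, 1]
    # So with node_drbd = {0: ("inst1", True)} and used_minors = [1], minor
    # 0 would be reported as "not active" and minor 1 as an unallocated
    # minor in use.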

    
2105
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2106
    """Builds the node OS structures.
2107

2108
    @type ninfo: L{objects.Node}
2109
    @param ninfo: the node to check
2110
    @param nresult: the remote results for the node
2111
    @param nimg: the node image object
2112

2113
    """
2114
    node = ninfo.name
2115
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2116

    
2117
    remote_os = nresult.get(constants.NV_OSLIST, None)
2118
    test = (not isinstance(remote_os, list) or
2119
            not compat.all(isinstance(v, list) and len(v) == 7
2120
                           for v in remote_os))
2121

    
2122
    _ErrorIf(test, self.ENODEOS, node,
2123
             "node hasn't returned valid OS data")
2124

    
2125
    nimg.os_fail = test
2126

    
2127
    if test:
2128
      return
2129

    
2130
    os_dict = {}
2131

    
2132
    for (name, os_path, status, diagnose,
2133
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2134

    
2135
      if name not in os_dict:
2136
        os_dict[name] = []
2137

    
2138
      # parameters is a list of lists instead of list of tuples due to
2139
      # JSON lacking a real tuple type, fix it:
2140
      parameters = [tuple(v) for v in parameters]
2141
      os_dict[name].append((os_path, status, diagnose,
2142
                            set(variants), set(parameters), set(api_ver)))
2143

    
2144
    nimg.oslist = os_dict
2145

    
2146
  def _VerifyNodeOS(self, ninfo, nimg, base):
2147
    """Verifies the node OS list.
2148

2149
    @type ninfo: L{objects.Node}
2150
    @param ninfo: the node to check
2151
    @param nimg: the node image object
2152
    @param base: the 'template' node we match against (e.g. from the master)
2153

2154
    """
2155
    node = ninfo.name
2156
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2157

    
2158
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2159

    
2160
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2161
    for os_name, os_data in nimg.oslist.items():
2162
      assert os_data, "Empty OS status for OS %s?!" % os_name
2163
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2164
      _ErrorIf(not f_status, self.ENODEOS, node,
2165
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2166
      _ErrorIf(len(os_data) > 1, self.ENODEOS, node,
2167
               "OS '%s' has multiple entries (first one shadows the rest): %s",
2168
               os_name, utils.CommaJoin([v[0] for v in os_data]))
2169
      # this will be caught in backend too
2170
      _ErrorIf(compat.any(v >= constants.OS_API_V15 for v in f_api)
2171
               and not f_var, self.ENODEOS, node,
2172
               "OS %s with API at least %d does not declare any variant",
2173
               os_name, constants.OS_API_V15)
2174
      # comparisons with the 'base' image
2175
      test = os_name not in base.oslist
2176
      _ErrorIf(test, self.ENODEOS, node,
2177
               "Extra OS %s not present on reference node (%s)",
2178
               os_name, base.name)
2179
      if test:
2180
        continue
2181
      assert base.oslist[os_name], "Base node has empty OS status?"
2182
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2183
      if not b_status:
2184
        # base OS is invalid, skipping
2185
        continue
2186
      for kind, a, b in [("API version", f_api, b_api),
2187
                         ("variants list", f_var, b_var),
2188
                         ("parameters", beautify_params(f_param),
2189
                          beautify_params(b_param))]:
2190
        _ErrorIf(a != b, self.ENODEOS, node,
2191
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2192
                 kind, os_name, base.name,
2193
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2194

    
2195
    # check any missing OSes
2196
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2197
    _ErrorIf(missing, self.ENODEOS, node,
2198
             "OSes present on reference node %s but missing on this node: %s",
2199
             base.name, utils.CommaJoin(missing))
2200

    
2201
  def _VerifyOob(self, ninfo, nresult):
2202
    """Verifies out of band functionality of a node.
2203

2204
    @type ninfo: L{objects.Node}
2205
    @param ninfo: the node to check
2206
    @param nresult: the remote results for the node
2207

2208
    """
2209
    node = ninfo.name
2210
    # We just have to verify the paths on master and/or master candidates
2211
    # as the oob helper is invoked on the master
2212
    if ((ninfo.master_candidate or ninfo.master_capable) and
2213
        constants.NV_OOB_PATHS in nresult):
2214
      for path_result in nresult[constants.NV_OOB_PATHS]:
2215
        self._ErrorIf(path_result, self.ENODEOOBPATH, node, path_result)
2216

    
2217
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2218
    """Verifies and updates the node volume data.
2219

2220
    This function will update a L{NodeImage}'s internal structures
2221
    with data from the remote call.
2222

2223
    @type ninfo: L{objects.Node}
2224
    @param ninfo: the node to check
2225
    @param nresult: the remote results for the node
2226
    @param nimg: the node image object
2227
    @param vg_name: the configured VG name
2228

2229
    """
2230
    node = ninfo.name
2231
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2232

    
2233
    nimg.lvm_fail = True
2234
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2235
    if vg_name is None:
2236
      pass
2237
    elif isinstance(lvdata, basestring):
2238
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
2239
               utils.SafeEncode(lvdata))
2240
    elif not isinstance(lvdata, dict):
2241
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
2242
    else:
2243
      nimg.volumes = lvdata
2244
      nimg.lvm_fail = False
2245

    
2246
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2247
    """Verifies and updates the node instance list.
2248

2249
    If the listing was successful, then updates this node's instance
2250
    list. Otherwise, it marks the RPC call as failed for the instance
2251
    list key.
2252

2253
    @type ninfo: L{objects.Node}
2254
    @param ninfo: the node to check
2255
    @param nresult: the remote results for the node
2256
    @param nimg: the node image object
2257

2258
    """
2259
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2260
    test = not isinstance(idata, list)
2261
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
2262
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
2263
    if test:
2264
      nimg.hyp_fail = True
2265
    else:
2266
      nimg.instances = idata
2267

    
2268
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2269
    """Verifies and computes a node information map
2270

2271
    @type ninfo: L{objects.Node}
2272
    @param ninfo: the node to check
2273
    @param nresult: the remote results for the node
2274
    @param nimg: the node image object
2275
    @param vg_name: the configured VG name
2276

2277
    """
2278
    node = ninfo.name
2279
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2280

    
2281
    # try to read free memory (from the hypervisor)
2282
    hv_info = nresult.get(constants.NV_HVINFO, None)
2283
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2284
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
2285
    if not test:
2286
      try:
2287
        nimg.mfree = int(hv_info["memory_free"])
2288
      except (ValueError, TypeError):
2289
        _ErrorIf(True, self.ENODERPC, node,
2290
                 "node returned invalid nodeinfo, check hypervisor")
2291

    
2292
    # FIXME: devise a free space model for file based instances as well
2293
    if vg_name is not None:
2294
      test = (constants.NV_VGLIST not in nresult or
2295
              vg_name not in nresult[constants.NV_VGLIST])
2296
      _ErrorIf(test, self.ENODELVM, node,
2297
               "node didn't return data for the volume group '%s'"
2298
               " - it is either missing or broken", vg_name)
2299
      if not test:
2300
        try:
2301
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2302
        except (ValueError, TypeError):
2303
          _ErrorIf(True, self.ENODERPC, node,
2304
                   "node returned invalid LVM info, check LVM status")
2305

    
2306
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2307
    """Gets per-disk status information for all instances.
2308

2309
    @type nodelist: list of strings
2310
    @param nodelist: Node names
2311
    @type node_image: dict of (name, L{objects.Node})
2312
    @param node_image: Node objects
2313
    @type instanceinfo: dict of (name, L{objects.Instance})
2314
    @param instanceinfo: Instance objects
2315
    @rtype: {instance: {node: [(success, payload)]}}
2316
    @return: a dictionary of per-instance dictionaries with nodes as
2317
        keys and disk information as values; the disk information is a
2318
        list of tuples (success, payload)
2319

2320
    """
2321
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2322

    
2323
    node_disks = {}
2324
    node_disks_devonly = {}
2325
    diskless_instances = set()
2326
    diskless = constants.DT_DISKLESS
2327

    
2328
    for nname in nodelist:
2329
      node_instances = list(itertools.chain(node_image[nname].pinst,
2330
                                            node_image[nname].sinst))
2331
      diskless_instances.update(inst for inst in node_instances
2332
                                if instanceinfo[inst].disk_template == diskless)
2333
      disks = [(inst, disk)
2334
               for inst in node_instances
2335
               for disk in instanceinfo[inst].disks]
2336

    
2337
      if not disks:
2338
        # No need to collect data
2339
        continue
2340

    
2341
      node_disks[nname] = disks
2342

    
2343
      # Creating copies as SetDiskID below will modify the objects and that can
2344
      # lead to incorrect data returned from nodes
2345
      devonly = [dev.Copy() for (_, dev) in disks]
2346

    
2347
      for dev in devonly:
2348
        self.cfg.SetDiskID(dev, nname)
2349

    
2350
      node_disks_devonly[nname] = devonly
2351

    
2352
    assert len(node_disks) == len(node_disks_devonly)
2353

    
2354
    # Collect data from all nodes with disks
2355
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2356
                                                          node_disks_devonly)
2357

    
2358
    assert len(result) == len(node_disks)
2359

    
2360
    instdisk = {}
2361

    
2362
    for (nname, nres) in result.items():
2363
      disks = node_disks[nname]
2364

    
2365
      if nres.offline:
2366
        # No data from this node
2367
        data = len(disks) * [(False, "node offline")]
2368
      else:
2369
        msg = nres.fail_msg
2370
        _ErrorIf(msg, self.ENODERPC, nname,
2371
                 "while getting disk information: %s", msg)
2372
        if msg:
2373
          # No data from this node
2374
          data = len(disks) * [(False, msg)]
2375
        else:
2376
          data = []
2377
          for idx, i in enumerate(nres.payload):
2378
            if isinstance(i, (tuple, list)) and len(i) == 2:
2379
              data.append(i)
2380
            else:
2381
              logging.warning("Invalid result from node %s, entry %d: %s",
2382
                              nname, idx, i)
2383
              data.append((False, "Invalid result from the remote node"))
2384

    
2385
      for ((inst, _), status) in zip(disks, data):
2386
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2387

    
2388
    # Add empty entries for diskless instances.
2389
    for inst in diskless_instances:
2390
      assert inst not in instdisk
2391
      instdisk[inst] = {}
2392

    
2393
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2394
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
2395
                      compat.all(isinstance(s, (tuple, list)) and
2396
                                 len(s) == 2 for s in statuses)
2397
                      for inst, nnames in instdisk.items()
2398
                      for nname, statuses in nnames.items())
2399
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"
2400

    
2401
    return instdisk
2402
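    # The structure returned above, with hypothetical values:
    #   instdisk = {"inst1": {"node1": [(True, status_disk0),
    #                                   (True, status_disk1)]},
    #               "diskless-inst": {}}
    # i.e. one (success, payload) tuple per disk and per node that was
    # asked about it; offline or unreachable nodes yield (False, <reason>)
    # entries instead.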

    
2403
  def BuildHooksEnv(self):
2404
    """Build hooks env.
2405

2406
    Cluster-Verify hooks just ran in the post phase and their failure makes
2407
    the output be logged in the verify output and the verification to fail.
2408

2409
    """
2410
    env = {
2411
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
2412
      }
2413

    
2414
    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2415
               for node in self.my_node_info.values())
2416

    
2417
    return env
2418

    
2419
  def BuildHooksNodes(self):
2420
    """Build hooks nodes.
2421

2422
    """
2423
    return ([], self.my_node_names)
2424

    
2425
  def Exec(self, feedback_fn):
2426
    """Verify integrity of the node group, performing various test on nodes.
2427

2428
    """
2429
    # This method has too many local variables. pylint: disable-msg=R0914
2430

    
2431
    if not self.my_node_names:
2432
      # empty node group
2433
      feedback_fn("* Empty node group, skipping verification")
2434
      return True
2435

    
2436
    self.bad = False
2437
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2438
    verbose = self.op.verbose
2439
    self._feedback_fn = feedback_fn
2440

    
2441
    vg_name = self.cfg.GetVGName()
2442
    drbd_helper = self.cfg.GetDRBDHelper()
2443
    cluster = self.cfg.GetClusterInfo()
2444
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2445
    hypervisors = cluster.enabled_hypervisors
2446
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2447

    
2448
    i_non_redundant = [] # Non redundant instances
2449
    i_non_a_balanced = [] # Non auto-balanced instances
2450
    n_offline = 0 # Count of offline nodes
2451
    n_drained = 0 # Count of nodes being drained
2452
    node_vol_should = {}
2453

    
2454
    # FIXME: verify OS list
2455

    
2456
    # File verification
2457
    filemap = _ComputeAncillaryFiles(cluster, False)
2458

    
2459
    # do local checksums
2460
    master_node = self.master_node = self.cfg.GetMasterNode()
2461
    master_ip = self.cfg.GetMasterIP()
2462

    
2463
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2464

    
2465
    # We will make nodes contact all nodes in their group, and one node from
2466
    # every other group.
2467
    # TODO: should it be a *random* node, different every time?
2468
    online_nodes = [node.name for node in node_data_list if not node.offline]
2469
    other_group_nodes = {}
2470

    
2471
    for name in sorted(self.all_node_info):
2472
      node = self.all_node_info[name]
2473
      if (node.group not in other_group_nodes
2474
          and node.group != self.group_uuid
2475
          and not node.offline):
2476
        other_group_nodes[node.group] = node.name
2477

    
2478
    node_verify_param = {
2479
      constants.NV_FILELIST:
2480
        utils.UniqueSequence(filename
2481
                             for files in filemap
2482
                             for filename in files),
2483
      constants.NV_NODELIST: online_nodes + other_group_nodes.values(),
2484
      constants.NV_HYPERVISOR: hypervisors,
2485
      constants.NV_HVPARAMS:
2486
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2487
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2488
                                 for node in node_data_list
2489
                                 if not node.offline],
2490
      constants.NV_INSTANCELIST: hypervisors,
2491
      constants.NV_VERSION: None,
2492
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2493
      constants.NV_NODESETUP: None,
2494
      constants.NV_TIME: None,
2495
      constants.NV_MASTERIP: (master_node, master_ip),
2496
      constants.NV_OSLIST: None,
2497
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2498
      }
2499
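    # node_verify_param, built above, is the work order sent to every node
    # in a single call_node_verify RPC: each NV_* key selects one check and
    # its value carries that check's arguments (None where the check needs
    # none). The LVM-, DRBD- and bridge-related keys are only added below
    # when the cluster actually uses the corresponding feature.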

    
2500
    if vg_name is not None:
2501
      node_verify_param[constants.NV_VGLIST] = None
2502
      node_verify_param[constants.NV_LVLIST] = vg_name
2503
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2504
      node_verify_param[constants.NV_DRBDLIST] = None
2505

    
2506
    if drbd_helper:
2507
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2508

    
2509
    # bridge checks
2510
    # FIXME: this needs to be changed per node-group, not cluster-wide
2511
    bridges = set()
2512
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2513
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2514
      bridges.add(default_nicpp[constants.NIC_LINK])
2515
    for instance in self.my_inst_info.values():
2516
      for nic in instance.nics:
2517
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
2518
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2519
          bridges.add(full_nic[constants.NIC_LINK])
2520

    
2521
    if bridges:
2522
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
2523

    
2524
    # Build our expected cluster state
2525
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2526
                                                 name=node.name,
2527
                                                 vm_capable=node.vm_capable))
2528
                      for node in node_data_list)
2529

    
2530
    # Gather OOB paths
2531
    oob_paths = []
2532
    for node in self.all_node_info.values():
2533
      path = _SupportsOob(self.cfg, node)
2534
      if path and path not in oob_paths:
2535
        oob_paths.append(path)
2536

    
2537
    if oob_paths:
2538
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2539

    
2540
    for instance in self.my_inst_names:
2541
      inst_config = self.my_inst_info[instance]
2542

    
2543
      for nname in inst_config.all_nodes:
2544
        if nname not in node_image:
2545
          gnode = self.NodeImage(name=nname)
2546
          gnode.ghost = (nname not in self.all_node_info)
2547
          node_image[nname] = gnode
2548

    
2549
      inst_config.MapLVsByNode(node_vol_should)
2550

    
2551
      pnode = inst_config.primary_node
2552
      node_image[pnode].pinst.append(instance)
2553

    
2554
      for snode in inst_config.secondary_nodes:
2555
        nimg = node_image[snode]
2556
        nimg.sinst.append(instance)
2557
        if pnode not in nimg.sbp:
2558
          nimg.sbp[pnode] = []
2559
        nimg.sbp[pnode].append(instance)
2560

    
2561
    # At this point, we have the in-memory data structures complete,
2562
    # except for the runtime information, which we'll gather next
2563

    
2564
    # Due to the way our RPC system works, exact response times cannot be
2565
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2566
    # time before and after executing the request, we can at least have a time
2567
    # window.
2568
    nvinfo_starttime = time.time()
2569
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2570
                                           node_verify_param,
2571
                                           self.cfg.GetClusterName())
2572
    nvinfo_endtime = time.time()
2573

    
2574
    if self.extra_lv_nodes and vg_name is not None:
2575
      extra_lv_nvinfo = \
2576
          self.rpc.call_node_verify(self.extra_lv_nodes,
2577
                                    {constants.NV_LVLIST: vg_name},
2578
                                    self.cfg.GetClusterName())
2579
    else:
2580
      extra_lv_nvinfo = {}
2581

    
2582
    all_drbd_map = self.cfg.ComputeDRBDMap()
2583

    
2584
    feedback_fn("* Gathering disk information (%s nodes)" %
2585
                len(self.my_node_names))
2586
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2587
                                     self.my_inst_info)
2588

    
2589
    feedback_fn("* Verifying configuration file consistency")
2590

    
2591
    # If not all nodes are being checked, we need to make sure the master node
2592
    # and a non-checked vm_capable node are in the list.
2593
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2594
    if absent_nodes:
2595
      vf_nvinfo = all_nvinfo.copy()
2596
      vf_node_info = list(self.my_node_info.values())
2597
      additional_nodes = []
2598
      if master_node not in self.my_node_info:
2599
        additional_nodes.append(master_node)
2600
        vf_node_info.append(self.all_node_info[master_node])
2601
      # Add the first vm_capable node we find which is not included
2602
      for node in absent_nodes:
2603
        nodeinfo = self.all_node_info[node]
2604
        if nodeinfo.vm_capable and not nodeinfo.offline:
2605
          additional_nodes.append(node)
2606
          vf_node_info.append(self.all_node_info[node])
2607
          break
2608
      key = constants.NV_FILELIST
2609
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2610
                                                 {key: node_verify_param[key]},
2611
                                                 self.cfg.GetClusterName()))
2612
    else:
2613
      vf_nvinfo = all_nvinfo
2614
      vf_node_info = self.my_node_info.values()
2615

    
2616
    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2617

    
2618
    feedback_fn("* Verifying node status")
2619

    
2620
    refos_img = None
2621

    
2622
    for node_i in node_data_list:
2623
      node = node_i.name
2624
      nimg = node_image[node]
2625

    
2626
      if node_i.offline:
2627
        if verbose:
2628
          feedback_fn("* Skipping offline node %s" % (node,))
2629
        n_offline += 1
2630
        continue
2631

    
2632
      if node == master_node:
2633
        ntype = "master"
2634
      elif node_i.master_candidate:
2635
        ntype = "master candidate"
2636
      elif node_i.drained:
2637
        ntype = "drained"
2638
        n_drained += 1
2639
      else:
2640
        ntype = "regular"
2641
      if verbose:
2642
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2643

    
2644
      msg = all_nvinfo[node].fail_msg
2645
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
2646
      if msg:
2647
        nimg.rpc_fail = True
2648
        continue
2649

    
2650
      nresult = all_nvinfo[node].payload
2651

    
2652
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2653
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2654
      self._VerifyNodeNetwork(node_i, nresult)
2655
      self._VerifyOob(node_i, nresult)
2656

    
2657
      if nimg.vm_capable:
2658
        self._VerifyNodeLVM(node_i, nresult, vg_name)
2659
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2660
                             all_drbd_map)
2661

    
2662
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2663
        self._UpdateNodeInstances(node_i, nresult, nimg)
2664
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2665
        self._UpdateNodeOS(node_i, nresult, nimg)
2666

    
2667
        if not nimg.os_fail:
2668
          if refos_img is None:
2669
            refos_img = nimg
2670
          self._VerifyNodeOS(node_i, nimg, refos_img)
2671
        self._VerifyNodeBridges(node_i, nresult, bridges)
2672

    
2673
        # Check whether all running instances are primary for the node. (This
2674
        # can no longer be done from _VerifyInstance below, since some of the
2675
        # wrong instances could be from other node groups.)
2676
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2677

    
2678
        for inst in non_primary_inst:
2679
          test = inst in self.all_inst_info
2680
          _ErrorIf(test, self.EINSTANCEWRONGNODE, inst,
2681
                   "instance should not run on node %s", node_i.name)
2682
          _ErrorIf(not test, self.ENODEORPHANINSTANCE, node_i.name,
2683
                   "node is running unknown instance %s", inst)
2684

    
2685
    for node, result in extra_lv_nvinfo.items():
2686
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
2687
                              node_image[node], vg_name)
2688

    
2689
    feedback_fn("* Verifying instance status")
2690
    for instance in self.my_inst_names:
2691
      if verbose:
2692
        feedback_fn("* Verifying instance %s" % instance)
2693
      inst_config = self.my_inst_info[instance]
2694
      self._VerifyInstance(instance, inst_config, node_image,
2695
                           instdisk[instance])
2696
      inst_nodes_offline = []
2697

    
2698
      pnode = inst_config.primary_node
2699
      pnode_img = node_image[pnode]
2700
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2701
               self.ENODERPC, pnode, "instance %s, connection to"
2702
               " primary node failed", instance)
2703

    
2704
      _ErrorIf(inst_config.admin_up and pnode_img.offline,
2705
               self.EINSTANCEBADNODE, instance,
2706
               "instance is marked as running and lives on offline node %s",
2707
               inst_config.primary_node)
2708

    
2709
      # If the instance is non-redundant we cannot survive losing its primary
2710
      # node, so we are not N+1 compliant. On the other hand we have no disk
2711
      # templates with more than one secondary so that situation is not well
2712
      # supported either.
2713
      # FIXME: does not support file-backed instances
2714
      if not inst_config.secondary_nodes:
2715
        i_non_redundant.append(instance)
2716

    
2717
      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
2718
               instance, "instance has multiple secondary nodes: %s",
2719
               utils.CommaJoin(inst_config.secondary_nodes),
2720
               code=self.ETYPE_WARNING)
2721

    
2722
      if inst_config.disk_template in constants.DTS_INT_MIRROR:
2723
        pnode = inst_config.primary_node
2724
        instance_nodes = utils.NiceSort(inst_config.all_nodes)
2725
        instance_groups = {}
2726

    
2727
        for node in instance_nodes:
2728
          instance_groups.setdefault(self.all_node_info[node].group,
2729
                                     []).append(node)
2730

    
2731
        pretty_list = [
2732
          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
2733
          # Sort so that we always list the primary node first.
2734
          for group, nodes in sorted(instance_groups.items(),
2735
                                     key=lambda (_, nodes): pnode in nodes,
2736
                                     reverse=True)]
2737

    
2738
        self._ErrorIf(len(instance_groups) > 1, self.EINSTANCESPLITGROUPS,
2739
                      instance, "instance has primary and secondary nodes in"
2740
                      " different groups: %s", utils.CommaJoin(pretty_list),
2741
                      code=self.ETYPE_WARNING)
2742

    
2743
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2744
        i_non_a_balanced.append(instance)
2745

    
2746
      for snode in inst_config.secondary_nodes:
2747
        s_img = node_image[snode]
2748
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
2749
                 "instance %s, connection to secondary node failed", instance)
2750

    
2751
        if s_img.offline:
2752
          inst_nodes_offline.append(snode)
2753

    
2754
      # warn that the instance lives on offline nodes
2755
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
2756
               "instance has offline secondary node(s) %s",
2757
               utils.CommaJoin(inst_nodes_offline))
2758
      # ... or ghost/non-vm_capable nodes
2759
      for node in inst_config.all_nodes:
2760
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
2761
                 "instance lives on ghost node %s", node)
2762
        _ErrorIf(not node_image[node].vm_capable, self.EINSTANCEBADNODE,
2763
                 instance, "instance lives on non-vm_capable node %s", node)
2764

    
2765
    feedback_fn("* Verifying orphan volumes")
2766
    reserved = utils.FieldSet(*cluster.reserved_lvs)
2767

    
2768
    # We will get spurious "unknown volume" warnings if any node of this group
2769
    # is secondary for an instance whose primary is in another group. To avoid
2770
    # them, we find these instances and add their volumes to node_vol_should.
2771
    for inst in self.all_inst_info.values():
2772
      for secondary in inst.secondary_nodes:
2773
        if (secondary in self.my_node_info
2774
            and inst.name not in self.my_inst_info):
2775
          inst.MapLVsByNode(node_vol_should)
2776
          break
2777

    
2778
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2779

    
2780
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2781
      feedback_fn("* Verifying N+1 Memory redundancy")
2782
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2783

    
2784
    feedback_fn("* Other Notes")
2785
    if i_non_redundant:
2786
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2787
                  % len(i_non_redundant))
2788

    
2789
    if i_non_a_balanced:
2790
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2791
                  % len(i_non_a_balanced))
2792

    
2793
    if n_offline:
2794
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2795

    
2796
    if n_drained:
2797
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2798

    
2799
    return not self.bad
2800

    
2801
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2802
    """Analyze the post-hooks' result
2803

2804
    This method analyses the hook result, handles it, and sends some
2805
    nicely-formatted feedback back to the user.
2806

2807
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
2808
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2809
    @param hooks_results: the results of the multi-node hooks rpc call
2810
    @param feedback_fn: function used to send feedback back to the caller
2811
    @param lu_result: previous Exec result
2812
    @return: the new Exec result, based on the previous result
2813
        and hook results
2814

2815
    """
2816
    # We only really run POST phase hooks, only for non-empty groups,
2817
    # and are only interested in their results
2818
    if not self.my_node_names:
2819
      # empty node group
2820
      pass
2821
    elif phase == constants.HOOKS_PHASE_POST:
2822
      # Used to change hooks' output to proper indentation
2823
      feedback_fn("* Hooks Results")
2824
      assert hooks_results, "invalid result from hooks"
2825

    
2826
      for node_name in hooks_results:
2827
        res = hooks_results[node_name]
2828
        msg = res.fail_msg
2829
        test = msg and not res.offline
2830
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
2831
                      "Communication failure in hooks execution: %s", msg)
2832
        if res.offline or msg:
2833
          # No need to investigate payload if node is offline or gave an error.
2834
          # override manually lu_result here as _ErrorIf only
2835
          # overrides self.bad
2836
          lu_result = 1
2837
          continue
2838
        for script, hkr, output in res.payload:
2839
          test = hkr == constants.HKR_FAIL
2840
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
2841
                        "Script %s failed, output:", script)
2842
          if test:
2843
            output = self._HOOKS_INDENT_RE.sub('      ', output)
2844
            feedback_fn("%s" % output)
2845
            lu_result = 0
2846

    
2847
    return lu_result
2848

    
2849

    
2850
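# Illustrative sketch (not part of the original module): how the per-script
# hook results handled by HooksCallBack() above can be folded into a single
# pass/fail verdict.  The helper name and the plain (script, status, output)
# tuple format are hypothetical simplifications of the real RPC payload.
def _DemoFoldHookResults(payload):
  """Returns True if every hook script in C{payload} succeeded.

  @param payload: list of (script, status, output) tuples, where the status
      "FAIL" stands in for constants.HKR_FAIL

  """
  ok = True
  for (script, status, _) in payload:
    if status == "FAIL":
      logging.debug("Hook script %s failed", script)
      ok = False
  return ok

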
class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    result = res_nodes, res_instances, res_missing = {}, [], {}

    nodes = utils.NiceSort(self.cfg.GetVmCapableNodeList())
    instances = self.cfg.GetAllInstancesInfo().values()

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if not inst.admin_up:
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_lv_list(nodes, [])
    for node, node_res in node_lvs.items():
      if node_res.offline:
        continue
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      lvs = node_res.payload
      for lv_name, (_, _, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


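# Illustrative sketch (not part of the original module): the mapping
# inversion performed in LUClusterVerifyDisks.Exec above, which turns the
# per-instance "node -> volumes" map produced by MapLVsByNode into a flat
# "(node, volume) -> instance" lookup table.  Plain instance names are used
# here instead of instance objects.
def _DemoInvertLvMap(lvs_by_instance):
  """Inverts {instance: {node: [vol, ...]}} into {(node, vol): instance}.

  """
  nv_dict = {}
  for iname, node_map in lvs_by_instance.items():
    for node, vol_list in node_map.items():
      for vol in vol_list:
        nv_dict[(node, vol)] = iname
  return nv_dict

# Example (hypothetical names):
#   _DemoInvertLvMap({"inst1": {"node1": ["xenvg/disk0"]}})
#   returns {("node1", "xenvg/disk0"): "inst1"}

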
class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      self.wanted_names = _GetWantedInstances(self, self.op.instances)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
        }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsize(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsize call to node"
                        " %s, ignoring", node)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node, len(dskl), result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))
    return changed


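# Illustrative sketch (not part of the original module): the size
# reconciliation done in LUClusterRepairDiskSizes.Exec above.  Nodes report
# sizes in bytes, which are shifted right by 20 bits (divided by 2**20) to
# obtain mebibytes before being compared with the configured size.  The
# helper name is hypothetical.
def _DemoReconcileDiskSize(recorded_mb, reported_bytes):
  """Returns the corrected size in MiB, or None if no change is needed.

  """
  if reported_bytes is None or not isinstance(reported_bytes, (int, long)):
    # mirrors the "ignoring" branches above: no usable answer from the node
    return None
  actual_mb = reported_bytes >> 20
  if actual_mb != recorded_mb:
    return actual_mb
  return None

# Example: a disk recorded as 10240 MiB and reported as 10737418240 bytes
# (exactly 10 GiB) is consistent, so no correction is returned.

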
class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
    finally:
      result = self.rpc.call_node_start_master(master, False, False)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)

    return clustername


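# Illustrative sketch (not part of the original module): the
# stop-master/modify/restart-in-finally control flow used by
# LUClusterRename.Exec above.  The callables are hypothetical placeholders;
# in the LU they are node RPC calls plus self.cfg.Update(), and the restart
# outcome is checked via fail_msg rather than exceptions.
def _DemoStopModifyRestart(stop_fn, modify_fn, start_fn, warn_fn):
  """Runs C{modify_fn} with the master role stopped around it.

  The restart is attempted even if the modification fails, and a restart
  failure only produces a warning, matching the behaviour above.

  """
  stop_fn()
  try:
    return modify_fn()
  finally:
    try:
      start_fn()
    except Exception, err: # pylint: disable-msg=W0703
      warn_fn("Could not re-enable the master role: %s" % err)

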
class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_list = self.glm.list_owned(locking.LEVEL_NODE)

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_list)
      for node in node_list:
        ninfo = self.cfg.GetNodeInfo(node)
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s", node)
          continue
        msg = helpers[node].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (node, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[node].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (node, node_helper), errors.ECODE_ENVIRON)

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors))

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
                                                  use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                         os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          _CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.drbd_helper is not None:
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master = self.cfg.GetMasterNode()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_stop_master(master, False)
      result.Raise("Could not disable the master ip")
      feedback_fn("Changing master_netdev from %s to %s" %
                  (self.cluster.master_netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      result = self.rpc.call_node_start_master(master, False, False)
      if result.fail_msg:
        self.LogWarning("Could not re-enable the master ip on"
                        " the master, please restart manually: %s",
                        result.fail_msg)


3457
  """Helper for uploading a file and showing warnings.
3458

3459
  """
3460
  if os.path.exists(fname):
3461
    result = lu.rpc.call_upload_file(nodes, fname)
3462
    for to_node, to_result in result.items():
3463
      msg = to_result.fail_msg
3464
      if msg:
3465
        msg = ("Copy of file %s to node %s failed: %s" %
3466
               (fname, to_node, msg))
3467
        lu.proc.LogWarning(msg)
3468

    
3469

    
3470
def _ComputeAncillaryFiles(cluster, redist):
  """Compute files external to Ganeti which need to be consistent.

  @type redist: boolean
  @param redist: Whether to include files which need to be redistributed

  """
  # Compute files for all nodes
  files_all = set([
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  if not redist:
    files_all.update(constants.ALL_CERT_FILES)
    files_all.update(ssconf.SimpleStore().GetFileList())

  if cluster.modify_etc_hosts:
    files_all.add(constants.ETC_HOSTS)

  # Files which must either exist on all nodes or on none
  files_all_opt = set([
    constants.RAPI_USERS_FILE,
    ])

  # Files which should only be on master candidates
  files_mc = set()
  if not redist:
    files_mc.add(constants.CLUSTER_CONF_FILE)

  # Files which should only be on VM-capable nodes
  files_vm = set(filename
    for hv_name in cluster.enabled_hypervisors
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles())

  # Filenames must be unique
  assert (len(files_all | files_all_opt | files_mc | files_vm) ==
          sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
         "Found file listed in more than one file list"

  return (files_all, files_all_opt, files_mc, files_vm)


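# Illustrative sketch (not part of the original module): the uniqueness
# assertion at the end of _ComputeAncillaryFiles above relies on the fact
# that, for disjoint sets, the size of the union equals the sum of the
# individual sizes; a file listed in two lists would make the union smaller.
def _DemoFileListsDisjoint(*file_sets):
  """Returns True if no filename appears in more than one of the given sets.

  """
  union = set()
  total = 0
  for fset in file_sets:
    union |= fset
    total += len(fset)
  return len(union) == total

# Example: _DemoFileListsDisjoint(set(["a"]), set(["b"])) is True, while
#          _DemoFileListsDisjoint(set(["a"]), set(["a", "b"])) is False.

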
def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to
  @type additional_vm: boolean
  @param additional_vm: whether the additional nodes are vm-capable or not

  """
  # Gather target nodes
  cluster = lu.cfg.GetClusterInfo()
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())

  online_nodes = lu.cfg.GetOnlineNodeList()
  vm_nodes = lu.cfg.GetVmCapableNodeList()

  if additional_nodes is not None:
    online_nodes.extend(additional_nodes)
    if additional_vm:
      vm_nodes.extend(additional_nodes)

  # Never distribute to master node
  for nodelist in [online_nodes, vm_nodes]:
    if master_info.name in nodelist:
      nodelist.remove(master_info.name)

  # Gather file lists
  (files_all, files_all_opt, files_mc, files_vm) = \
    _ComputeAncillaryFiles(cluster, True)

  # Never re-distribute configuration file from here
  assert not (constants.CLUSTER_CONF_FILE in files_all or
              constants.CLUSTER_CONF_FILE in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

  filemap = [
    (online_nodes, files_all),
    (online_nodes, files_all_opt),
    (vm_nodes, files_vm),
    ]

  # Upload the files
  for (node_list, files) in filemap:
    for fname in files:
      _UploadHelper(lu, node_list, fname)


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = _ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (disks[i].iv_name, mstat.sync_percent, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


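# Illustrative sketch (not part of the original module): the polling pattern
# of _WaitForSync above (see also its TODO about utils.Retry), reduced to a
# generic helper.  C{poll_fn} is a hypothetical callable returning a
# (done, degraded) pair; the one-second retry budget for transiently
# degraded mirrors follows the constants used above.
def _DemoPollUntilSynced(poll_fn, sleep_fn=time.sleep, degr_retries=10):
  """Polls until C{poll_fn} reports completion, tolerating brief degradation.

  @return: True if the final state was not degraded

  """
  while True:
    (done, degraded) = poll_fn()
    if done and degraded and degr_retries > 0:
      # give a transiently degraded mirror a few more one-second chances
      degr_retries -= 1
      sleep_fn(1)
      continue
    if done:
      return not degraded
    sleep_fn(1)

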
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


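# Illustrative sketch (not part of the original module): the recursion used
# by _CheckDiskConsistency above, which requires the device itself and every
# child device to be healthy.  The node/device model is replaced by a
# hypothetical nested-dict structure with "healthy" and "children" keys.
def _DemoDeviceTreeHealthy(dev):
  """Returns True only if C{dev} and all of its descendants are healthy.

  """
  if not dev.get("healthy", False):
    return False
  for child in dev.get("children", []):
    if not _DemoDeviceTreeHealthy(child):
      return False
  return True

# Example: a DRBD-like device with one unhealthy backing child fails:
#   _DemoDeviceTreeHealthy({"healthy": True,
#                           "children": [{"healthy": False}]}) is False

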
class LUOobCommand(NoHooksLU):
  """Logical unit for OOB handling.

  """
  REQ_BGL = False
  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)

  def ExpandNames(self):
    """Gather locks we need.

    """
    if self.op.node_names:
      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
      lock_names = self.op.node_names
    else:
      lock_names = locking.ALL_SET

    self.needed_locks = {
      locking.LEVEL_NODE: lock_names,
      }

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - OOB is supported

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.nodes = []
    self.master_node = self.cfg.GetMasterNode()

    assert self.op.power_delay >= 0.0

    if self.op.node_names:
      if (self.op.command in self._SKIP_MASTER and
          self.master_node in self.op.node_names):
        master_node_obj = self.cfg.GetNodeInfo(self.master_node)
        master_oob_handler = _SupportsOob(self.cfg, master_node_obj)

        if master_oob_handler:
          additional_text = ("run '%s %s %s' if you want to operate on the"
                             " master regardless") % (master_oob_handler,
                                                      self.op.command,
                                                      self.master_node)
        else:
          additional_text = "it does not support out-of-band operations"

        raise errors.OpPrereqError(("Operating on the master node %s is not"
                                    " allowed for %s; %s") %
                                   (self.master_node, self.op.command,
                                    additional_text), errors.ECODE_INVAL)
    else:
      self.op.node_names = self.cfg.GetNodeList()
      if self.op.command in self._SKIP_MASTER:
        self.op.node_names.remove(self.master_node)

    if self.op.command in self._SKIP_MASTER:
      assert self.master_node not in self.op.node_names

    for node_name in self.op.node_names:
      node = self.cfg.GetNodeInfo(node_name)

      if node is None:
        raise errors.OpPrereqError("Node %s not found" % node_name,
                                   errors.ECODE_NOENT)
      else:
        self.nodes.append(node)

      if (not self.op.ignore_status and
          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
        raise errors.OpPrereqError(("Cannot power off node %s because it is"
                                    " not marked offline") % node_name,
                                   errors.ECODE_STATE)

  def Exec(self, feedback_fn):
    """Execute OOB and return result if we expect any.

    """
    master_node = self.master_node
    ret = []

    for idx, node in enumerate(utils.NiceSort(self.nodes,
                                              key=lambda node: node.name)):
      node_entry = [(constants.RS_NORMAL, node.name)]
      ret.append(node_entry)

      oob_program = _SupportsOob(self.cfg, node)

      if not oob_program:
        node_entry.append((constants.RS_UNAVAIL, None))
        continue

      logging.info("Executing out-of-band command '%s' using '%s' on %s",
                   self.op.command, oob_program, node.name)
      result = self.rpc.call_run_oob(master_node, oob_program,
                                     self.op.command, node.name,
                                     self.op.timeout)

      if result.fail_msg:
        self.LogWarning("Out-of-band RPC failed on node '%s': %s",
                        node.name, result.fail_msg)
        node_entry.append((constants.RS_NODATA, None))
      else:
        try:
          self._CheckPayload(result)
        except errors.OpExecError, err:
          self.LogWarning("Payload returned by node '%s' is not valid: %s",
                          node.name, err)
          node_entry.append((constants.RS_NODATA, None))
        else:
          if self.op.command == constants.OOB_HEALTH:
            # For health we should log important events
            for item, status in result.payload:
              if status in [constants.OOB_STATUS_WARNING,
                            constants.OOB_STATUS_CRITICAL]:
                self.LogWarning("Item '%s' on node '%s' has status '%s'",
                                item, node.name, status)

          if self.op.command == constants.OOB_POWER_ON:
            node.powered = True
          elif self.op.command == constants.OOB_POWER_OFF:
            node.powered = False
          elif self.op.command == constants.OOB_POWER_STATUS:
            powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
            if powered != node.powered:
              logging.warning(("Recorded power state (%s) of node '%s' does not"
                               " match actual power state (%s)"), node.powered,
                              node.name, powered)

          # For configuration changing commands we should update the node
          if self.op.command in (constants.OOB_POWER_ON,
                                 constants.OOB_POWER_OFF):
            self.cfg.Update(node, feedback_fn)

          node_entry.append((constants.RS_NORMAL, result.payload))

          if (self.op.command == constants.OOB_POWER_ON and
              idx < len(self.nodes) - 1):
            time.sleep(self.op.power_delay)

    return ret

  def _CheckPayload(self, result):
    """Checks if the payload is valid.

    @param result: RPC result
    @raises errors.OpExecError: If payload is not valid

    """
    errs = []
    if self.op.command == constants.OOB_HEALTH:
      if not isinstance(result.payload, list):
        errs.append("command 'health' is expected to return a list but got %s" %
                    type(result.payload))
      else:
        for item, status in result.payload:
          if status not in constants.OOB_STATUSES:
            errs.append("health item '%s' has invalid status '%s'" %
                        (item, status))

    if self.op.command == constants.OOB_POWER_STATUS:
      if not isinstance(result.payload, dict):
        errs.append("power-status is expected to return a dict but got %s" %
                    type(result.payload))

    if self.op.command in [
        constants.OOB_POWER_ON,
        constants.OOB_POWER_OFF,
        constants.OOB_POWER_CYCLE,
        ]:
      if result.payload is not None:
        errs.append("%s is expected to not return payload but got '%s'" %
                    (self.op.command, result.payload))

    if errs:
      raise errors.OpExecError("Check of out-of-band payload failed due to %s" %
                               utils.CommaJoin(errs))

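# Illustrative sketch (not part of the original module): the per-command
# payload shapes enforced by LUOobCommand._CheckPayload above, with plain
# command strings standing in for the corresponding constants.OOB_* values.
def _DemoOobPayloadErrors(command, payload):
  """Returns a list of complaints about C{payload} for C{command}.

  """
  errs = []
  if command == "health" and not isinstance(payload, list):
    errs.append("'health' must return a list of (item, status) pairs")
  elif command == "power-status" and not isinstance(payload, dict):
    errs.append("'power-status' must return a dict")
  elif (command in ("power-on", "power-off", "power-cycle") and
        payload is not None):
    errs.append("'%s' must not return a payload" % command)
  return errs
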
class _OsQuery(_QueryBase):
  FIELDS = query.OS_FIELDS

  def ExpandNames(self, lu):
    # Lock all nodes in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    lu.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

    # The following variables interact with _QueryBase._GetNames
    if self.names:
      self.wanted = self.names
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self.use_locking

  def DeclareLocks(self, lu, level):
    pass

  @staticmethod
  def _DiagnoseByOS(rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another
        map, with nodes as keys and tuples of (path, status, diagnose,
        variants, parameters, api_versions) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "", [], [])]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for (name, path, status, diagnose, variants,
           params, api_versions) in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        # convert params from [name, help] to (name, help)
        params = [tuple(v) for v in params]
        all_os[name][node_name].append((path, status, diagnose,
                                        variants, params, api_versions))
    return all_os

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    valid_nodes = [node.name
                   for node in lu.cfg.GetAllNodesInfo().values()
                   if not node.offline and node.vm_capable]
    pol = self._DiagnoseByOS(lu.rpc.call_os_diagnose(valid_nodes))
    cluster = lu.cfg.GetClusterInfo()

    data = {}

    for (os_name, os_data) in pol.items():
      info = query.OsInfo(name=os_name, valid=True, node_status=os_data,
                          hidden=(os_name in cluster.hidden_os),
                          blacklisted=(os_name in cluster.blacklisted_os))

      variants = set()
      parameters = set()
      api_versions = set()

      for idx, osl in enumerate(os_data.values()):
        info.valid = bool(info.valid and osl and osl[0][1])
        if not info.valid:
          break

        (node_variants, node_params, node_api) = osl[0][3:6]
        if idx == 0:
          # First entry
          variants.update(node_variants)
          parameters.update(node_params)
          api_versions.update(node_api)
        else:
          # Filter out inconsistent values
          variants.intersection_update(node_variants)
          parameters.intersection_update(node_params)
          api_versions.intersection_update(node_api)

      info.variants = list(variants)
      info.parameters = list(parameters)
      info.api_versions = list(api_versions)

      data[os_name] = info

    # Prepare data in requested order
    return [data[name] for name in self._GetNames(lu, pol.keys(), None)
            if name in data]


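# Illustrative sketch (not part of the original module): how
# _OsQuery._GetQueryData above combines per-node OS data: the first node
# seeds the sets of variants/parameters/API versions and every further node
# intersects them, so only values reported consistently by all nodes
# survive.  The helper name and plain list-of-lists input are hypothetical.
def _DemoConsistentValues(per_node_values):
  """Returns the values common to every node's report.

  @param per_node_values: non-empty list of lists, one entry per node

  """
  common = set(per_node_values[0])
  for values in per_node_values[1:]:
    common.intersection_update(values)
  return common

# Example: _DemoConsistentValues([["default", "minimal"], ["default"]])
#          leaves only "default".

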
class LUOsDiagnose(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  REQ_BGL = False

  @staticmethod
  def _BuildFilter(fields, names):
    """Builds a filter for querying OSes.

    """
    name_filter = qlang.MakeSimpleFilter("name", names)

    # Legacy behaviour: Hide hidden, blacklisted or invalid OSes if the
    # respective field is not requested
    status_filter = [[qlang.OP_NOT, [qlang.OP_TRUE, fname]]
                     for fname in ["hidden", "blacklisted"]
                     if fname not in fields]
    if "valid" not in fields:
      status_filter.append([qlang.OP_TRUE, "valid"])

    if status_filter:
      status_filter.insert(0, qlang.OP_AND)
    else:
      status_filter = None

    if name_filter and status_filter:
      return [qlang.OP_AND, name_filter, status_filter]
    elif name_filter:
      return name_filter
    else:
      return status_filter

  def CheckArguments(self):
    self.oq = _OsQuery(self._BuildFilter(self.op.output_fields, self.op.names),
                       self.op.output_fields, False)

  def ExpandNames(self):
    self.oq.ExpandNames(self)

  def Exec(self, feedback_fn):
    return self.oq.OldStyleQuery(self)


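# Illustrative sketch (not part of the original module): the nested-list
# filter shape produced by LUOsDiagnose._BuildFilter above, using the plain
# strings "&", "|", "!", "?" and "==" as stand-ins for the qlang.OP_*
# operator constants.
def _DemoBuildOsFilter(names, fields):
  """Builds a simplified qlang-style name+status filter.

  """
  if names:
    name_filter = ["|"] + [["==", "name", name] for name in names]
  else:
    name_filter = None
  status_filter = [["!", ["?", fname]]
                   for fname in ["hidden", "blacklisted"]
                   if fname not in fields]
  if "valid" not in fields:
    status_filter.append(["?", "valid"])
  if status_filter:
    status_filter.insert(0, "&")
  else:
    status_filter = None
  if name_filter and status_filter:
    return ["&", name_filter, status_filter]
  return name_filter or status_filter

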
class LUNodeRemove(LogicalUnit):
4039
  """Logical unit for removing a node.
4040

4041
  """
4042
  HPATH = "node-remove"
4043
  HTYPE = constants.HTYPE_NODE
4044

    
4045
  def BuildHooksEnv(self):
4046
    """Build hooks env.
4047

4048
    This doesn't run on the target node in the pre phase as a failed
4049
    node would then be impossible to remove.
4050

4051
    """
4052
    return {
4053
      "OP_TARGET": self.op.node_name,
4054
      "NODE_NAME": self.op.node_name,
4055
      }
4056

    
4057
  def BuildHooksNodes(self):
4058
    """Build hooks nodes.
4059

4060
    """
4061
    all_nodes = self.cfg.GetNodeList()
4062
    try:
4063
      all_nodes.remove(self.op.node_name)
4064
    except ValueError:
4065
      logging.warning("Node '%s', which is about to be removed, was not found"
4066
                      " in the list of all nodes", self.op.node_name)
4067
    return (all_nodes, all_nodes)
4068

    
4069
  def CheckPrereq(self):
4070
    """Check prerequisites.
4071

4072
    This checks:
4073
     - the node exists in the configuration
4074
     - it does not have primary or secondary instances
4075
     - it's not the master
4076

4077
    Any errors are signaled by raising errors.OpPrereqError.
4078

4079
    """
4080
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
4081
    node = self.cfg.GetNodeInfo(self.op.node_name)
4082
    assert node is not None
4083

    
4084
    instance_list = self.cfg.GetInstanceList()
4085

    
4086
    masternode = self.cfg.GetMasterNode()
4087
    if node.name == masternode:
4088
      raise errors.OpPrereqError("Node is the master node, failover to another"
4089
                                 " node is required", errors.ECODE_INVAL)
4090

    
4091
    for instance_name in instance_list:
4092
      instance = self.cfg