Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ adb6d685

History | View | Annotate | Download (353.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0201,C0302
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
# C0302: since we have waaaay to many lines in this module
30

    
31
import os
32
import os.path
33
import time
34
import re
35
import platform
36
import logging
37
import copy
38
import OpenSSL
39

    
40
from ganeti import ssh
41
from ganeti import utils
42
from ganeti import errors
43
from ganeti import hypervisor
44
from ganeti import locking
45
from ganeti import constants
46
from ganeti import objects
47
from ganeti import serializer
48
from ganeti import ssconf
49
from ganeti import uidpool
50
from ganeti import compat
51
from ganeti import masterd
52

    
53
import ganeti.masterd.instance # pylint: disable-msg=W0611
54

    
55

    
56
# Modifiable default values; need to define these here before the
57
# actual LUs
58

    
59
def _EmptyList():
60
  """Returns an empty list.
61

62
  """
63
  return []
64

    
65

    
66
def _EmptyDict():
67
  """Returns an empty dict.
68

69
  """
70
  return {}
71

    
72

    
73
# Some basic types
74
def _TNotNone(val):
75
  """Checks if the given value is not None.
76

77
  """
78
  return val is not None
79

    
80

    
81
def _TNone(val):
82
  """Checks if the given value is None.
83

84
  """
85
  return val is None
86

    
87

    
88
def _TBool(val):
89
  """Checks if the given value is a boolean.
90

91
  """
92
  return isinstance(val, bool)
93

    
94

    
95
def _TInt(val):
96
  """Checks if the given value is an integer.
97

98
  """
99
  return isinstance(val, int)
100

    
101

    
102
def _TFloat(val):
103
  """Checks if the given value is a float.
104

105
  """
106
  return isinstance(val, float)
107

    
108

    
109
def _TString(val):
110
  """Checks if the given value is a string.
111

112
  """
113
  return isinstance(val, basestring)
114

    
115

    
116
def _TTrue(val):
117
  """Checks if a given value evaluates to a boolean True value.
118

119
  """
120
  return bool(val)
121

    
122

    
123
def _TElemOf(target_list):
124
  """Builds a function that checks if a given value is a member of a list.
125

126
  """
127
  return lambda val: val in target_list
128

    
129

    
130
# Container types
131
def _TList(val):
132
  """Checks if the given value is a list.
133

134
  """
135
  return isinstance(val, list)
136

    
137

    
138
def _TDict(val):
139
  """Checks if the given value is a dictionary.
140

141
  """
142
  return isinstance(val, dict)
143

    
144

    
145
# Combinator types
146
def _TAnd(*args):
147
  """Combine multiple functions using an AND operation.
148

149
  """
150
  def fn(val):
151
    return compat.all(t(val) for t in args)
152
  return fn
153

    
154

    
155
def _TOr(*args):
156
  """Combine multiple functions using an AND operation.
157

158
  """
159
  def fn(val):
160
    return compat.any(t(val) for t in args)
161
  return fn
162

    
163

    
164
# Type aliases
165

    
166
# non-empty string
167
_TNonEmptyString = _TAnd(_TString, _TTrue)
168

    
169

    
170
# positive integer
171
_TPositiveInt = _TAnd(_TInt, lambda v: v >= 0)
172

    
173

    
174
def _TListOf(my_type):
175
  """Checks if a given value is a list with all elements of the same type.
176

177
  """
178
  return _TAnd(_TList,
179
               lambda lst: compat.all(lst, my_type))
180

    
181

    
182
def _TDictOf(key_type, val_type):
183
  """Checks a dict type for the type of its key/values.
184

185
  """
186
  return _TAnd(_TDict,
187
               lambda my_dict: (compat.all(my_dict.keys(), key_type) and
188
                                compat.all(my_dict.values(), val_type)))
189

    
190

    
191
# End types
192
class LogicalUnit(object):
193
  """Logical Unit base class.
194

195
  Subclasses must follow these rules:
196
    - implement ExpandNames
197
    - implement CheckPrereq (except when tasklets are used)
198
    - implement Exec (except when tasklets are used)
199
    - implement BuildHooksEnv
200
    - redefine HPATH and HTYPE
201
    - optionally redefine their run requirements:
202
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
203

204
  Note that all commands require root permissions.
205

206
  @ivar dry_run_result: the value (if any) that will be returned to the caller
207
      in dry-run mode (signalled by opcode dry_run parameter)
208
  @cvar _OP_DEFS: a list of opcode attributes and the defaults values
209
      they should get if not already existing
210

211
  """
212
  HPATH = None
213
  HTYPE = None
214
  _OP_REQP = []
215
  _OP_DEFS = []
216
  REQ_BGL = True
217

    
218
  def __init__(self, processor, op, context, rpc):
219
    """Constructor for LogicalUnit.
220

221
    This needs to be overridden in derived classes in order to check op
222
    validity.
223

224
    """
225
    self.proc = processor
226
    self.op = op
227
    self.cfg = context.cfg
228
    self.context = context
229
    self.rpc = rpc
230
    # Dicts used to declare locking needs to mcpu
231
    self.needed_locks = None
232
    self.acquired_locks = {}
233
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
234
    self.add_locks = {}
235
    self.remove_locks = {}
236
    # Used to force good behavior when calling helper functions
237
    self.recalculate_locks = {}
238
    self.__ssh = None
239
    # logging
240
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
241
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
242
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
243
    # support for dry-run
244
    self.dry_run_result = None
245
    # support for generic debug attribute
246
    if (not hasattr(self.op, "debug_level") or
247
        not isinstance(self.op.debug_level, int)):
248
      self.op.debug_level = 0
249

    
250
    # Tasklets
251
    self.tasklets = None
252

    
253
    for aname, aval in self._OP_DEFS:
254
      if not hasattr(self.op, aname):
255
        if callable(aval):
256
          dval = aval()
257
        else:
258
          dval = aval
259
        setattr(self.op, aname, dval)
260

    
261
    for attr_name, test in self._OP_REQP:
262
      if not hasattr(op, attr_name):
263
        raise errors.OpPrereqError("Required parameter '%s' missing" %
264
                                   attr_name, errors.ECODE_INVAL)
265
      attr_val = getattr(op, attr_name, None)
266
      if not callable(test):
267
        raise errors.ProgrammerError("Validation for parameter '%s' failed,"
268
                                     " given type is not a proper type (%s)" %
269
                                     (attr_name, test))
270
      if not test(attr_val):
271
        raise errors.OpPrereqError("Parameter '%s' has invalid type" %
272
                                   attr_name, errors.ECODE_INVAL)
273

    
274
    self.CheckArguments()
275

    
276
  def __GetSSH(self):
277
    """Returns the SshRunner object
278

279
    """
280
    if not self.__ssh:
281
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
282
    return self.__ssh
283

    
284
  ssh = property(fget=__GetSSH)
285

    
286
  def CheckArguments(self):
287
    """Check syntactic validity for the opcode arguments.
288

289
    This method is for doing a simple syntactic check and ensure
290
    validity of opcode parameters, without any cluster-related
291
    checks. While the same can be accomplished in ExpandNames and/or
292
    CheckPrereq, doing these separate is better because:
293

294
      - ExpandNames is left as as purely a lock-related function
295
      - CheckPrereq is run after we have acquired locks (and possible
296
        waited for them)
297

298
    The function is allowed to change the self.op attribute so that
299
    later methods can no longer worry about missing parameters.
300

301
    """
302
    pass
303

    
304
  def ExpandNames(self):
305
    """Expand names for this LU.
306

307
    This method is called before starting to execute the opcode, and it should
308
    update all the parameters of the opcode to their canonical form (e.g. a
309
    short node name must be fully expanded after this method has successfully
310
    completed). This way locking, hooks, logging, ecc. can work correctly.
311

312
    LUs which implement this method must also populate the self.needed_locks
313
    member, as a dict with lock levels as keys, and a list of needed lock names
314
    as values. Rules:
315

316
      - use an empty dict if you don't need any lock
317
      - if you don't need any lock at a particular level omit that level
318
      - don't put anything for the BGL level
319
      - if you want all locks at a level use locking.ALL_SET as a value
320

321
    If you need to share locks (rather than acquire them exclusively) at one
322
    level you can modify self.share_locks, setting a true value (usually 1) for
323
    that level. By default locks are not shared.
324

325
    This function can also define a list of tasklets, which then will be
326
    executed in order instead of the usual LU-level CheckPrereq and Exec
327
    functions, if those are not defined by the LU.
328

329
    Examples::
330

331
      # Acquire all nodes and one instance
332
      self.needed_locks = {
333
        locking.LEVEL_NODE: locking.ALL_SET,
334
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
335
      }
336
      # Acquire just two nodes
337
      self.needed_locks = {
338
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
339
      }
340
      # Acquire no locks
341
      self.needed_locks = {} # No, you can't leave it to the default value None
342

343
    """
344
    # The implementation of this method is mandatory only if the new LU is
345
    # concurrent, so that old LUs don't need to be changed all at the same
346
    # time.
347
    if self.REQ_BGL:
348
      self.needed_locks = {} # Exclusive LUs don't need locks.
349
    else:
350
      raise NotImplementedError
351

    
352
  def DeclareLocks(self, level):
353
    """Declare LU locking needs for a level
354

355
    While most LUs can just declare their locking needs at ExpandNames time,
356
    sometimes there's the need to calculate some locks after having acquired
357
    the ones before. This function is called just before acquiring locks at a
358
    particular level, but after acquiring the ones at lower levels, and permits
359
    such calculations. It can be used to modify self.needed_locks, and by
360
    default it does nothing.
361

362
    This function is only called if you have something already set in
363
    self.needed_locks for the level.
364

365
    @param level: Locking level which is going to be locked
366
    @type level: member of ganeti.locking.LEVELS
367

368
    """
369

    
370
  def CheckPrereq(self):
371
    """Check prerequisites for this LU.
372

373
    This method should check that the prerequisites for the execution
374
    of this LU are fulfilled. It can do internode communication, but
375
    it should be idempotent - no cluster or system changes are
376
    allowed.
377

378
    The method should raise errors.OpPrereqError in case something is
379
    not fulfilled. Its return value is ignored.
380

381
    This method should also update all the parameters of the opcode to
382
    their canonical form if it hasn't been done by ExpandNames before.
383

384
    """
385
    if self.tasklets is not None:
386
      for (idx, tl) in enumerate(self.tasklets):
387
        logging.debug("Checking prerequisites for tasklet %s/%s",
388
                      idx + 1, len(self.tasklets))
389
        tl.CheckPrereq()
390
    else:
391
      pass
392

    
393
  def Exec(self, feedback_fn):
394
    """Execute the LU.
395

396
    This method should implement the actual work. It should raise
397
    errors.OpExecError for failures that are somewhat dealt with in
398
    code, or expected.
399

400
    """
401
    if self.tasklets is not None:
402
      for (idx, tl) in enumerate(self.tasklets):
403
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
404
        tl.Exec(feedback_fn)
405
    else:
406
      raise NotImplementedError
407

    
408
  def BuildHooksEnv(self):
409
    """Build hooks environment for this LU.
410

411
    This method should return a three-node tuple consisting of: a dict
412
    containing the environment that will be used for running the
413
    specific hook for this LU, a list of node names on which the hook
414
    should run before the execution, and a list of node names on which
415
    the hook should run after the execution.
416

417
    The keys of the dict must not have 'GANETI_' prefixed as this will
418
    be handled in the hooks runner. Also note additional keys will be
419
    added by the hooks runner. If the LU doesn't define any
420
    environment, an empty dict (and not None) should be returned.
421

422
    No nodes should be returned as an empty list (and not None).
423

424
    Note that if the HPATH for a LU class is None, this function will
425
    not be called.
426

427
    """
428
    raise NotImplementedError
429

    
430
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
431
    """Notify the LU about the results of its hooks.
432

433
    This method is called every time a hooks phase is executed, and notifies
434
    the Logical Unit about the hooks' result. The LU can then use it to alter
435
    its result based on the hooks.  By default the method does nothing and the
436
    previous result is passed back unchanged but any LU can define it if it
437
    wants to use the local cluster hook-scripts somehow.
438

439
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
440
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
441
    @param hook_results: the results of the multi-node hooks rpc call
442
    @param feedback_fn: function used send feedback back to the caller
443
    @param lu_result: the previous Exec result this LU had, or None
444
        in the PRE phase
445
    @return: the new Exec result, based on the previous result
446
        and hook results
447

448
    """
449
    # API must be kept, thus we ignore the unused argument and could
450
    # be a function warnings
451
    # pylint: disable-msg=W0613,R0201
452
    return lu_result
453

    
454
  def _ExpandAndLockInstance(self):
455
    """Helper function to expand and lock an instance.
456

457
    Many LUs that work on an instance take its name in self.op.instance_name
458
    and need to expand it and then declare the expanded name for locking. This
459
    function does it, and then updates self.op.instance_name to the expanded
460
    name. It also initializes needed_locks as a dict, if this hasn't been done
461
    before.
462

463
    """
464
    if self.needed_locks is None:
465
      self.needed_locks = {}
466
    else:
467
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
468
        "_ExpandAndLockInstance called with instance-level locks set"
469
    self.op.instance_name = _ExpandInstanceName(self.cfg,
470
                                                self.op.instance_name)
471
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
472

    
473
  def _LockInstancesNodes(self, primary_only=False):
474
    """Helper function to declare instances' nodes for locking.
475

476
    This function should be called after locking one or more instances to lock
477
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
478
    with all primary or secondary nodes for instances already locked and
479
    present in self.needed_locks[locking.LEVEL_INSTANCE].
480

481
    It should be called from DeclareLocks, and for safety only works if
482
    self.recalculate_locks[locking.LEVEL_NODE] is set.
483

484
    In the future it may grow parameters to just lock some instance's nodes, or
485
    to just lock primaries or secondary nodes, if needed.
486

487
    If should be called in DeclareLocks in a way similar to::
488

489
      if level == locking.LEVEL_NODE:
490
        self._LockInstancesNodes()
491

492
    @type primary_only: boolean
493
    @param primary_only: only lock primary nodes of locked instances
494

495
    """
496
    assert locking.LEVEL_NODE in self.recalculate_locks, \
497
      "_LockInstancesNodes helper function called with no nodes to recalculate"
498

    
499
    # TODO: check if we're really been called with the instance locks held
500

    
501
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
502
    # future we might want to have different behaviors depending on the value
503
    # of self.recalculate_locks[locking.LEVEL_NODE]
504
    wanted_nodes = []
505
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
506
      instance = self.context.cfg.GetInstanceInfo(instance_name)
507
      wanted_nodes.append(instance.primary_node)
508
      if not primary_only:
509
        wanted_nodes.extend(instance.secondary_nodes)
510

    
511
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
512
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
513
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
514
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
515

    
516
    del self.recalculate_locks[locking.LEVEL_NODE]
517

    
518

    
519
class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
520
  """Simple LU which runs no hooks.
521

522
  This LU is intended as a parent for other LogicalUnits which will
523
  run no hooks, in order to reduce duplicate code.
524

525
  """
526
  HPATH = None
527
  HTYPE = None
528

    
529
  def BuildHooksEnv(self):
530
    """Empty BuildHooksEnv for NoHooksLu.
531

532
    This just raises an error.
533

534
    """
535
    assert False, "BuildHooksEnv called for NoHooksLUs"
536

    
537

    
538
class Tasklet:
539
  """Tasklet base class.
540

541
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
542
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
543
  tasklets know nothing about locks.
544

545
  Subclasses must follow these rules:
546
    - Implement CheckPrereq
547
    - Implement Exec
548

549
  """
550
  def __init__(self, lu):
551
    self.lu = lu
552

    
553
    # Shortcuts
554
    self.cfg = lu.cfg
555
    self.rpc = lu.rpc
556

    
557
  def CheckPrereq(self):
558
    """Check prerequisites for this tasklets.
559

560
    This method should check whether the prerequisites for the execution of
561
    this tasklet are fulfilled. It can do internode communication, but it
562
    should be idempotent - no cluster or system changes are allowed.
563

564
    The method should raise errors.OpPrereqError in case something is not
565
    fulfilled. Its return value is ignored.
566

567
    This method should also update all parameters to their canonical form if it
568
    hasn't been done before.
569

570
    """
571
    pass
572

    
573
  def Exec(self, feedback_fn):
574
    """Execute the tasklet.
575

576
    This method should implement the actual work. It should raise
577
    errors.OpExecError for failures that are somewhat dealt with in code, or
578
    expected.
579

580
    """
581
    raise NotImplementedError
582

    
583

    
584
def _GetWantedNodes(lu, nodes):
585
  """Returns list of checked and expanded node names.
586

587
  @type lu: L{LogicalUnit}
588
  @param lu: the logical unit on whose behalf we execute
589
  @type nodes: list
590
  @param nodes: list of node names or None for all nodes
591
  @rtype: list
592
  @return: the list of nodes, sorted
593
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
594

595
  """
596
  if not nodes:
597
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
598
      " non-empty list of nodes whose name is to be expanded.")
599

    
600
  wanted = [_ExpandNodeName(lu.cfg, name) for name in nodes]
601
  return utils.NiceSort(wanted)
602

    
603

    
604
def _GetWantedInstances(lu, instances):
605
  """Returns list of checked and expanded instance names.
606

607
  @type lu: L{LogicalUnit}
608
  @param lu: the logical unit on whose behalf we execute
609
  @type instances: list
610
  @param instances: list of instance names or None for all instances
611
  @rtype: list
612
  @return: the list of instances, sorted
613
  @raise errors.OpPrereqError: if the instances parameter is wrong type
614
  @raise errors.OpPrereqError: if any of the passed instances is not found
615

616
  """
617
  if instances:
618
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
619
  else:
620
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
621
  return wanted
622

    
623

    
624
def _GetUpdatedParams(old_params, update_dict,
625
                      use_default=True, use_none=False):
626
  """Return the new version of a parameter dictionary.
627

628
  @type old_params: dict
629
  @param old_params: old parameters
630
  @type update_dict: dict
631
  @param update_dict: dict containing new parameter values, or
632
      constants.VALUE_DEFAULT to reset the parameter to its default
633
      value
634
  @param use_default: boolean
635
  @type use_default: whether to recognise L{constants.VALUE_DEFAULT}
636
      values as 'to be deleted' values
637
  @param use_none: boolean
638
  @type use_none: whether to recognise C{None} values as 'to be
639
      deleted' values
640
  @rtype: dict
641
  @return: the new parameter dictionary
642

643
  """
644
  params_copy = copy.deepcopy(old_params)
645
  for key, val in update_dict.iteritems():
646
    if ((use_default and val == constants.VALUE_DEFAULT) or
647
        (use_none and val is None)):
648
      try:
649
        del params_copy[key]
650
      except KeyError:
651
        pass
652
    else:
653
      params_copy[key] = val
654
  return params_copy
655

    
656

    
657
def _CheckOutputFields(static, dynamic, selected):
658
  """Checks whether all selected fields are valid.
659

660
  @type static: L{utils.FieldSet}
661
  @param static: static fields set
662
  @type dynamic: L{utils.FieldSet}
663
  @param dynamic: dynamic fields set
664

665
  """
666
  f = utils.FieldSet()
667
  f.Extend(static)
668
  f.Extend(dynamic)
669

    
670
  delta = f.NonMatching(selected)
671
  if delta:
672
    raise errors.OpPrereqError("Unknown output fields selected: %s"
673
                               % ",".join(delta), errors.ECODE_INVAL)
674

    
675

    
676
def _CheckBooleanOpField(op, name):
677
  """Validates boolean opcode parameters.
678

679
  This will ensure that an opcode parameter is either a boolean value,
680
  or None (but that it always exists).
681

682
  """
683
  val = getattr(op, name, None)
684
  if not (val is None or isinstance(val, bool)):
685
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
686
                               (name, str(val)), errors.ECODE_INVAL)
687
  setattr(op, name, val)
688

    
689

    
690
def _CheckGlobalHvParams(params):
691
  """Validates that given hypervisor params are not global ones.
692

693
  This will ensure that instances don't get customised versions of
694
  global params.
695

696
  """
697
  used_globals = constants.HVC_GLOBALS.intersection(params)
698
  if used_globals:
699
    msg = ("The following hypervisor parameters are global and cannot"
700
           " be customized at instance level, please modify them at"
701
           " cluster level: %s" % utils.CommaJoin(used_globals))
702
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
703

    
704

    
705
def _CheckNodeOnline(lu, node):
706
  """Ensure that a given node is online.
707

708
  @param lu: the LU on behalf of which we make the check
709
  @param node: the node to check
710
  @raise errors.OpPrereqError: if the node is offline
711

712
  """
713
  if lu.cfg.GetNodeInfo(node).offline:
714
    raise errors.OpPrereqError("Can't use offline node %s" % node,
715
                               errors.ECODE_INVAL)
716

    
717

    
718
def _CheckNodeNotDrained(lu, node):
719
  """Ensure that a given node is not drained.
720

721
  @param lu: the LU on behalf of which we make the check
722
  @param node: the node to check
723
  @raise errors.OpPrereqError: if the node is drained
724

725
  """
726
  if lu.cfg.GetNodeInfo(node).drained:
727
    raise errors.OpPrereqError("Can't use drained node %s" % node,
728
                               errors.ECODE_INVAL)
729

    
730

    
731
def _CheckNodeHasOS(lu, node, os_name, force_variant):
732
  """Ensure that a node supports a given OS.
733

734
  @param lu: the LU on behalf of which we make the check
735
  @param node: the node to check
736
  @param os_name: the OS to query about
737
  @param force_variant: whether to ignore variant errors
738
  @raise errors.OpPrereqError: if the node is not supporting the OS
739

740
  """
741
  result = lu.rpc.call_os_get(node, os_name)
742
  result.Raise("OS '%s' not in supported OS list for node %s" %
743
               (os_name, node),
744
               prereq=True, ecode=errors.ECODE_INVAL)
745
  if not force_variant:
746
    _CheckOSVariant(result.payload, os_name)
747

    
748

    
749
def _RequireFileStorage():
750
  """Checks that file storage is enabled.
751

752
  @raise errors.OpPrereqError: when file storage is disabled
753

754
  """
755
  if not constants.ENABLE_FILE_STORAGE:
756
    raise errors.OpPrereqError("File storage disabled at configure time",
757
                               errors.ECODE_INVAL)
758

    
759

    
760
def _CheckDiskTemplate(template):
761
  """Ensure a given disk template is valid.
762

763
  """
764
  if template not in constants.DISK_TEMPLATES:
765
    msg = ("Invalid disk template name '%s', valid templates are: %s" %
766
           (template, utils.CommaJoin(constants.DISK_TEMPLATES)))
767
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
768
  if template == constants.DT_FILE:
769
    _RequireFileStorage()
770

    
771

    
772
def _CheckStorageType(storage_type):
773
  """Ensure a given storage type is valid.
774

775
  """
776
  if storage_type not in constants.VALID_STORAGE_TYPES:
777
    raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
778
                               errors.ECODE_INVAL)
779
  if storage_type == constants.ST_FILE:
780
    _RequireFileStorage()
781
  return True
782

    
783

    
784
def _GetClusterDomainSecret():
785
  """Reads the cluster domain secret.
786

787
  """
788
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
789
                               strict=True)
790

    
791

    
792
def _CheckInstanceDown(lu, instance, reason):
793
  """Ensure that an instance is not running."""
794
  if instance.admin_up:
795
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
796
                               (instance.name, reason), errors.ECODE_STATE)
797

    
798
  pnode = instance.primary_node
799
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
800
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
801
              prereq=True, ecode=errors.ECODE_ENVIRON)
802

    
803
  if instance.name in ins_l.payload:
804
    raise errors.OpPrereqError("Instance %s is running, %s" %
805
                               (instance.name, reason), errors.ECODE_STATE)
806

    
807

    
808
def _ExpandItemName(fn, name, kind):
809
  """Expand an item name.
810

811
  @param fn: the function to use for expansion
812
  @param name: requested item name
813
  @param kind: text description ('Node' or 'Instance')
814
  @return: the resolved (full) name
815
  @raise errors.OpPrereqError: if the item is not found
816

817
  """
818
  full_name = fn(name)
819
  if full_name is None:
820
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
821
                               errors.ECODE_NOENT)
822
  return full_name
823

    
824

    
825
def _ExpandNodeName(cfg, name):
826
  """Wrapper over L{_ExpandItemName} for nodes."""
827
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
828

    
829

    
830
def _ExpandInstanceName(cfg, name):
831
  """Wrapper over L{_ExpandItemName} for instance."""
832
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
833

    
834

    
835
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
836
                          memory, vcpus, nics, disk_template, disks,
837
                          bep, hvp, hypervisor_name):
838
  """Builds instance related env variables for hooks
839

840
  This builds the hook environment from individual variables.
841

842
  @type name: string
843
  @param name: the name of the instance
844
  @type primary_node: string
845
  @param primary_node: the name of the instance's primary node
846
  @type secondary_nodes: list
847
  @param secondary_nodes: list of secondary nodes as strings
848
  @type os_type: string
849
  @param os_type: the name of the instance's OS
850
  @type status: boolean
851
  @param status: the should_run status of the instance
852
  @type memory: string
853
  @param memory: the memory size of the instance
854
  @type vcpus: string
855
  @param vcpus: the count of VCPUs the instance has
856
  @type nics: list
857
  @param nics: list of tuples (ip, mac, mode, link) representing
858
      the NICs the instance has
859
  @type disk_template: string
860
  @param disk_template: the disk template of the instance
861
  @type disks: list
862
  @param disks: the list of (size, mode) pairs
863
  @type bep: dict
864
  @param bep: the backend parameters for the instance
865
  @type hvp: dict
866
  @param hvp: the hypervisor parameters for the instance
867
  @type hypervisor_name: string
868
  @param hypervisor_name: the hypervisor for the instance
869
  @rtype: dict
870
  @return: the hook environment for this instance
871

872
  """
873
  if status:
874
    str_status = "up"
875
  else:
876
    str_status = "down"
877
  env = {
878
    "OP_TARGET": name,
879
    "INSTANCE_NAME": name,
880
    "INSTANCE_PRIMARY": primary_node,
881
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
882
    "INSTANCE_OS_TYPE": os_type,
883
    "INSTANCE_STATUS": str_status,
884
    "INSTANCE_MEMORY": memory,
885
    "INSTANCE_VCPUS": vcpus,
886
    "INSTANCE_DISK_TEMPLATE": disk_template,
887
    "INSTANCE_HYPERVISOR": hypervisor_name,
888
  }
889

    
890
  if nics:
891
    nic_count = len(nics)
892
    for idx, (ip, mac, mode, link) in enumerate(nics):
893
      if ip is None:
894
        ip = ""
895
      env["INSTANCE_NIC%d_IP" % idx] = ip
896
      env["INSTANCE_NIC%d_MAC" % idx] = mac
897
      env["INSTANCE_NIC%d_MODE" % idx] = mode
898
      env["INSTANCE_NIC%d_LINK" % idx] = link
899
      if mode == constants.NIC_MODE_BRIDGED:
900
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
901
  else:
902
    nic_count = 0
903

    
904
  env["INSTANCE_NIC_COUNT"] = nic_count
905

    
906
  if disks:
907
    disk_count = len(disks)
908
    for idx, (size, mode) in enumerate(disks):
909
      env["INSTANCE_DISK%d_SIZE" % idx] = size
910
      env["INSTANCE_DISK%d_MODE" % idx] = mode
911
  else:
912
    disk_count = 0
913

    
914
  env["INSTANCE_DISK_COUNT"] = disk_count
915

    
916
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
917
    for key, value in source.items():
918
      env["INSTANCE_%s_%s" % (kind, key)] = value
919

    
920
  return env
921

    
922

    
923
def _NICListToTuple(lu, nics):
924
  """Build a list of nic information tuples.
925

926
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
927
  value in LUQueryInstanceData.
928

929
  @type lu:  L{LogicalUnit}
930
  @param lu: the logical unit on whose behalf we execute
931
  @type nics: list of L{objects.NIC}
932
  @param nics: list of nics to convert to hooks tuples
933

934
  """
935
  hooks_nics = []
936
  cluster = lu.cfg.GetClusterInfo()
937
  for nic in nics:
938
    ip = nic.ip
939
    mac = nic.mac
940
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
941
    mode = filled_params[constants.NIC_MODE]
942
    link = filled_params[constants.NIC_LINK]
943
    hooks_nics.append((ip, mac, mode, link))
944
  return hooks_nics
945

    
946

    
947
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
948
  """Builds instance related env variables for hooks from an object.
949

950
  @type lu: L{LogicalUnit}
951
  @param lu: the logical unit on whose behalf we execute
952
  @type instance: L{objects.Instance}
953
  @param instance: the instance for which we should build the
954
      environment
955
  @type override: dict
956
  @param override: dictionary with key/values that will override
957
      our values
958
  @rtype: dict
959
  @return: the hook environment dictionary
960

961
  """
962
  cluster = lu.cfg.GetClusterInfo()
963
  bep = cluster.FillBE(instance)
964
  hvp = cluster.FillHV(instance)
965
  args = {
966
    'name': instance.name,
967
    'primary_node': instance.primary_node,
968
    'secondary_nodes': instance.secondary_nodes,
969
    'os_type': instance.os,
970
    'status': instance.admin_up,
971
    'memory': bep[constants.BE_MEMORY],
972
    'vcpus': bep[constants.BE_VCPUS],
973
    'nics': _NICListToTuple(lu, instance.nics),
974
    'disk_template': instance.disk_template,
975
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
976
    'bep': bep,
977
    'hvp': hvp,
978
    'hypervisor_name': instance.hypervisor,
979
  }
980
  if override:
981
    args.update(override)
982
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
983

    
984

    
985
def _AdjustCandidatePool(lu, exceptions):
986
  """Adjust the candidate pool after node operations.
987

988
  """
989
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
990
  if mod_list:
991
    lu.LogInfo("Promoted nodes to master candidate role: %s",
992
               utils.CommaJoin(node.name for node in mod_list))
993
    for name in mod_list:
994
      lu.context.ReaddNode(name)
995
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
996
  if mc_now > mc_max:
997
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
998
               (mc_now, mc_max))
999

    
1000

    
1001
def _DecideSelfPromotion(lu, exceptions=None):
1002
  """Decide whether I should promote myself as a master candidate.
1003

1004
  """
1005
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
1006
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1007
  # the new node will increase mc_max with one, so:
1008
  mc_should = min(mc_should + 1, cp_size)
1009
  return mc_now < mc_should
1010

    
1011

    
1012
def _CheckNicsBridgesExist(lu, target_nics, target_node):
1013
  """Check that the brigdes needed by a list of nics exist.
1014

1015
  """
1016
  cluster = lu.cfg.GetClusterInfo()
1017
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
1018
  brlist = [params[constants.NIC_LINK] for params in paramslist
1019
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
1020
  if brlist:
1021
    result = lu.rpc.call_bridges_exist(target_node, brlist)
1022
    result.Raise("Error checking bridges on destination node '%s'" %
1023
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1024

    
1025

    
1026
def _CheckInstanceBridgesExist(lu, instance, node=None):
1027
  """Check that the brigdes needed by an instance exist.
1028

1029
  """
1030
  if node is None:
1031
    node = instance.primary_node
1032
  _CheckNicsBridgesExist(lu, instance.nics, node)
1033

    
1034

    
1035
def _CheckOSVariant(os_obj, name):
1036
  """Check whether an OS name conforms to the os variants specification.
1037

1038
  @type os_obj: L{objects.OS}
1039
  @param os_obj: OS object to check
1040
  @type name: string
1041
  @param name: OS name passed by the user, to check for validity
1042

1043
  """
1044
  if not os_obj.supported_variants:
1045
    return
1046
  try:
1047
    variant = name.split("+", 1)[1]
1048
  except IndexError:
1049
    raise errors.OpPrereqError("OS name must include a variant",
1050
                               errors.ECODE_INVAL)
1051

    
1052
  if variant not in os_obj.supported_variants:
1053
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1054

    
1055

    
1056
def _GetNodeInstancesInner(cfg, fn):
1057
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
1058

    
1059

    
1060
def _GetNodeInstances(cfg, node_name):
1061
  """Returns a list of all primary and secondary instances on a node.
1062

1063
  """
1064

    
1065
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1066

    
1067

    
1068
def _GetNodePrimaryInstances(cfg, node_name):
1069
  """Returns primary instances on a node.
1070

1071
  """
1072
  return _GetNodeInstancesInner(cfg,
1073
                                lambda inst: node_name == inst.primary_node)
1074

    
1075

    
1076
def _GetNodeSecondaryInstances(cfg, node_name):
1077
  """Returns secondary instances on a node.
1078

1079
  """
1080
  return _GetNodeInstancesInner(cfg,
1081
                                lambda inst: node_name in inst.secondary_nodes)
1082

    
1083

    
1084
def _GetStorageTypeArgs(cfg, storage_type):
1085
  """Returns the arguments for a storage type.
1086

1087
  """
1088
  # Special case for file storage
1089
  if storage_type == constants.ST_FILE:
1090
    # storage.FileStorage wants a list of storage directories
1091
    return [[cfg.GetFileStorageDir()]]
1092

    
1093
  return []
1094

    
1095

    
1096
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
1097
  faulty = []
1098

    
1099
  for dev in instance.disks:
1100
    cfg.SetDiskID(dev, node_name)
1101

    
1102
  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
1103
  result.Raise("Failed to get disk status from node %s" % node_name,
1104
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
1105

    
1106
  for idx, bdev_status in enumerate(result.payload):
1107
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1108
      faulty.append(idx)
1109

    
1110
  return faulty
1111

    
1112

    
1113
class LUPostInitCluster(LogicalUnit):
1114
  """Logical unit for running hooks after cluster initialization.
1115

1116
  """
1117
  HPATH = "cluster-init"
1118
  HTYPE = constants.HTYPE_CLUSTER
1119
  _OP_REQP = []
1120

    
1121
  def BuildHooksEnv(self):
1122
    """Build hooks env.
1123

1124
    """
1125
    env = {"OP_TARGET": self.cfg.GetClusterName()}
1126
    mn = self.cfg.GetMasterNode()
1127
    return env, [], [mn]
1128

    
1129
  def Exec(self, feedback_fn):
1130
    """Nothing to do.
1131

1132
    """
1133
    return True
1134

    
1135

    
1136
class LUDestroyCluster(LogicalUnit):
1137
  """Logical unit for destroying the cluster.
1138

1139
  """
1140
  HPATH = "cluster-destroy"
1141
  HTYPE = constants.HTYPE_CLUSTER
1142
  _OP_REQP = []
1143

    
1144
  def BuildHooksEnv(self):
1145
    """Build hooks env.
1146

1147
    """
1148
    env = {"OP_TARGET": self.cfg.GetClusterName()}
1149
    return env, [], []
1150

    
1151
  def CheckPrereq(self):
1152
    """Check prerequisites.
1153

1154
    This checks whether the cluster is empty.
1155

1156
    Any errors are signaled by raising errors.OpPrereqError.
1157

1158
    """
1159
    master = self.cfg.GetMasterNode()
1160

    
1161
    nodelist = self.cfg.GetNodeList()
1162
    if len(nodelist) != 1 or nodelist[0] != master:
1163
      raise errors.OpPrereqError("There are still %d node(s) in"
1164
                                 " this cluster." % (len(nodelist) - 1),
1165
                                 errors.ECODE_INVAL)
1166
    instancelist = self.cfg.GetInstanceList()
1167
    if instancelist:
1168
      raise errors.OpPrereqError("There are still %d instance(s) in"
1169
                                 " this cluster." % len(instancelist),
1170
                                 errors.ECODE_INVAL)
1171

    
1172
  def Exec(self, feedback_fn):
1173
    """Destroys the cluster.
1174

1175
    """
1176
    master = self.cfg.GetMasterNode()
1177
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
1178

    
1179
    # Run post hooks on master node before it's removed
1180
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
1181
    try:
1182
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
1183
    except:
1184
      # pylint: disable-msg=W0702
1185
      self.LogWarning("Errors occurred running hooks on %s" % master)
1186

    
1187
    result = self.rpc.call_node_stop_master(master, False)
1188
    result.Raise("Could not disable the master role")
1189

    
1190
    if modify_ssh_setup:
1191
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1192
      utils.CreateBackup(priv_key)
1193
      utils.CreateBackup(pub_key)
1194

    
1195
    return master
1196

    
1197

    
1198
def _VerifyCertificate(filename):
1199
  """Verifies a certificate for LUVerifyCluster.
1200

1201
  @type filename: string
1202
  @param filename: Path to PEM file
1203

1204
  """
1205
  try:
1206
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1207
                                           utils.ReadFile(filename))
1208
  except Exception, err: # pylint: disable-msg=W0703
1209
    return (LUVerifyCluster.ETYPE_ERROR,
1210
            "Failed to load X509 certificate %s: %s" % (filename, err))
1211

    
1212
  (errcode, msg) = \
1213
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1214
                                constants.SSL_CERT_EXPIRATION_ERROR)
1215

    
1216
  if msg:
1217
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1218
  else:
1219
    fnamemsg = None
1220

    
1221
  if errcode is None:
1222
    return (None, fnamemsg)
1223
  elif errcode == utils.CERT_WARNING:
1224
    return (LUVerifyCluster.ETYPE_WARNING, fnamemsg)
1225
  elif errcode == utils.CERT_ERROR:
1226
    return (LUVerifyCluster.ETYPE_ERROR, fnamemsg)
1227

    
1228
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1229

    
1230

    
1231
class LUVerifyCluster(LogicalUnit):
1232
  """Verifies the cluster status.
1233

1234
  """
1235
  HPATH = "cluster-verify"
1236
  HTYPE = constants.HTYPE_CLUSTER
1237
  _OP_REQP = [
1238
    ("skip_checks", _TListOf(_TElemOf(constants.VERIFY_OPTIONAL_CHECKS))),
1239
    ("verbose", _TBool),
1240
    ("error_codes", _TBool),
1241
    ("debug_simulate_errors", _TBool),
1242
    ]
1243
  REQ_BGL = False
1244

    
1245
  TCLUSTER = "cluster"
1246
  TNODE = "node"
1247
  TINSTANCE = "instance"
1248

    
1249
  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
1250
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
1251
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
1252
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
1253
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
1254
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
1255
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
1256
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
1257
  ENODEDRBD = (TNODE, "ENODEDRBD")
1258
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
1259
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
1260
  ENODEHV = (TNODE, "ENODEHV")
1261
  ENODELVM = (TNODE, "ENODELVM")
1262
  ENODEN1 = (TNODE, "ENODEN1")
1263
  ENODENET = (TNODE, "ENODENET")
1264
  ENODEOS = (TNODE, "ENODEOS")
1265
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
1266
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
1267
  ENODERPC = (TNODE, "ENODERPC")
1268
  ENODESSH = (TNODE, "ENODESSH")
1269
  ENODEVERSION = (TNODE, "ENODEVERSION")
1270
  ENODESETUP = (TNODE, "ENODESETUP")
1271
  ENODETIME = (TNODE, "ENODETIME")
1272

    
1273
  ETYPE_FIELD = "code"
1274
  ETYPE_ERROR = "ERROR"
1275
  ETYPE_WARNING = "WARNING"
1276

    
1277
  class NodeImage(object):
1278
    """A class representing the logical and physical status of a node.
1279

1280
    @type name: string
1281
    @ivar name: the node name to which this object refers
1282
    @ivar volumes: a structure as returned from
1283
        L{ganeti.backend.GetVolumeList} (runtime)
1284
    @ivar instances: a list of running instances (runtime)
1285
    @ivar pinst: list of configured primary instances (config)
1286
    @ivar sinst: list of configured secondary instances (config)
1287
    @ivar sbp: diction of {secondary-node: list of instances} of all peers
1288
        of this node (config)
1289
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1290
    @ivar dfree: free disk, as reported by the node (runtime)
1291
    @ivar offline: the offline status (config)
1292
    @type rpc_fail: boolean
1293
    @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1294
        not whether the individual keys were correct) (runtime)
1295
    @type lvm_fail: boolean
1296
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1297
    @type hyp_fail: boolean
1298
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1299
    @type ghost: boolean
1300
    @ivar ghost: whether this is a known node or not (config)
1301
    @type os_fail: boolean
1302
    @ivar os_fail: whether the RPC call didn't return valid OS data
1303
    @type oslist: list
1304
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1305

1306
    """
1307
    def __init__(self, offline=False, name=None):
1308
      self.name = name
1309
      self.volumes = {}
1310
      self.instances = []
1311
      self.pinst = []
1312
      self.sinst = []
1313
      self.sbp = {}
1314
      self.mfree = 0
1315
      self.dfree = 0
1316
      self.offline = offline
1317
      self.rpc_fail = False
1318
      self.lvm_fail = False
1319
      self.hyp_fail = False
1320
      self.ghost = False
1321
      self.os_fail = False
1322
      self.oslist = {}
1323

    
1324
  def ExpandNames(self):
1325
    self.needed_locks = {
1326
      locking.LEVEL_NODE: locking.ALL_SET,
1327
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1328
    }
1329
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1330

    
1331
  def _Error(self, ecode, item, msg, *args, **kwargs):
1332
    """Format an error message.
1333

1334
    Based on the opcode's error_codes parameter, either format a
1335
    parseable error code, or a simpler error string.
1336

1337
    This must be called only from Exec and functions called from Exec.
1338

1339
    """
1340
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1341
    itype, etxt = ecode
1342
    # first complete the msg
1343
    if args:
1344
      msg = msg % args
1345
    # then format the whole message
1346
    if self.op.error_codes:
1347
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1348
    else:
1349
      if item:
1350
        item = " " + item
1351
      else:
1352
        item = ""
1353
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1354
    # and finally report it via the feedback_fn
1355
    self._feedback_fn("  - %s" % msg)
1356

    
1357
  def _ErrorIf(self, cond, *args, **kwargs):
1358
    """Log an error message if the passed condition is True.
1359

1360
    """
1361
    cond = bool(cond) or self.op.debug_simulate_errors
1362
    if cond:
1363
      self._Error(*args, **kwargs)
1364
    # do not mark the operation as failed for WARN cases only
1365
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1366
      self.bad = self.bad or cond
1367

    
1368
  def _VerifyNode(self, ninfo, nresult):
1369
    """Run multiple tests against a node.
1370

1371
    Test list:
1372

1373
      - compares ganeti version
1374
      - checks vg existence and size > 20G
1375
      - checks config file checksum
1376
      - checks ssh to other nodes
1377

1378
    @type ninfo: L{objects.Node}
1379
    @param ninfo: the node to check
1380
    @param nresult: the results from the node
1381
    @rtype: boolean
1382
    @return: whether overall this call was successful (and we can expect
1383
         reasonable values in the respose)
1384

1385
    """
1386
    node = ninfo.name
1387
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1388

    
1389
    # main result, nresult should be a non-empty dict
1390
    test = not nresult or not isinstance(nresult, dict)
1391
    _ErrorIf(test, self.ENODERPC, node,
1392
                  "unable to verify node: no data returned")
1393
    if test:
1394
      return False
1395

    
1396
    # compares ganeti version
1397
    local_version = constants.PROTOCOL_VERSION
1398
    remote_version = nresult.get("version", None)
1399
    test = not (remote_version and
1400
                isinstance(remote_version, (list, tuple)) and
1401
                len(remote_version) == 2)
1402
    _ErrorIf(test, self.ENODERPC, node,
1403
             "connection to node returned invalid data")
1404
    if test:
1405
      return False
1406

    
1407
    test = local_version != remote_version[0]
1408
    _ErrorIf(test, self.ENODEVERSION, node,
1409
             "incompatible protocol versions: master %s,"
1410
             " node %s", local_version, remote_version[0])
1411
    if test:
1412
      return False
1413

    
1414
    # node seems compatible, we can actually try to look into its results
1415

    
1416
    # full package version
1417
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1418
                  self.ENODEVERSION, node,
1419
                  "software version mismatch: master %s, node %s",
1420
                  constants.RELEASE_VERSION, remote_version[1],
1421
                  code=self.ETYPE_WARNING)
1422

    
1423
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1424
    if isinstance(hyp_result, dict):
1425
      for hv_name, hv_result in hyp_result.iteritems():
1426
        test = hv_result is not None
1427
        _ErrorIf(test, self.ENODEHV, node,
1428
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1429

    
1430

    
1431
    test = nresult.get(constants.NV_NODESETUP,
1432
                           ["Missing NODESETUP results"])
1433
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1434
             "; ".join(test))
1435

    
1436
    return True
1437

    
1438
  def _VerifyNodeTime(self, ninfo, nresult,
1439
                      nvinfo_starttime, nvinfo_endtime):
1440
    """Check the node time.
1441

1442
    @type ninfo: L{objects.Node}
1443
    @param ninfo: the node to check
1444
    @param nresult: the remote results for the node
1445
    @param nvinfo_starttime: the start time of the RPC call
1446
    @param nvinfo_endtime: the end time of the RPC call
1447

1448
    """
1449
    node = ninfo.name
1450
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1451

    
1452
    ntime = nresult.get(constants.NV_TIME, None)
1453
    try:
1454
      ntime_merged = utils.MergeTime(ntime)
1455
    except (ValueError, TypeError):
1456
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
1457
      return
1458

    
1459
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1460
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1461
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1462
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1463
    else:
1464
      ntime_diff = None
1465

    
1466
    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1467
             "Node time diverges by at least %s from master node time",
1468
             ntime_diff)
1469

    
1470
  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
1471
    """Check the node time.
1472

1473
    @type ninfo: L{objects.Node}
1474
    @param ninfo: the node to check
1475
    @param nresult: the remote results for the node
1476
    @param vg_name: the configured VG name
1477

1478
    """
1479
    if vg_name is None:
1480
      return
1481

    
1482
    node = ninfo.name
1483
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1484

    
1485
    # checks vg existence and size > 20G
1486
    vglist = nresult.get(constants.NV_VGLIST, None)
1487
    test = not vglist
1488
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1489
    if not test:
1490
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1491
                                            constants.MIN_VG_SIZE)
1492
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1493

    
1494
    # check pv names
1495
    pvlist = nresult.get(constants.NV_PVLIST, None)
1496
    test = pvlist is None
1497
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
1498
    if not test:
1499
      # check that ':' is not present in PV names, since it's a
1500
      # special character for lvcreate (denotes the range of PEs to
1501
      # use on the PV)
1502
      for _, pvname, owner_vg in pvlist:
1503
        test = ":" in pvname
1504
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
1505
                 " '%s' of VG '%s'", pvname, owner_vg)
1506

    
1507
  def _VerifyNodeNetwork(self, ninfo, nresult):
1508
    """Check the node time.
1509

1510
    @type ninfo: L{objects.Node}
1511
    @param ninfo: the node to check
1512
    @param nresult: the remote results for the node
1513

1514
    """
1515
    node = ninfo.name
1516
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1517

    
1518
    test = constants.NV_NODELIST not in nresult
1519
    _ErrorIf(test, self.ENODESSH, node,
1520
             "node hasn't returned node ssh connectivity data")
1521
    if not test:
1522
      if nresult[constants.NV_NODELIST]:
1523
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1524
          _ErrorIf(True, self.ENODESSH, node,
1525
                   "ssh communication with node '%s': %s", a_node, a_msg)
1526

    
1527
    test = constants.NV_NODENETTEST not in nresult
1528
    _ErrorIf(test, self.ENODENET, node,
1529
             "node hasn't returned node tcp connectivity data")
1530
    if not test:
1531
      if nresult[constants.NV_NODENETTEST]:
1532
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1533
        for anode in nlist:
1534
          _ErrorIf(True, self.ENODENET, node,
1535
                   "tcp communication with node '%s': %s",
1536
                   anode, nresult[constants.NV_NODENETTEST][anode])
1537

    
1538
    test = constants.NV_MASTERIP not in nresult
1539
    _ErrorIf(test, self.ENODENET, node,
1540
             "node hasn't returned node master IP reachability data")
1541
    if not test:
1542
      if not nresult[constants.NV_MASTERIP]:
1543
        if node == self.master_node:
1544
          msg = "the master node cannot reach the master IP (not configured?)"
1545
        else:
1546
          msg = "cannot reach the master IP"
1547
        _ErrorIf(True, self.ENODENET, node, msg)
1548

    
1549

    
1550
  def _VerifyInstance(self, instance, instanceconfig, node_image):
1551
    """Verify an instance.
1552

1553
    This function checks to see if the required block devices are
1554
    available on the instance's node.
1555

1556
    """
1557
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1558
    node_current = instanceconfig.primary_node
1559

    
1560
    node_vol_should = {}
1561
    instanceconfig.MapLVsByNode(node_vol_should)
1562

    
1563
    for node in node_vol_should:
1564
      n_img = node_image[node]
1565
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1566
        # ignore missing volumes on offline or broken nodes
1567
        continue
1568
      for volume in node_vol_should[node]:
1569
        test = volume not in n_img.volumes
1570
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1571
                 "volume %s missing on node %s", volume, node)
1572

    
1573
    if instanceconfig.admin_up:
1574
      pri_img = node_image[node_current]
1575
      test = instance not in pri_img.instances and not pri_img.offline
1576
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
1577
               "instance not running on its primary node %s",
1578
               node_current)
1579

    
1580
    for node, n_img in node_image.items():
1581
      if (not node == node_current):
1582
        test = instance in n_img.instances
1583
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
1584
                 "instance should not run on node %s", node)
1585

    
1586
  def _VerifyOrphanVolumes(self, node_vol_should, node_image):
1587
    """Verify if there are any unknown volumes in the cluster.
1588

1589
    The .os, .swap and backup volumes are ignored. All other volumes are
1590
    reported as unknown.
1591

1592
    """
1593
    for node, n_img in node_image.items():
1594
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1595
        # skip non-healthy nodes
1596
        continue
1597
      for volume in n_img.volumes:
1598
        test = (node not in node_vol_should or
1599
                volume not in node_vol_should[node])
1600
        self._ErrorIf(test, self.ENODEORPHANLV, node,
1601
                      "volume %s is unknown", volume)
1602

    
1603
  def _VerifyOrphanInstances(self, instancelist, node_image):
1604
    """Verify the list of running instances.
1605

1606
    This checks what instances are running but unknown to the cluster.
1607

1608
    """
1609
    for node, n_img in node_image.items():
1610
      for o_inst in n_img.instances:
1611
        test = o_inst not in instancelist
1612
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
1613
                      "instance %s on node %s should not exist", o_inst, node)
1614

    
1615
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1616
    """Verify N+1 Memory Resilience.
1617

1618
    Check that if one single node dies we can still start all the
1619
    instances it was primary for.
1620

1621
    """
1622
    for node, n_img in node_image.items():
1623
      # This code checks that every node which is now listed as
1624
      # secondary has enough memory to host all instances it is
1625
      # supposed to should a single other node in the cluster fail.
1626
      # FIXME: not ready for failover to an arbitrary node
1627
      # FIXME: does not support file-backed instances
1628
      # WARNING: we currently take into account down instances as well
1629
      # as up ones, considering that even if they're down someone
1630
      # might want to start them even in the event of a node failure.
1631
      for prinode, instances in n_img.sbp.items():
1632
        needed_mem = 0
1633
        for instance in instances:
1634
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1635
          if bep[constants.BE_AUTO_BALANCE]:
1636
            needed_mem += bep[constants.BE_MEMORY]
1637
        test = n_img.mfree < needed_mem
1638
        self._ErrorIf(test, self.ENODEN1, node,
1639
                      "not enough memory on to accommodate"
1640
                      " failovers should peer node %s fail", prinode)
1641

    
1642
  def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum,
1643
                       master_files):
1644
    """Verifies and computes the node required file checksums.
1645

1646
    @type ninfo: L{objects.Node}
1647
    @param ninfo: the node to check
1648
    @param nresult: the remote results for the node
1649
    @param file_list: required list of files
1650
    @param local_cksum: dictionary of local files and their checksums
1651
    @param master_files: list of files that only masters should have
1652

1653
    """
1654
    node = ninfo.name
1655
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1656

    
1657
    remote_cksum = nresult.get(constants.NV_FILELIST, None)
1658
    test = not isinstance(remote_cksum, dict)
1659
    _ErrorIf(test, self.ENODEFILECHECK, node,
1660
             "node hasn't returned file checksum data")
1661
    if test:
1662
      return
1663

    
1664
    for file_name in file_list:
1665
      node_is_mc = ninfo.master_candidate
1666
      must_have = (file_name not in master_files) or node_is_mc
1667
      # missing
1668
      test1 = file_name not in remote_cksum
1669
      # invalid checksum
1670
      test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
1671
      # existing and good
1672
      test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
1673
      _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
1674
               "file '%s' missing", file_name)
1675
      _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
1676
               "file '%s' has wrong checksum", file_name)
1677
      # not candidate and this is not a must-have file
1678
      _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
1679
               "file '%s' should not exist on non master"
1680
               " candidates (and the file is outdated)", file_name)
1681
      # all good, except non-master/non-must have combination
1682
      _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
1683
               "file '%s' should not exist"
1684
               " on non master candidates", file_name)
1685

    
1686
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_map):
1687
    """Verifies and the node DRBD status.
1688

1689
    @type ninfo: L{objects.Node}
1690
    @param ninfo: the node to check
1691
    @param nresult: the remote results for the node
1692
    @param instanceinfo: the dict of instances
1693
    @param drbd_map: the DRBD map as returned by
1694
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
1695

1696
    """
1697
    node = ninfo.name
1698
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1699

    
1700
    # compute the DRBD minors
1701
    node_drbd = {}
1702
    for minor, instance in drbd_map[node].items():
1703
      test = instance not in instanceinfo
1704
      _ErrorIf(test, self.ECLUSTERCFG, None,
1705
               "ghost instance '%s' in temporary DRBD map", instance)
1706
        # ghost instance should not be running, but otherwise we
1707
        # don't give double warnings (both ghost instance and
1708
        # unallocated minor in use)
1709
      if test:
1710
        node_drbd[minor] = (instance, False)
1711
      else:
1712
        instance = instanceinfo[instance]
1713
        node_drbd[minor] = (instance.name, instance.admin_up)
1714

    
1715
    # and now check them
1716
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
1717
    test = not isinstance(used_minors, (tuple, list))
1718
    _ErrorIf(test, self.ENODEDRBD, node,
1719
             "cannot parse drbd status file: %s", str(used_minors))
1720
    if test:
1721
      # we cannot check drbd status
1722
      return
1723

    
1724
    for minor, (iname, must_exist) in node_drbd.items():
1725
      test = minor not in used_minors and must_exist
1726
      _ErrorIf(test, self.ENODEDRBD, node,
1727
               "drbd minor %d of instance %s is not active", minor, iname)
1728
    for minor in used_minors:
1729
      test = minor not in node_drbd
1730
      _ErrorIf(test, self.ENODEDRBD, node,
1731
               "unallocated drbd minor %d is in use", minor)
1732

    
1733
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
1734
    """Builds the node OS structures.
1735

1736
    @type ninfo: L{objects.Node}
1737
    @param ninfo: the node to check
1738
    @param nresult: the remote results for the node
1739
    @param nimg: the node image object
1740

1741
    """
1742
    node = ninfo.name
1743
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1744

    
1745
    remote_os = nresult.get(constants.NV_OSLIST, None)
1746
    test = (not isinstance(remote_os, list) or
1747
            not compat.all(remote_os,
1748
                           lambda v: isinstance(v, list) and len(v) == 7))
1749

    
1750
    _ErrorIf(test, self.ENODEOS, node,
1751
             "node hasn't returned valid OS data")
1752

    
1753
    nimg.os_fail = test
1754

    
1755
    if test:
1756
      return
1757

    
1758
    os_dict = {}
1759

    
1760
    for (name, os_path, status, diagnose,
1761
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
1762

    
1763
      if name not in os_dict:
1764
        os_dict[name] = []
1765

    
1766
      # parameters is a list of lists instead of list of tuples due to
1767
      # JSON lacking a real tuple type, fix it:
1768
      parameters = [tuple(v) for v in parameters]
1769
      os_dict[name].append((os_path, status, diagnose,
1770
                            set(variants), set(parameters), set(api_ver)))
1771

    
1772
    nimg.oslist = os_dict
1773

    
1774
  def _VerifyNodeOS(self, ninfo, nimg, base):
1775
    """Verifies the node OS list.
1776

1777
    @type ninfo: L{objects.Node}
1778
    @param ninfo: the node to check
1779
    @param nimg: the node image object
1780
    @param base: the 'template' node we match against (e.g. from the master)
1781

1782
    """
1783
    node = ninfo.name
1784
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1785

    
1786
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
1787

    
1788
    for os_name, os_data in nimg.oslist.items():
1789
      assert os_data, "Empty OS status for OS %s?!" % os_name
1790
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
1791
      _ErrorIf(not f_status, self.ENODEOS, node,
1792
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
1793
      _ErrorIf(len(os_data) > 1, self.ENODEOS, node,
1794
               "OS '%s' has multiple entries (first one shadows the rest): %s",
1795
               os_name, utils.CommaJoin([v[0] for v in os_data]))
1796
      # this will catched in backend too
1797
      _ErrorIf(compat.any(f_api, lambda v: v >= constants.OS_API_V15)
1798
               and not f_var, self.ENODEOS, node,
1799
               "OS %s with API at least %d does not declare any variant",
1800
               os_name, constants.OS_API_V15)
1801
      # comparisons with the 'base' image
1802
      test = os_name not in base.oslist
1803
      _ErrorIf(test, self.ENODEOS, node,
1804
               "Extra OS %s not present on reference node (%s)",
1805
               os_name, base.name)
1806
      if test:
1807
        continue
1808
      assert base.oslist[os_name], "Base node has empty OS status?"
1809
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
1810
      if not b_status:
1811
        # base OS is invalid, skipping
1812
        continue
1813
      for kind, a, b in [("API version", f_api, b_api),
1814
                         ("variants list", f_var, b_var),
1815
                         ("parameters", f_param, b_param)]:
1816
        _ErrorIf(a != b, self.ENODEOS, node,
1817
                 "OS %s %s differs from reference node %s: %s vs. %s",
1818
                 kind, os_name, base.name,
1819
                 utils.CommaJoin(a), utils.CommaJoin(a))
1820

    
1821
    # check any missing OSes
1822
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
1823
    _ErrorIf(missing, self.ENODEOS, node,
1824
             "OSes present on reference node %s but missing on this node: %s",
1825
             base.name, utils.CommaJoin(missing))
1826

    
1827
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
1828
    """Verifies and updates the node volume data.
1829

1830
    This function will update a L{NodeImage}'s internal structures
1831
    with data from the remote call.
1832

1833
    @type ninfo: L{objects.Node}
1834
    @param ninfo: the node to check
1835
    @param nresult: the remote results for the node
1836
    @param nimg: the node image object
1837
    @param vg_name: the configured VG name
1838

1839
    """
1840
    node = ninfo.name
1841
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1842

    
1843
    nimg.lvm_fail = True
1844
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1845
    if vg_name is None:
1846
      pass
1847
    elif isinstance(lvdata, basestring):
1848
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
1849
               utils.SafeEncode(lvdata))
1850
    elif not isinstance(lvdata, dict):
1851
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
1852
    else:
1853
      nimg.volumes = lvdata
1854
      nimg.lvm_fail = False
1855

    
1856
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
1857
    """Verifies and updates the node instance list.
1858

1859
    If the listing was successful, then updates this node's instance
1860
    list. Otherwise, it marks the RPC call as failed for the instance
1861
    list key.
1862

1863
    @type ninfo: L{objects.Node}
1864
    @param ninfo: the node to check
1865
    @param nresult: the remote results for the node
1866
    @param nimg: the node image object
1867

1868
    """
1869
    idata = nresult.get(constants.NV_INSTANCELIST, None)
1870
    test = not isinstance(idata, list)
1871
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
1872
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
1873
    if test:
1874
      nimg.hyp_fail = True
1875
    else:
1876
      nimg.instances = idata
1877

    
1878
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
1879
    """Verifies and computes a node information map
1880

1881
    @type ninfo: L{objects.Node}
1882
    @param ninfo: the node to check
1883
    @param nresult: the remote results for the node
1884
    @param nimg: the node image object
1885
    @param vg_name: the configured VG name
1886

1887
    """
1888
    node = ninfo.name
1889
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1890

    
1891
    # try to read free memory (from the hypervisor)
1892
    hv_info = nresult.get(constants.NV_HVINFO, None)
1893
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
1894
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
1895
    if not test:
1896
      try:
1897
        nimg.mfree = int(hv_info["memory_free"])
1898
      except (ValueError, TypeError):
1899
        _ErrorIf(True, self.ENODERPC, node,
1900
                 "node returned invalid nodeinfo, check hypervisor")
1901

    
1902
    # FIXME: devise a free space model for file based instances as well
1903
    if vg_name is not None:
1904
      test = (constants.NV_VGLIST not in nresult or
1905
              vg_name not in nresult[constants.NV_VGLIST])
1906
      _ErrorIf(test, self.ENODELVM, node,
1907
               "node didn't return data for the volume group '%s'"
1908
               " - it is either missing or broken", vg_name)
1909
      if not test:
1910
        try:
1911
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
1912
        except (ValueError, TypeError):
1913
          _ErrorIf(True, self.ENODERPC, node,
1914
                   "node returned invalid LVM info, check LVM status")
1915

    
1916
  def BuildHooksEnv(self):
1917
    """Build hooks env.
1918

1919
    Cluster-Verify hooks just ran in the post phase and their failure makes
1920
    the output be logged in the verify output and the verification to fail.
1921

1922
    """
1923
    all_nodes = self.cfg.GetNodeList()
1924
    env = {
1925
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1926
      }
1927
    for node in self.cfg.GetAllNodesInfo().values():
1928
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1929

    
1930
    return env, [], all_nodes
1931

    
1932
  def Exec(self, feedback_fn):
1933
    """Verify integrity of cluster, performing various test on nodes.
1934

1935
    """
1936
    self.bad = False
1937
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1938
    verbose = self.op.verbose
1939
    self._feedback_fn = feedback_fn
1940
    feedback_fn("* Verifying global settings")
1941
    for msg in self.cfg.VerifyConfig():
1942
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)
1943

    
1944
    # Check the cluster certificates
1945
    for cert_filename in constants.ALL_CERT_FILES:
1946
      (errcode, msg) = _VerifyCertificate(cert_filename)
1947
      _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
1948

    
1949
    vg_name = self.cfg.GetVGName()
1950
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1951
    cluster = self.cfg.GetClusterInfo()
1952
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
1953
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1954
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1955
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1956
                        for iname in instancelist)
1957
    i_non_redundant = [] # Non redundant instances
1958
    i_non_a_balanced = [] # Non auto-balanced instances
1959
    n_offline = 0 # Count of offline nodes
1960
    n_drained = 0 # Count of nodes being drained
1961
    node_vol_should = {}
1962

    
1963
    # FIXME: verify OS list
1964
    # do local checksums
1965
    master_files = [constants.CLUSTER_CONF_FILE]
1966
    master_node = self.master_node = self.cfg.GetMasterNode()
1967
    master_ip = self.cfg.GetMasterIP()
1968

    
1969
    file_names = ssconf.SimpleStore().GetFileList()
1970
    file_names.extend(constants.ALL_CERT_FILES)
1971
    file_names.extend(master_files)
1972
    if cluster.modify_etc_hosts:
1973
      file_names.append(constants.ETC_HOSTS)
1974

    
1975
    local_checksums = utils.FingerprintFiles(file_names)
1976

    
1977
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1978
    node_verify_param = {
1979
      constants.NV_FILELIST: file_names,
1980
      constants.NV_NODELIST: [node.name for node in nodeinfo
1981
                              if not node.offline],
1982
      constants.NV_HYPERVISOR: hypervisors,
1983
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1984
                                  node.secondary_ip) for node in nodeinfo
1985
                                 if not node.offline],
1986
      constants.NV_INSTANCELIST: hypervisors,
1987
      constants.NV_VERSION: None,
1988
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1989
      constants.NV_NODESETUP: None,
1990
      constants.NV_TIME: None,
1991
      constants.NV_MASTERIP: (master_node, master_ip),
1992
      constants.NV_OSLIST: None,
1993
      }
1994

    
1995
    if vg_name is not None:
1996
      node_verify_param[constants.NV_VGLIST] = None
1997
      node_verify_param[constants.NV_LVLIST] = vg_name
1998
      node_verify_param[constants.NV_PVLIST] = [vg_name]
1999
      node_verify_param[constants.NV_DRBDLIST] = None
2000

    
2001
    # Build our expected cluster state
2002
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2003
                                                 name=node.name))
2004
                      for node in nodeinfo)
2005

    
2006
    for instance in instancelist:
2007
      inst_config = instanceinfo[instance]
2008

    
2009
      for nname in inst_config.all_nodes:
2010
        if nname not in node_image:
2011
          # ghost node
2012
          gnode = self.NodeImage(name=nname)
2013
          gnode.ghost = True
2014
          node_image[nname] = gnode
2015

    
2016
      inst_config.MapLVsByNode(node_vol_should)
2017

    
2018
      pnode = inst_config.primary_node
2019
      node_image[pnode].pinst.append(instance)
2020

    
2021
      for snode in inst_config.secondary_nodes:
2022
        nimg = node_image[snode]
2023
        nimg.sinst.append(instance)
2024
        if pnode not in nimg.sbp:
2025
          nimg.sbp[pnode] = []
2026
        nimg.sbp[pnode].append(instance)
2027

    
2028
    # At this point, we have the in-memory data structures complete,
2029
    # except for the runtime information, which we'll gather next
2030

    
2031
    # Due to the way our RPC system works, exact response times cannot be
2032
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2033
    # time before and after executing the request, we can at least have a time
2034
    # window.
2035
    nvinfo_starttime = time.time()
2036
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
2037
                                           self.cfg.GetClusterName())
2038
    nvinfo_endtime = time.time()
2039

    
2040
    all_drbd_map = self.cfg.ComputeDRBDMap()
2041

    
2042
    feedback_fn("* Verifying node status")
2043

    
2044
    refos_img = None
2045

    
2046
    for node_i in nodeinfo:
2047
      node = node_i.name
2048
      nimg = node_image[node]
2049

    
2050
      if node_i.offline:
2051
        if verbose:
2052
          feedback_fn("* Skipping offline node %s" % (node,))
2053
        n_offline += 1
2054
        continue
2055

    
2056
      if node == master_node:
2057
        ntype = "master"
2058
      elif node_i.master_candidate:
2059
        ntype = "master candidate"
2060
      elif node_i.drained:
2061
        ntype = "drained"
2062
        n_drained += 1
2063
      else:
2064
        ntype = "regular"
2065
      if verbose:
2066
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2067

    
2068
      msg = all_nvinfo[node].fail_msg
2069
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
2070
      if msg:
2071
        nimg.rpc_fail = True
2072
        continue
2073

    
2074
      nresult = all_nvinfo[node].payload
2075

    
2076
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2077
      self._VerifyNodeNetwork(node_i, nresult)
2078
      self._VerifyNodeLVM(node_i, nresult, vg_name)
2079
      self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums,
2080
                            master_files)
2081
      self._VerifyNodeDrbd(node_i, nresult, instanceinfo, all_drbd_map)
2082
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2083

    
2084
      self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2085
      self._UpdateNodeInstances(node_i, nresult, nimg)
2086
      self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2087
      self._UpdateNodeOS(node_i, nresult, nimg)
2088
      if not nimg.os_fail:
2089
        if refos_img is None:
2090
          refos_img = nimg
2091
        self._VerifyNodeOS(node_i, nimg, refos_img)
2092

    
2093
    feedback_fn("* Verifying instance status")
2094
    for instance in instancelist:
2095
      if verbose:
2096
        feedback_fn("* Verifying instance %s" % instance)
2097
      inst_config = instanceinfo[instance]
2098
      self._VerifyInstance(instance, inst_config, node_image)
2099
      inst_nodes_offline = []
2100

    
2101
      pnode = inst_config.primary_node
2102
      pnode_img = node_image[pnode]
2103
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2104
               self.ENODERPC, pnode, "instance %s, connection to"
2105
               " primary node failed", instance)
2106

    
2107
      if pnode_img.offline:
2108
        inst_nodes_offline.append(pnode)
2109

    
2110
      # If the instance is non-redundant we cannot survive losing its primary
2111
      # node, so we are not N+1 compliant. On the other hand we have no disk
2112
      # templates with more than one secondary so that situation is not well
2113
      # supported either.
2114
      # FIXME: does not support file-backed instances
2115
      if not inst_config.secondary_nodes:
2116
        i_non_redundant.append(instance)
2117
      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
2118
               instance, "instance has multiple secondary nodes: %s",
2119
               utils.CommaJoin(inst_config.secondary_nodes),
2120
               code=self.ETYPE_WARNING)
2121

    
2122
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2123
        i_non_a_balanced.append(instance)
2124

    
2125
      for snode in inst_config.secondary_nodes:
2126
        s_img = node_image[snode]
2127
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
2128
                 "instance %s, connection to secondary node failed", instance)
2129

    
2130
        if s_img.offline:
2131
          inst_nodes_offline.append(snode)
2132

    
2133
      # warn that the instance lives on offline nodes
2134
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
2135
               "instance lives on offline node(s) %s",
2136
               utils.CommaJoin(inst_nodes_offline))
2137
      # ... or ghost nodes
2138
      for node in inst_config.all_nodes:
2139
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
2140
                 "instance lives on ghost node %s", node)
2141

    
2142
    feedback_fn("* Verifying orphan volumes")
2143
    self._VerifyOrphanVolumes(node_vol_should, node_image)
2144

    
2145
    feedback_fn("* Verifying orphan instances")
2146
    self._VerifyOrphanInstances(instancelist, node_image)
2147

    
2148
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2149
      feedback_fn("* Verifying N+1 Memory redundancy")
2150
      self._VerifyNPlusOneMemory(node_image, instanceinfo)
2151

    
2152
    feedback_fn("* Other Notes")
2153
    if i_non_redundant:
2154
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2155
                  % len(i_non_redundant))
2156

    
2157
    if i_non_a_balanced:
2158
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2159
                  % len(i_non_a_balanced))
2160

    
2161
    if n_offline:
2162
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2163

    
2164
    if n_drained:
2165
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2166

    
2167
    return not self.bad
2168

    
2169
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2170
    """Analyze the post-hooks' result
2171

2172
    This method analyses the hook result, handles it, and sends some
2173
    nicely-formatted feedback back to the user.
2174

2175
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
2176
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2177
    @param hooks_results: the results of the multi-node hooks rpc call
2178
    @param feedback_fn: function used send feedback back to the caller
2179
    @param lu_result: previous Exec result
2180
    @return: the new Exec result, based on the previous result
2181
        and hook results
2182

2183
    """
2184
    # We only really run POST phase hooks, and are only interested in
2185
    # their results
2186
    if phase == constants.HOOKS_PHASE_POST:
2187
      # Used to change hooks' output to proper indentation
2188
      indent_re = re.compile('^', re.M)
2189
      feedback_fn("* Hooks Results")
2190
      assert hooks_results, "invalid result from hooks"
2191

    
2192
      for node_name in hooks_results:
2193
        res = hooks_results[node_name]
2194
        msg = res.fail_msg
2195
        test = msg and not res.offline
2196
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
2197
                      "Communication failure in hooks execution: %s", msg)
2198
        if res.offline or msg:
2199
          # No need to investigate payload if node is offline or gave an error.
2200
          # override manually lu_result here as _ErrorIf only
2201
          # overrides self.bad
2202
          lu_result = 1
2203
          continue
2204
        for script, hkr, output in res.payload:
2205
          test = hkr == constants.HKR_FAIL
2206
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
2207
                        "Script %s failed, output:", script)
2208
          if test:
2209
            output = indent_re.sub('      ', output)
2210
            feedback_fn("%s" % output)
2211
            lu_result = 0
2212

    
2213
      return lu_result
2214

    
2215

    
2216
class LUVerifyDisks(NoHooksLU):
2217
  """Verifies the cluster disks status.
2218

2219
  """
2220
  _OP_REQP = []
2221
  REQ_BGL = False
2222

    
2223
  def ExpandNames(self):
2224
    self.needed_locks = {
2225
      locking.LEVEL_NODE: locking.ALL_SET,
2226
      locking.LEVEL_INSTANCE: locking.ALL_SET,
2227
    }
2228
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
2229

    
2230
  def Exec(self, feedback_fn):
2231
    """Verify integrity of cluster disks.
2232

2233
    @rtype: tuple of three items
2234
    @return: a tuple of (dict of node-to-node_error, list of instances
2235
        which need activate-disks, dict of instance: (node, volume) for
2236
        missing volumes
2237

2238
    """
2239
    result = res_nodes, res_instances, res_missing = {}, [], {}
2240

    
2241
    vg_name = self.cfg.GetVGName()
2242
    nodes = utils.NiceSort(self.cfg.GetNodeList())
2243
    instances = [self.cfg.GetInstanceInfo(name)
2244
                 for name in self.cfg.GetInstanceList()]
2245

    
2246
    nv_dict = {}
2247
    for inst in instances:
2248
      inst_lvs = {}
2249
      if (not inst.admin_up or
2250
          inst.disk_template not in constants.DTS_NET_MIRROR):
2251
        continue
2252
      inst.MapLVsByNode(inst_lvs)
2253
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
2254
      for node, vol_list in inst_lvs.iteritems():
2255
        for vol in vol_list:
2256
          nv_dict[(node, vol)] = inst
2257

    
2258
    if not nv_dict:
2259
      return result
2260

    
2261
    node_lvs = self.rpc.call_lv_list(nodes, vg_name)
2262

    
2263
    for node in nodes:
2264
      # node_volume
2265
      node_res = node_lvs[node]
2266
      if node_res.offline:
2267
        continue
2268
      msg = node_res.fail_msg
2269
      if msg:
2270
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
2271
        res_nodes[node] = msg
2272
        continue
2273

    
2274
      lvs = node_res.payload
2275
      for lv_name, (_, _, lv_online) in lvs.items():
2276
        inst = nv_dict.pop((node, lv_name), None)
2277
        if (not lv_online and inst is not None
2278
            and inst.name not in res_instances):
2279
          res_instances.append(inst.name)
2280

    
2281
    # any leftover items in nv_dict are missing LVs, let's arrange the
2282
    # data better
2283
    for key, inst in nv_dict.iteritems():
2284
      if inst.name not in res_missing:
2285
        res_missing[inst.name] = []
2286
      res_missing[inst.name].append(key)
2287

    
2288
    return result
2289

    
2290

    
2291
class LURepairDiskSizes(NoHooksLU):
2292
  """Verifies the cluster disks sizes.
2293

2294
  """
2295
  _OP_REQP = [("instances", _TListOf(_TNonEmptyString))]
2296
  REQ_BGL = False
2297

    
2298
  def ExpandNames(self):
2299
    if self.op.instances:
2300
      self.wanted_names = []
2301
      for name in self.op.instances:
2302
        full_name = _ExpandInstanceName(self.cfg, name)
2303
        self.wanted_names.append(full_name)
2304
      self.needed_locks = {
2305
        locking.LEVEL_NODE: [],
2306
        locking.LEVEL_INSTANCE: self.wanted_names,
2307
        }
2308
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2309
    else:
2310
      self.wanted_names = None
2311
      self.needed_locks = {
2312
        locking.LEVEL_NODE: locking.ALL_SET,
2313
        locking.LEVEL_INSTANCE: locking.ALL_SET,
2314
        }
2315
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
2316

    
2317
  def DeclareLocks(self, level):
2318
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
2319
      self._LockInstancesNodes(primary_only=True)
2320

    
2321
  def CheckPrereq(self):
2322
    """Check prerequisites.
2323

2324
    This only checks the optional instance list against the existing names.
2325

2326
    """
2327
    if self.wanted_names is None:
2328
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2329

    
2330
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
2331
                             in self.wanted_names]
2332

    
2333
  def _EnsureChildSizes(self, disk):
2334
    """Ensure children of the disk have the needed disk size.
2335

2336
    This is valid mainly for DRBD8 and fixes an issue where the
2337
    children have smaller disk size.
2338

2339
    @param disk: an L{ganeti.objects.Disk} object
2340

2341
    """
2342
    if disk.dev_type == constants.LD_DRBD8:
2343
      assert disk.children, "Empty children for DRBD8?"
2344
      fchild = disk.children[0]
2345
      mismatch = fchild.size < disk.size
2346
      if mismatch:
2347
        self.LogInfo("Child disk has size %d, parent %d, fixing",
2348
                     fchild.size, disk.size)
2349
        fchild.size = disk.size
2350

    
2351
      # and we recurse on this child only, not on the metadev
2352
      return self._EnsureChildSizes(fchild) or mismatch
2353
    else:
2354
      return False
2355

    
2356
  def Exec(self, feedback_fn):
2357
    """Verify the size of cluster disks.
2358

2359
    """
2360
    # TODO: check child disks too
2361
    # TODO: check differences in size between primary/secondary nodes
2362
    per_node_disks = {}
2363
    for instance in self.wanted_instances:
2364
      pnode = instance.primary_node
2365
      if pnode not in per_node_disks:
2366
        per_node_disks[pnode] = []
2367
      for idx, disk in enumerate(instance.disks):
2368
        per_node_disks[pnode].append((instance, idx, disk))
2369

    
2370
    changed = []
2371
    for node, dskl in per_node_disks.items():
2372
      newl = [v[2].Copy() for v in dskl]
2373
      for dsk in newl:
2374
        self.cfg.SetDiskID(dsk, node)
2375
      result = self.rpc.call_blockdev_getsizes(node, newl)
2376
      if result.fail_msg:
2377
        self.LogWarning("Failure in blockdev_getsizes call to node"
2378
                        " %s, ignoring", node)
2379
        continue
2380
      if len(result.data) != len(dskl):
2381
        self.LogWarning("Invalid result from node %s, ignoring node results",
2382
                        node)
2383
        continue
2384
      for ((instance, idx, disk), size) in zip(dskl, result.data):
2385
        if size is None:
2386
          self.LogWarning("Disk %d of instance %s did not return size"
2387
                          " information, ignoring", idx, instance.name)
2388
          continue
2389
        if not isinstance(size, (int, long)):
2390
          self.LogWarning("Disk %d of instance %s did not return valid"
2391
                          " size information, ignoring", idx, instance.name)
2392
          continue
2393
        size = size >> 20
2394
        if size != disk.size:
2395
          self.LogInfo("Disk %d of instance %s has mismatched size,"
2396
                       " correcting: recorded %d, actual %d", idx,
2397
                       instance.name, disk.size, size)
2398
          disk.size = size
2399
          self.cfg.Update(instance, feedback_fn)
2400
          changed.append((instance.name, idx, size))
2401
        if self._EnsureChildSizes(disk):
2402
          self.cfg.Update(instance, feedback_fn)
2403
          changed.append((instance.name, idx, disk.size))
2404
    return changed
2405

    
2406

    
2407
class LURenameCluster(LogicalUnit):
2408
  """Rename the cluster.
2409

2410
  """
2411
  HPATH = "cluster-rename"
2412
  HTYPE = constants.HTYPE_CLUSTER
2413
  _OP_REQP = [("name", _TNonEmptyString)]
2414

    
2415
  def BuildHooksEnv(self):
2416
    """Build hooks env.
2417

2418
    """
2419
    env = {
2420
      "OP_TARGET": self.cfg.GetClusterName(),
2421
      "NEW_NAME": self.op.name,
2422
      }
2423
    mn = self.cfg.GetMasterNode()
2424
    all_nodes = self.cfg.GetNodeList()
2425
    return env, [mn], all_nodes
2426

    
2427
  def CheckPrereq(self):
2428
    """Verify that the passed name is a valid one.
2429

2430
    """
2431
    hostname = utils.GetHostInfo(self.op.name)
2432

    
2433
    new_name = hostname.name
2434
    self.ip = new_ip = hostname.ip
2435
    old_name = self.cfg.GetClusterName()
2436
    old_ip = self.cfg.GetMasterIP()
2437
    if new_name == old_name and new_ip == old_ip:
2438
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
2439
                                 " cluster has changed",
2440
                                 errors.ECODE_INVAL)
2441
    if new_ip != old_ip:
2442
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
2443
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
2444
                                   " reachable on the network. Aborting." %
2445
                                   new_ip, errors.ECODE_NOTUNIQUE)
2446

    
2447
    self.op.name = new_name
2448

    
2449
  def Exec(self, feedback_fn):
2450
    """Rename the cluster.
2451

2452
    """
2453
    clustername = self.op.name
2454
    ip = self.ip
2455

    
2456
    # shutdown the master IP
2457
    master = self.cfg.GetMasterNode()
2458
    result = self.rpc.call_node_stop_master(master, False)
2459
    result.Raise("Could not disable the master role")
2460

    
2461
    try:
2462
      cluster = self.cfg.GetClusterInfo()
2463
      cluster.cluster_name = clustername
2464
      cluster.master_ip = ip
2465
      self.cfg.Update(cluster, feedback_fn)
2466

    
2467
      # update the known hosts file
2468
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
2469
      node_list = self.cfg.GetNodeList()
2470
      try:
2471
        node_list.remove(master)
2472
      except ValueError:
2473
        pass
2474
      result = self.rpc.call_upload_file(node_list,
2475
                                         constants.SSH_KNOWN_HOSTS_FILE)
2476
      for to_node, to_result in result.iteritems():
2477
        msg = to_result.fail_msg
2478
        if msg:
2479
          msg = ("Copy of file %s to node %s failed: %s" %
2480
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
2481
          self.proc.LogWarning(msg)
2482

    
2483
    finally:
2484
      result = self.rpc.call_node_start_master(master, False, False)
2485
      msg = result.fail_msg
2486
      if msg:
2487
        self.LogWarning("Could not re-enable the master role on"
2488
                        " the master, please restart manually: %s", msg)
2489

    
2490

    
2491
def _RecursiveCheckIfLVMBased(disk):
2492
  """Check if the given disk or its children are lvm-based.
2493

2494
  @type disk: L{objects.Disk}
2495
  @param disk: the disk to check
2496
  @rtype: boolean
2497
  @return: boolean indicating whether a LD_LV dev_type was found or not
2498

2499
  """
2500
  if disk.children:
2501
    for chdisk in disk.children:
2502
      if _RecursiveCheckIfLVMBased(chdisk):
2503
        return True
2504
  return disk.dev_type == constants.LD_LV
2505

    
2506

    
2507
class LUSetClusterParams(LogicalUnit):
2508
  """Change the parameters of the cluster.
2509

2510
  """
2511
  HPATH = "cluster-modify"
2512
  HTYPE = constants.HTYPE_CLUSTER
2513
  _OP_REQP = [
2514
    ("hvparams", _TOr(_TDictOf(_TNonEmptyString, _TDict), _TNone)),
2515
    ("os_hvp", _TOr(_TDictOf(_TNonEmptyString, _TDict), _TNone)),
2516
    ("osparams", _TOr(_TDictOf(_TNonEmptyString, _TDict), _TNone)),
2517
    ("enabled_hypervisors",
2518
     _TOr(_TAnd(_TListOf(_TElemOf(constants.HYPER_TYPES)), _TTrue), _TNone)),
2519
    ]
2520
  _OP_DEFS = [
2521
    ("candidate_pool_size", None),
2522
    ("uid_pool", None),
2523
    ("add_uids", None),
2524
    ("remove_uids", None),
2525
    ("hvparams", None),
2526
    ("ov_hvp", None),
2527
    ]
2528
  REQ_BGL = False
2529

    
2530
  def CheckArguments(self):
2531
    """Check parameters
2532

2533
    """
2534
    if self.op.candidate_pool_size is not None:
2535
      try:
2536
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
2537
      except (ValueError, TypeError), err:
2538
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
2539
                                   str(err), errors.ECODE_INVAL)
2540
      if self.op.candidate_pool_size < 1:
2541
        raise errors.OpPrereqError("At least one master candidate needed",
2542
                                   errors.ECODE_INVAL)
2543

    
2544
    _CheckBooleanOpField(self.op, "maintain_node_health")
2545

    
2546
    if self.op.uid_pool:
2547
      uidpool.CheckUidPool(self.op.uid_pool)
2548

    
2549
    if self.op.add_uids:
2550
      uidpool.CheckUidPool(self.op.add_uids)
2551

    
2552
    if self.op.remove_uids:
2553
      uidpool.CheckUidPool(self.op.remove_uids)
2554

    
2555
  def ExpandNames(self):
2556
    # FIXME: in the future maybe other cluster params won't require checking on
2557
    # all nodes to be modified.
2558
    self.needed_locks = {
2559
      locking.LEVEL_NODE: locking.ALL_SET,
2560
    }
2561
    self.share_locks[locking.LEVEL_NODE] = 1
2562

    
2563
  def BuildHooksEnv(self):
2564
    """Build hooks env.
2565

2566
    """
2567
    env = {
2568
      "OP_TARGET": self.cfg.GetClusterName(),
2569
      "NEW_VG_NAME": self.op.vg_name,
2570
      }
2571
    mn = self.cfg.GetMasterNode()
2572
    return env, [mn], [mn]
2573

    
2574
  def CheckPrereq(self):
2575
    """Check prerequisites.
2576

2577
    This checks whether the given params don't conflict and
2578
    if the given volume group is valid.
2579

2580
    """
2581
    if self.op.vg_name is not None and not self.op.vg_name:
2582
      instances = self.cfg.GetAllInstancesInfo().values()
2583
      for inst in instances:
2584
        for disk in inst.disks:
2585
          if _RecursiveCheckIfLVMBased(disk):
2586
            raise errors.OpPrereqError("Cannot disable lvm storage while"
2587
                                       " lvm-based instances exist",
2588
                                       errors.ECODE_INVAL)
2589

    
2590
    node_list = self.acquired_locks[locking.LEVEL_NODE]
2591

    
2592
    # if vg_name not None, checks given volume group on all nodes
2593
    if self.op.vg_name:
2594
      vglist = self.rpc.call_vg_list(node_list)
2595
      for node in node_list:
2596
        msg = vglist[node].fail_msg
2597
        if msg:
2598
          # ignoring down node
2599
          self.LogWarning("Error while gathering data on node %s"
2600
                          " (ignoring node): %s", node, msg)
2601
          continue
2602
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
2603
                                              self.op.vg_name,
2604
                                              constants.MIN_VG_SIZE)
2605
        if vgstatus:
2606
          raise errors.OpPrereqError("Error on node '%s': %s" %
2607
                                     (node, vgstatus), errors.ECODE_ENVIRON)
2608

    
2609
    self.cluster = cluster = self.cfg.GetClusterInfo()
2610
    # validate params changes
2611
    if self.op.beparams:
2612
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2613
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
2614

    
2615
    if self.op.nicparams:
2616
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
2617
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
2618
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
2619
      nic_errors = []
2620

    
2621
      # check all instances for consistency
2622
      for instance in self.cfg.GetAllInstancesInfo().values():
2623
        for nic_idx, nic in enumerate(instance.nics):
2624
          params_copy = copy.deepcopy(nic.nicparams)
2625
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
2626

    
2627
          # check parameter syntax
2628
          try:
2629
            objects.NIC.CheckParameterSyntax(params_filled)
2630
          except errors.ConfigurationError, err:
2631
            nic_errors.append("Instance %s, nic/%d: %s" %
2632
                              (instance.name, nic_idx, err))
2633

    
2634
          # if we're moving instances to routed, check that they have an ip
2635
          target_mode = params_filled[constants.NIC_MODE]
2636
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
2637
            nic_errors.append("Instance %s, nic/%d: routed nick with no ip" %
2638
                              (instance.name, nic_idx))
2639
      if nic_errors:
2640
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
2641
                                   "\n".join(nic_errors))
2642

    
2643
    # hypervisor list/parameters
2644
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
2645
    if self.op.hvparams:
2646
      for hv_name, hv_dict in self.op.hvparams.items():
2647
        if hv_name not in self.new_hvparams:
2648
          self.new_hvparams[hv_name] = hv_dict
2649
        else:
2650
          self.new_hvparams[hv_name].update(hv_dict)
2651

    
2652
    # os hypervisor parameters
2653
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
2654
    if self.op.os_hvp:
2655
      for os_name, hvs in self.op.os_hvp.items():
2656
        if os_name not in self.new_os_hvp:
2657
          self.new_os_hvp[os_name] = hvs
2658
        else:
2659
          for hv_name, hv_dict in hvs.items():
2660
            if hv_name not in self.new_os_hvp[os_name]:
2661
              self.new_os_hvp[os_name][hv_name] = hv_dict
2662
            else:
2663
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
2664

    
2665
    # os parameters
2666
    self.new_osp = objects.FillDict(cluster.osparams, {})
2667
    if self.op.osparams:
2668
      for os_name, osp in self.op.osparams.items():
2669
        if os_name not in self.new_osp:
2670
          self.new_osp[os_name] = {}
2671

    
2672
        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
2673
                                                  use_none=True)
2674

    
2675
        if not self.new_osp[os_name]:
2676
          # we removed all parameters
2677
          del self.new_osp[os_name]
2678
        else:
2679
          # check the parameter validity (remote check)
2680
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
2681
                         os_name, self.new_osp[os_name])
2682

    
2683
    # changes to the hypervisor list
2684
    if self.op.enabled_hypervisors is not None:
2685
      self.hv_list = self.op.enabled_hypervisors
2686
      for hv in self.hv_list:
2687
        # if the hypervisor doesn't already exist in the cluster
2688
        # hvparams, we initialize it to empty, and then (in both
2689
        # cases) we make sure to fill the defaults, as we might not
2690
        # have a complete defaults list if the hypervisor wasn't
2691
        # enabled before
2692
        if hv not in new_hvp:
2693
          new_hvp[hv] = {}
2694
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
2695
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
2696
    else:
2697
      self.hv_list = cluster.enabled_hypervisors
2698

    
2699
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
2700
      # either the enabled list has changed, or the parameters have, validate
2701
      for hv_name, hv_params in self.new_hvparams.items():
2702
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
2703
            (self.op.enabled_hypervisors and
2704
             hv_name in self.op.enabled_hypervisors)):
2705
          # either this is a new hypervisor, or its parameters have changed
2706
          hv_class = hypervisor.GetHypervisor(hv_name)
2707
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2708
          hv_class.CheckParameterSyntax(hv_params)
2709
          _CheckHVParams(self, node_list, hv_name, hv_params)
2710

    
2711
    if self.op.os_hvp:
2712
      # no need to check any newly-enabled hypervisors, since the
2713
      # defaults have already been checked in the above code-block
2714
      for os_name, os_hvp in self.new_os_hvp.items():
2715
        for hv_name, hv_params in os_hvp.items():
2716
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2717
          # we need to fill in the new os_hvp on top of the actual hv_p
2718
          cluster_defaults = self.new_hvparams.get(hv_name, {})
2719
          new_osp = objects.FillDict(cluster_defaults, hv_params)
2720
          hv_class = hypervisor.GetHypervisor(hv_name)
2721
          hv_class.CheckParameterSyntax(new_osp)
2722
          _CheckHVParams(self, node_list, hv_name, new_osp)
2723

    
2724

    
2725
  def Exec(self, feedback_fn):
2726
    """Change the parameters of the cluster.
2727

2728
    """
2729
    if self.op.vg_name is not None:
2730
      new_volume = self.op.vg_name
2731
      if not new_volume:
2732
        new_volume = None
2733
      if new_volume != self.cfg.GetVGName():
2734
        self.cfg.SetVGName(new_volume)
2735
      else:
2736
        feedback_fn("Cluster LVM configuration already in desired"
2737
                    " state, not changing")
2738
    if self.op.hvparams:
2739
      self.cluster.hvparams = self.new_hvparams
2740
    if self.op.os_hvp:
2741
      self.cluster.os_hvp = self.new_os_hvp
2742
    if self.op.enabled_hypervisors is not None:
2743
      self.cluster.hvparams = self.new_hvparams
2744
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
2745
    if self.op.beparams:
2746
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
2747
    if self.op.nicparams:
2748
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
2749
    if self.op.osparams:
2750
      self.cluster.osparams = self.new_osp
2751

    
2752
    if self.op.candidate_pool_size is not None:
2753
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
2754
      # we need to update the pool size here, otherwise the save will fail
2755
      _AdjustCandidatePool(self, [])
2756

    
2757
    if self.op.maintain_node_health is not None:
2758
      self.cluster.maintain_node_health = self.op.maintain_node_health
2759

    
2760
    if self.op.add_uids is not None:
2761
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
2762

    
2763
    if self.op.remove_uids is not None:
2764
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
2765

    
2766
    if self.op.uid_pool is not None:
2767
      self.cluster.uid_pool = self.op.uid_pool
2768

    
2769
    self.cfg.Update(self.cluster, feedback_fn)
2770

    
2771

    
2772
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
2773
  """Distribute additional files which are part of the cluster configuration.
2774

2775
  ConfigWriter takes care of distributing the config and ssconf files, but
2776
  there are more files which should be distributed to all nodes. This function
2777
  makes sure those are copied.
2778

2779
  @param lu: calling logical unit
2780
  @param additional_nodes: list of nodes not in the config to distribute to
2781

2782
  """
2783
  # 1. Gather target nodes
2784
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
2785
  dist_nodes = lu.cfg.GetOnlineNodeList()
2786
  if additional_nodes is not None:
2787
    dist_nodes.extend(additional_nodes)
2788
  if myself.name in dist_nodes:
2789
    dist_nodes.remove(myself.name)
2790

    
2791
  # 2. Gather files to distribute
2792
  dist_files = set([constants.ETC_HOSTS,
2793
                    constants.SSH_KNOWN_HOSTS_FILE,
2794
                    constants.RAPI_CERT_FILE,
2795
                    constants.RAPI_USERS_FILE,
2796
                    constants.CONFD_HMAC_KEY,
2797
                    constants.CLUSTER_DOMAIN_SECRET_FILE,
2798
                   ])
2799

    
2800
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
2801
  for hv_name in enabled_hypervisors:
2802
    hv_class = hypervisor.GetHypervisor(hv_name)
2803
    dist_files.update(hv_class.GetAncillaryFiles())
2804

    
2805
  # 3. Perform the files upload
2806
  for fname in dist_files:
2807
    if os.path.exists(fname):
2808
      result = lu.rpc.call_upload_file(dist_nodes, fname)
2809
      for to_node, to_result in result.items():
2810
        msg = to_result.fail_msg
2811
        if msg:
2812
          msg = ("Copy of file %s to node %s failed: %s" %
2813
                 (fname, to_node, msg))
2814
          lu.proc.LogWarning(msg)
2815

    
2816

    
2817
class LURedistributeConfig(NoHooksLU):
2818
  """Force the redistribution of cluster configuration.
2819

2820
  This is a very simple LU.
2821

2822
  """
2823
  _OP_REQP = []
2824
  REQ_BGL = False
2825

    
2826
  def ExpandNames(self):
2827
    self.needed_locks = {
2828
      locking.LEVEL_NODE: locking.ALL_SET,
2829
    }
2830
    self.share_locks[locking.LEVEL_NODE] = 1
2831

    
2832
  def Exec(self, feedback_fn):
2833
    """Redistribute the configuration.
2834

2835
    """
2836
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
2837
    _RedistributeAncillaryFiles(self)
2838

    
2839

    
2840
def _WaitForSync(lu, instance, disks=None, oneshot=False):
2841
  """Sleep and poll for an instance's disk to sync.
2842

2843
  """
2844
  if not instance.disks or disks is not None and not disks:
2845
    return True
2846

    
2847
  disks = _ExpandCheckDisks(instance, disks)
2848

    
2849
  if not oneshot:
2850
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
2851

    
2852
  node = instance.primary_node
2853

    
2854
  for dev in disks:
2855
    lu.cfg.SetDiskID(dev, node)
2856

    
2857
  # TODO: Convert to utils.Retry
2858

    
2859
  retries = 0
2860
  degr_retries = 10 # in seconds, as we sleep 1 second each time
2861
  while True:
2862
    max_time = 0
2863
    done = True
2864
    cumul_degraded = False
2865
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
2866
    msg = rstats.fail_msg
2867
    if msg:
2868
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
2869
      retries += 1
2870
      if retries >= 10:
2871
        raise errors.RemoteError("Can't contact node %s for mirror data,"
2872
                                 " aborting." % node)
2873
      time.sleep(6)
2874
      continue
2875
    rstats = rstats.payload
2876
    retries = 0
2877
    for i, mstat in enumerate(rstats):
2878
      if mstat is None:
2879
        lu.LogWarning("Can't compute data for node %s/%s",
2880
                           node, disks[i].iv_name)
2881
        continue
2882

    
2883
      cumul_degraded = (cumul_degraded or
2884
                        (mstat.is_degraded and mstat.sync_percent is None))
2885
      if mstat.sync_percent is not None:
2886
        done = False
2887
        if mstat.estimated_time is not None:
2888
          rem_time = ("%s remaining (estimated)" %
2889
                      utils.FormatSeconds(mstat.estimated_time))
2890
          max_time = mstat.estimated_time
2891
        else:
2892
          rem_time = "no time estimate"
2893
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
2894
                        (disks[i].iv_name, mstat.sync_percent, rem_time))
2895

    
2896
    # if we're done but degraded, let's do a few small retries, to
2897
    # make sure we see a stable and not transient situation; therefore
2898
    # we force restart of the loop
2899
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
2900
      logging.info("Degraded disks found, %d retries left", degr_retries)
2901
      degr_retries -= 1
2902
      time.sleep(1)
2903
      continue
2904

    
2905
    if done or oneshot:
2906
      break
2907

    
2908
    time.sleep(min(60, max_time))
2909

    
2910
  if done:
2911
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2912
  return not cumul_degraded
2913

    
2914

    
2915
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2916
  """Check that mirrors are not degraded.
2917

2918
  The ldisk parameter, if True, will change the test from the
2919
  is_degraded attribute (which represents overall non-ok status for
2920
  the device(s)) to the ldisk (representing the local storage status).
2921

2922
  """
2923
  lu.cfg.SetDiskID(dev, node)
2924

    
2925
  result = True
2926

    
2927
  if on_primary or dev.AssembleOnSecondary():
2928
    rstats = lu.rpc.call_blockdev_find(node, dev)
2929
    msg = rstats.fail_msg
2930
    if msg:
2931
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2932
      result = False
2933
    elif not rstats.payload:
2934
      lu.LogWarning("Can't find disk on node %s", node)
2935
      result = False
2936
    else:
2937
      if ldisk:
2938
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2939
      else:
2940
        result = result and not rstats.payload.is_degraded
2941

    
2942
  if dev.children:
2943
    for child in dev.children:
2944
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
2945

    
2946
  return result
2947

    
2948

    
2949
class LUDiagnoseOS(NoHooksLU):
2950
  """Logical unit for OS diagnose/query.
2951

2952
  """
2953
  _OP_REQP = [
2954
    ("output_fields", _TListOf(_TNonEmptyString)),
2955
    ("names", _TListOf(_TNonEmptyString)),
2956
    ]
2957
  REQ_BGL = False
2958
  _FIELDS_STATIC = utils.FieldSet()
2959
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants",
2960
                                   "parameters", "api_versions")
2961

    
2962
  def CheckArguments(self):
2963
    if self.op.names:
2964
      raise errors.OpPrereqError("Selective OS query not supported",
2965
                                 errors.ECODE_INVAL)
2966

    
2967
    _CheckOutputFields(static=self._FIELDS_STATIC,
2968
                       dynamic=self._FIELDS_DYNAMIC,
2969
                       selected=self.op.output_fields)
2970

    
2971
  def ExpandNames(self):
2972
    # Lock all nodes, in shared mode
2973
    # Temporary removal of locks, should be reverted later
2974
    # TODO: reintroduce locks when they are lighter-weight
2975
    self.needed_locks = {}
2976
    #self.share_locks[locking.LEVEL_NODE] = 1
2977
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2978

    
2979
  @staticmethod
2980
  def _DiagnoseByOS(rlist):
2981
    """Remaps a per-node return list into an a per-os per-node dictionary
2982

2983
    @param rlist: a map with node names as keys and OS objects as values
2984

2985
    @rtype: dict
2986
    @return: a dictionary with osnames as keys and as value another
2987
        map, with nodes as keys and tuples of (path, status, diagnose,
2988
        variants, parameters, api_versions) as values, eg::
2989

2990
          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
2991
                                     (/srv/..., False, "invalid api")],
2992
                           "node2": [(/srv/..., True, "", [], [])]}
2993
          }
2994

2995
    """
2996
    all_os = {}
2997
    # we build here the list of nodes that didn't fail the RPC (at RPC
2998
    # level), so that nodes with a non-responding node daemon don't
2999
    # make all OSes invalid
3000
    good_nodes = [node_name for node_name in rlist
3001
                  if not rlist[node_name].fail_msg]
3002
    for node_name, nr in rlist.items():
3003
      if nr.fail_msg or not nr.payload:
3004
        continue
3005
      for (name, path, status, diagnose, variants,
3006
           params, api_versions) in nr.payload:
3007
        if name not in all_os:
3008
          # build a list of nodes for this os containing empty lists
3009
          # for each node in node_list
3010
          all_os[name] = {}
3011
          for nname in good_nodes:
3012
            all_os[name][nname] = []
3013
        # convert params from [name, help] to (name, help)
3014
        params = [tuple(v) for v in params]
3015
        all_os[name][node_name].append((path, status, diagnose,
3016
                                        variants, params, api_versions))
3017
    return all_os
3018

    
3019
  def Exec(self, feedback_fn):
3020
    """Compute the list of OSes.
3021

3022
    """
3023
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
3024
    node_data = self.rpc.call_os_diagnose(valid_nodes)
3025
    pol = self._DiagnoseByOS(node_data)
3026
    output = []
3027

    
3028
    for os_name, os_data in pol.items():
3029
      row = []
3030
      valid = True
3031
      (variants, params, api_versions) = null_state = (set(), set(), set())
3032
      for idx, osl in enumerate(os_data.values()):
3033
        valid = bool(valid and osl and osl[0][1])
3034
        if not valid:
3035
          (variants, params, api_versions) = null_state
3036
          break
3037
        node_variants, node_params, node_api = osl[0][3:6]
3038
        if idx == 0: # first entry
3039
          variants = set(node_variants)
3040
          params = set(node_params)
3041
          api_versions = set(node_api)
3042
        else: # keep consistency
3043
          variants.intersection_update(node_variants)
3044
          params.intersection_update(node_params)
3045
          api_versions.intersection_update(node_api)
3046

    
3047
      for field in self.op.output_fields:
3048
        if field == "name":
3049
          val = os_name
3050
        elif field == "valid":
3051
          val = valid
3052
        elif field == "node_status":
3053
          # this is just a copy of the dict
3054
          val = {}
3055
          for node_name, nos_list in os_data.items():
3056
            val[node_name] = nos_list
3057
        elif field == "variants":
3058
          val = list(variants)
3059
        elif field == "parameters":
3060
          val = list(params)
3061
        elif field == "api_versions":
3062
          val = list(api_versions)
3063
        else:
3064
          raise errors.ParameterError(field)
3065
        row.append(val)
3066
      output.append(row)
3067

    
3068
    return output
3069

    
3070

    
3071
class LURemoveNode(LogicalUnit):
3072
  """Logical unit for removing a node.
3073

3074
  """
3075
  HPATH = "node-remove"
3076
  HTYPE = constants.HTYPE_NODE
3077
  _OP_REQP = [("node_name", _TNonEmptyString)]
3078

    
3079
  def BuildHooksEnv(self):
3080
    """Build hooks env.
3081

3082
    This doesn't run on the target node in the pre phase as a failed
3083
    node would then be impossible to remove.
3084

3085
    """
3086
    env = {
3087
      "OP_TARGET": self.op.node_name,
3088
      "NODE_NAME": self.op.node_name,
3089
      }
3090
    all_nodes = self.cfg.GetNodeList()
3091
    try:
3092
      all_nodes.remove(self.op.node_name)
3093
    except ValueError:
3094
      logging.warning("Node %s which is about to be removed not found"
3095
                      " in the all nodes list", self.op.node_name)
3096
    return env, all_nodes, all_nodes
3097

    
3098
  def CheckPrereq(self):
3099
    """Check prerequisites.
3100

3101
    This checks:
3102
     - the node exists in the configuration
3103
     - it does not have primary or secondary instances
3104
     - it's not the master
3105

3106
    Any errors are signaled by raising errors.OpPrereqError.
3107

3108
    """
3109
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
3110
    node = self.cfg.GetNodeInfo(self.op.node_name)
3111
    assert node is not None
3112

    
3113
    instance_list = self.cfg.GetInstanceList()
3114

    
3115
    masternode = self.cfg.GetMasterNode()
3116
    if node.name == masternode:
3117
      raise errors.OpPrereqError("Node is the master node,"
3118
                                 " you need to failover first.",
3119
                                 errors.ECODE_INVAL)
3120

    
3121
    for instance_name in instance_list:
3122
      instance = self.cfg.GetInstanceInfo(instance_name)
3123
      if node.name in instance.all_nodes:
3124
        raise errors.OpPrereqError("Instance %s is still running on the node,"
3125
                                   " please remove first." % instance_name,
3126
                                   errors.ECODE_INVAL)
3127
    self.op.node_name = node.name
3128
    self.node = node
3129

    
3130
  def Exec(self, feedback_fn):
3131
    """Removes the node from the cluster.
3132

3133
    """
3134
    node = self.node
3135
    logging.info("Stopping the node daemon and removing configs from node %s",
3136
                 node.name)
3137

    
3138
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
3139

    
3140
    # Promote nodes to master candidate as needed
3141
    _AdjustCandidatePool(self, exceptions=[node.name])
3142
    self.context.RemoveNode(node.name)
3143

    
3144
    # Run post hooks on the node before it's removed
3145
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
3146
    try:
3147
      hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
3148
    except:
3149
      # pylint: disable-msg=W0702
3150
      self.LogWarning("Errors occurred running hooks on %s" % node.name)
3151

    
3152
    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
3153
    msg = result.fail_msg
3154
    if msg:
3155
      self.LogWarning("Errors encountered on the remote node while leaving"
3156
                      " the cluster: %s", msg)
3157

    
3158
    # Remove node from our /etc/hosts
3159
    if self.cfg.GetClusterInfo().modify_etc_hosts:
3160
      # FIXME: this should be done via an rpc call to node daemon
3161
      utils.RemoveHostFromEtcHosts(node.name)
3162
      _RedistributeAncillaryFiles(self)
3163

    
3164

    
3165
class LUQueryNodes(NoHooksLU):
3166
  """Logical unit for querying nodes.
3167

3168
  """
3169
  # pylint: disable-msg=W0142
3170
  _OP_REQP = [
3171
    ("output_fields", _TListOf(_TNonEmptyString)),
3172
    ("names", _TListOf(_TNonEmptyString)),
3173
    ("use_locking", _TBool),
3174
    ]
3175
  REQ_BGL = False
3176

    
3177
  _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
3178
                    "master_candidate", "offline", "drained"]
3179

    
3180
  _FIELDS_DYNAMIC = utils.FieldSet(
3181
    "dtotal", "dfree",
3182
    "mtotal", "mnode", "mfree",
3183
    "bootid",
3184
    "ctotal", "cnodes", "csockets",
3185
    )
3186

    
3187
  _FIELDS_STATIC = utils.FieldSet(*[
3188
    "pinst_cnt", "sinst_cnt",
3189
    "pinst_list", "sinst_list",
3190
    "pip", "sip", "tags",
3191
    "master",
3192
    "role"] + _SIMPLE_FIELDS
3193
    )
3194

    
3195
  def CheckArguments(self):
3196
    _CheckOutputFields(static=self._FIELDS_STATIC,
3197
                       dynamic=self._FIELDS_DYNAMIC,
3198
                       selected=self.op.output_fields)
3199

    
3200
  def ExpandNames(self):
3201
    self.needed_locks = {}
3202
    self.share_locks[locking.LEVEL_NODE] = 1
3203

    
3204
    if self.op.names:
3205
      self.wanted = _GetWantedNodes(self, self.op.names)
3206
    else:
3207
      self.wanted = locking.ALL_SET
3208

    
3209
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3210
    self.do_locking = self.do_node_query and self.op.use_locking
3211
    if self.do_locking:
3212
      # if we don't request only static fields, we need to lock the nodes
3213
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
3214

    
3215
  def Exec(self, feedback_fn):
3216
    """Computes the list of nodes and their attributes.
3217

3218
    """
3219
    all_info = self.cfg.GetAllNodesInfo()
3220
    if self.do_locking:
3221
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
3222
    elif self.wanted != locking.ALL_SET:
3223
      nodenames = self.wanted
3224
      missing = set(nodenames).difference(all_info.keys())
3225
      if missing:
3226
        raise errors.OpExecError(
3227
          "Some nodes were removed before retrieving their data: %s" % missing)
3228
    else:
3229
      nodenames = all_info.keys()
3230

    
3231
    nodenames = utils.NiceSort(nodenames)
3232
    nodelist = [all_info[name] for name in nodenames]
3233

    
3234
    # begin data gathering
3235

    
3236
    if self.do_node_query:
3237
      live_data = {}
3238
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3239
                                          self.cfg.GetHypervisorType())
3240
      for name in nodenames:
3241
        nodeinfo = node_data[name]
3242
        if not nodeinfo.fail_msg and nodeinfo.payload:
3243
          nodeinfo = nodeinfo.payload
3244
          fn = utils.TryConvert
3245
          live_data[name] = {
3246
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
3247
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
3248
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
3249
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
3250
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
3251
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
3252
            "bootid": nodeinfo.get('bootid', None),
3253
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
3254
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
3255
            }
3256
        else:
3257
          live_data[name] = {}
3258
    else:
3259
      live_data = dict.fromkeys(nodenames, {})
3260

    
3261
    node_to_primary = dict([(name, set()) for name in nodenames])
3262
    node_to_secondary = dict([(name, set()) for name in nodenames])
3263

    
3264
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
3265
                             "sinst_cnt", "sinst_list"))
3266
    if inst_fields & frozenset(self.op.output_fields):
3267
      inst_data = self.cfg.GetAllInstancesInfo()
3268

    
3269
      for inst in inst_data.values():
3270
        if inst.primary_node in node_to_primary:
3271
          node_to_primary[inst.primary_node].add(inst.name)
3272
        for secnode in inst.secondary_nodes:
3273
          if secnode in node_to_secondary:
3274
            node_to_secondary[secnode].add(inst.name)
3275

    
3276
    master_node = self.cfg.GetMasterNode()
3277

    
3278
    # end data gathering
3279

    
3280
    output = []
3281
    for node in nodelist:
3282
      node_output = []
3283
      for field in self.op.output_fields:
3284
        if field in self._SIMPLE_FIELDS:
3285
          val = getattr(node, field)
3286
        elif field == "pinst_list":
3287
          val = list(node_to_primary[node.name])
3288
        elif field == "sinst_list":
3289
          val = list(node_to_secondary[node.name])
3290
        elif field == "pinst_cnt":
3291
          val = len(node_to_primary[node.name])
3292
        elif field == "sinst_cnt":
3293
          val = len(node_to_secondary[node.name])
3294
        elif field == "pip":
3295
          val = node.primary_ip
3296
        elif field == "sip":
3297
          val = node.secondary_ip
3298
        elif field == "tags":
3299
          val = list(node.GetTags())
3300
        elif field == "master":
3301
          val = node.name == master_node
3302
        elif self._FIELDS_DYNAMIC.Matches(field):
3303
          val = live_data[node.name].get(field, None)
3304
        elif field == "role":
3305
          if node.name == master_node:
3306
            val = "M"
3307
          elif node.master_candidate:
3308
            val = "C"
3309
          elif node.drained:
3310
            val = "D"
3311
          elif node.offline:
3312
            val = "O"
3313
          else:
3314
            val = "R"
3315
        else:
3316
          raise errors.ParameterError(field)
3317
        node_output.append(val)
3318
      output.append(node_output)
3319

    
3320
    return output
3321

    
3322

    
3323
class LUQueryNodeVolumes(NoHooksLU):
3324
  """Logical unit for getting volumes on node(s).
3325

3326
  """
3327
  _OP_REQP = [
3328
    ("nodes", _TListOf(_TNonEmptyString)),
3329
    ("output_fields", _TListOf(_TNonEmptyString)),
3330
    ]
3331
  REQ_BGL = False
3332
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
3333
  _FIELDS_STATIC = utils.FieldSet("node")
3334

    
3335
  def CheckArguments(self):
3336
    _CheckOutputFields(static=self._FIELDS_STATIC,
3337
                       dynamic=self._FIELDS_DYNAMIC,
3338
                       selected=self.op.output_fields)
3339

    
3340
  def ExpandNames(self):
3341
    self.needed_locks = {}
3342
    self.share_locks[locking.LEVEL_NODE] = 1
3343
    if not self.op.nodes:
3344
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3345
    else:
3346
      self.needed_locks[locking.LEVEL_NODE] = \
3347
        _GetWantedNodes(self, self.op.nodes)
3348

    
3349
  def Exec(self, feedback_fn):
3350
    """Computes the list of nodes and their attributes.
3351

3352
    """
3353
    nodenames = self.acquired_locks[locking.LEVEL_NODE]
3354
    volumes = self.rpc.call_node_volumes(nodenames)
3355

    
3356
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
3357
             in self.cfg.GetInstanceList()]
3358

    
3359
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
3360

    
3361
    output = []
3362
    for node in nodenames:
3363
      nresult = volumes[node]
3364
      if nresult.offline:
3365
        continue
3366
      msg = nresult.fail_msg
3367
      if msg:
3368
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
3369
        continue
3370

    
3371
      node_vols = nresult.payload[:]
3372
      node_vols.sort(key=lambda vol: vol['dev'])
3373

    
3374
      for vol in node_vols:
3375
        node_output = []
3376
        for field in self.op.output_fields:
3377
          if field == "node":
3378
            val = node
3379
          elif field == "phys":
3380
            val = vol['dev']
3381
          elif field == "vg":
3382
            val = vol['vg']
3383
          elif field == "name":
3384
            val = vol['name']
3385
          elif field == "size":
3386
            val = int(float(vol['size']))
3387
          elif field == "instance":
3388
            for inst in ilist:
3389
              if node not in lv_by_node[inst]:
3390
                continue
3391
              if vol['name'] in lv_by_node[inst][node]:
3392
                val = inst.name
3393
                break
3394
            else:
3395
              val = '-'
3396
          else:
3397
            raise errors.ParameterError(field)
3398
          node_output.append(str(val))
3399

    
3400
        output.append(node_output)
3401

    
3402
    return output
3403

    
3404

    
3405
class LUQueryNodeStorage(NoHooksLU):
3406
  """Logical unit for getting information on storage units on node(s).
3407

3408
  """
3409
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
3410
  _OP_REQP = [
3411
    ("nodes", _TListOf(_TNonEmptyString)),
3412
    ("storage_type", _CheckStorageType),
3413
    ("output_fields", _TListOf(_TNonEmptyString)),
3414
    ]
3415
  _OP_DEFS = [("name", None)]
3416
  REQ_BGL = False
3417

    
3418
  def CheckArguments(self):
3419
    _CheckOutputFields(static=self._FIELDS_STATIC,
3420
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
3421
                       selected=self.op.output_fields)
3422

    
3423
  def ExpandNames(self):
3424
    self.needed_locks = {}
3425
    self.share_locks[locking.LEVEL_NODE] = 1
3426

    
3427
    if self.op.nodes:
3428
      self.needed_locks[locking.LEVEL_NODE] = \
3429
        _GetWantedNodes(self, self.op.nodes)
3430
    else:
3431
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3432

    
3433
  def Exec(self, feedback_fn):
3434
    """Computes the list of nodes and their attributes.
3435

3436
    """
3437
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
3438

    
3439
    # Always get name to sort by
3440
    if constants.SF_NAME in self.op.output_fields:
3441
      fields = self.op.output_fields[:]
3442
    else:
3443
      fields = [constants.SF_NAME] + self.op.output_fields
3444

    
3445
    # Never ask for node or type as it's only known to the LU
3446
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
3447
      while extra in fields:
3448
        fields.remove(extra)
3449

    
3450
    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
3451
    name_idx = field_idx[constants.SF_NAME]
3452

    
3453
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
3454
    data = self.rpc.call_storage_list(self.nodes,
3455
                                      self.op.storage_type, st_args,
3456
                                      self.op.name, fields)
3457

    
3458
    result = []
3459

    
3460
    for node in utils.NiceSort(self.nodes):
3461
      nresult = data[node]
3462
      if nresult.offline:
3463
        continue
3464

    
3465
      msg = nresult.fail_msg
3466
      if msg:
3467
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
3468
        continue
3469

    
3470
      rows = dict([(row[name_idx], row) for row in nresult.payload])
3471

    
3472
      for name in utils.NiceSort(rows.keys()):
3473
        row = rows[name]
3474

    
3475
        out = []
3476

    
3477
        for field in self.op.output_fields:
3478
          if field == constants.SF_NODE:
3479
            val = node
3480
          elif field == constants.SF_TYPE:
3481
            val = self.op.storage_type
3482
          elif field in field_idx:
3483
            val = row[field_idx[field]]
3484
          else:
3485
            raise errors.ParameterError(field)
3486

    
3487
          out.append(val)
3488

    
3489
        result.append(out)
3490

    
3491
    return result
3492

    
3493

    
3494
class LUModifyNodeStorage(NoHooksLU):
3495
  """Logical unit for modifying a storage volume on a node.
3496

3497
  """
3498
  _OP_REQP = [
3499
    ("node_name", _TNonEmptyString),
3500
    ("storage_type", _CheckStorageType),
3501
    ("name", _TNonEmptyString),
3502
    ("changes", _TDict),
3503
    ]
3504
  REQ_BGL = False
3505

    
3506
  def CheckArguments(self):
3507
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
3508

    
3509
    storage_type = self.op.storage_type
3510

    
3511
    try:
3512
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
3513
    except KeyError:
3514
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
3515
                                 " modified" % storage_type,
3516
                                 errors.ECODE_INVAL)
3517

    
3518
    diff = set(self.op.changes.keys()) - modifiable
3519
    if diff:
3520
      raise errors.OpPrereqError("The following fields can not be modified for"
3521
                                 " storage units of type '%s': %r" %
3522
                                 (storage_type, list(diff)),
3523
                                 errors.ECODE_INVAL)
3524

    
3525
  def ExpandNames(self):
3526
    self.needed_locks = {
3527
      locking.LEVEL_NODE: self.op.node_name,
3528
      }
3529

    
3530
  def Exec(self, feedback_fn):
3531
    """Computes the list of nodes and their attributes.
3532

3533
    """
3534
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
3535
    result = self.rpc.call_storage_modify(self.op.node_name,
3536
                                          self.op.storage_type, st_args,
3537
                                          self.op.name, self.op.changes)
3538
    result.Raise("Failed to modify storage unit '%s' on %s" %
3539
                 (self.op.name, self.op.node_name))
3540

    
3541

    
3542
class LUAddNode(LogicalUnit):
3543
  """Logical unit for adding node to the cluster.
3544

3545
  """
3546
  HPATH = "node-add"
3547
  HTYPE = constants.HTYPE_NODE
3548
  _OP_REQP = [
3549
    ("node_name", _TNonEmptyString),
3550
    ]
3551
  _OP_DEFS = [("secondary_ip", None)]
3552

    
3553
  def CheckArguments(self):
3554
    # validate/normalize the node name
3555
    self.op.node_name = utils.HostInfo.NormalizeName(self.op.node_name)
3556

    
3557
  def BuildHooksEnv(self):
3558
    """Build hooks env.
3559

3560
    This will run on all nodes before, and on all nodes + the new node after.
3561

3562
    """
3563
    env = {
3564
      "OP_TARGET": self.op.node_name,
3565
      "NODE_NAME": self.op.node_name,
3566
      "NODE_PIP": self.op.primary_ip,
3567
      "NODE_SIP": self.op.secondary_ip,
3568
      }
3569
    nodes_0 = self.cfg.GetNodeList()
3570
    nodes_1 = nodes_0 + [self.op.node_name, ]
3571
    return env, nodes_0, nodes_1
3572

    
3573
  def CheckPrereq(self):
3574
    """Check prerequisites.
3575

3576
    This checks:
3577
     - the new node is not already in the config
3578
     - it is resolvable
3579
     - its parameters (single/dual homed) matches the cluster
3580

3581
    Any errors are signaled by raising errors.OpPrereqError.
3582

3583
    """
3584
    node_name = self.op.node_name
3585
    cfg = self.cfg
3586

    
3587
    dns_data = utils.GetHostInfo(node_name)
3588

    
3589
    node = dns_data.name
3590
    primary_ip = self.op.primary_ip = dns_data.ip
3591
    if self.op.secondary_ip is None:
3592
      self.op.secondary_ip = primary_ip
3593
    if not utils.IsValidIP(self.op.secondary_ip):
3594
      raise errors.OpPrereqError("Invalid secondary IP given",
3595
                                 errors.ECODE_INVAL)
3596
    secondary_ip = self.op.secondary_ip
3597

    
3598
    node_list = cfg.GetNodeList()
3599
    if not self.op.readd and node in node_list:
3600
      raise errors.OpPrereqError("Node %s is already in the configuration" %
3601
                                 node, errors.ECODE_EXISTS)
3602
    elif self.op.readd and node not in node_list:
3603
      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
3604
                                 errors.ECODE_NOENT)
3605

    
3606
    self.changed_primary_ip = False
3607

    
3608
    for existing_node_name in node_list:
3609
      existing_node = cfg.GetNodeInfo(existing_node_name)
3610

    
3611
      if self.op.readd and node == existing_node_name:
3612
        if existing_node.secondary_ip != secondary_ip:
3613
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
3614
                                     " address configuration as before",
3615
                                     errors.ECODE_INVAL)
3616
        if existing_node.primary_ip != primary_ip:
3617
          self.changed_primary_ip = True
3618

    
3619
        continue
3620

    
3621
      if (existing_node.primary_ip == primary_ip or
3622
          existing_node.secondary_ip == primary_ip or
3623
          existing_node.primary_ip == secondary_ip or
3624
          existing_node.secondary_ip == secondary_ip):
3625
        raise errors.OpPrereqError("New node ip address(es) conflict with"
3626
                                   " existing node %s" % existing_node.name,
3627
                                   errors.ECODE_NOTUNIQUE)
3628

    
3629
    # check that the type of the node (single versus dual homed) is the
3630
    # same as for the master
3631
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
3632
    master_singlehomed = myself.secondary_ip == myself.primary_ip
3633
    newbie_singlehomed = secondary_ip == primary_ip
3634
    if master_singlehomed != newbie_singlehomed:
3635
      if master_singlehomed:
3636
        raise errors.OpPrereqError("The master has no private ip but the"
3637
                                   " new node has one",
3638
                                   errors.ECODE_INVAL)
3639
      else:
3640
        raise errors.OpPrereqError("The master has a private ip but the"
3641
                                   " new node doesn't have one",
3642
                                   errors.ECODE_INVAL)
3643

    
3644
    # checks reachability
3645
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
3646
      raise errors.OpPrereqError("Node not reachable by ping",
3647
                                 errors.ECODE_ENVIRON)
3648

    
3649
    if not newbie_singlehomed:
3650
      # check reachability from my secondary ip to newbie's secondary ip
3651
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
3652
                           source=myself.secondary_ip):
3653
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
3654
                                   " based ping to noded port",
3655
                                   errors.ECODE_ENVIRON)
3656

    
3657
    if self.op.readd:
3658
      exceptions = [node]
3659
    else:
3660
      exceptions = []
3661

    
3662
    self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
3663

    
3664
    if self.op.readd:
3665
      self.new_node = self.cfg.GetNodeInfo(node)
3666
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
3667
    else:
3668
      self.new_node = objects.Node(name=node,
3669
                                   primary_ip=primary_ip,
3670
                                   secondary_ip=secondary_ip,
3671
                                   master_candidate=self.master_candidate,
3672
                                   offline=False, drained=False)
3673

    
3674
  def Exec(self, feedback_fn):
3675
    """Adds the new node to the cluster.
3676

3677
    """
3678
    new_node = self.new_node
3679
    node = new_node.name
3680

    
3681
    # for re-adds, reset the offline/drained/master-candidate flags;
3682
    # we need to reset here, otherwise offline would prevent RPC calls
3683
    # later in the procedure; this also means that if the re-add
3684
    # fails, we are left with a non-offlined, broken node
3685
    if self.op.readd:
3686
      new_node.drained = new_node.offline = False # pylint: disable-msg=W0201
3687
      self.LogInfo("Readding a node, the offline/drained flags were reset")
3688
      # if we demote the node, we do cleanup later in the procedure
3689
      new_node.master_candidate = self.master_candidate
3690
      if self.changed_primary_ip:
3691
        new_node.primary_ip = self.op.primary_ip
3692

    
3693
    # notify the user about any possible mc promotion
3694
    if new_node.master_candidate:
3695
      self.LogInfo("Node will be a master candidate")
3696

    
3697
    # check connectivity
3698
    result = self.rpc.call_version([node])[node]
3699
    result.Raise("Can't get version information from node %s" % node)
3700
    if constants.PROTOCOL_VERSION == result.payload:
3701
      logging.info("Communication to node %s fine, sw version %s match",
3702
                   node, result.payload)
3703
    else:
3704
      raise errors.OpExecError("Version mismatch master version %s,"
3705
                               " node version %s" %
3706
                               (constants.PROTOCOL_VERSION, result.payload))
3707

    
3708
    # setup ssh on node
3709
    if self.cfg.GetClusterInfo().modify_ssh_setup:
3710
      logging.info("Copy ssh key to node %s", node)
3711
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
3712
      keyarray = []
3713
      keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
3714
                  constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
3715
                  priv_key, pub_key]
3716

    
3717
      for i in keyfiles:
3718
        keyarray.append(utils.ReadFile(i))
3719

    
3720
      result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
3721
                                      keyarray[2], keyarray[3], keyarray[4],
3722
                                      keyarray[5])
3723
      result.Raise("Cannot transfer ssh keys to the new node")
3724

    
3725
    # Add node to our /etc/hosts, and add key to known_hosts
3726
    if self.cfg.GetClusterInfo().modify_etc_hosts:
3727
      # FIXME: this should be done via an rpc call to node daemon
3728
      utils.AddHostToEtcHosts(new_node.name)
3729

    
3730
    if new_node.secondary_ip != new_node.primary_ip:
3731
      result = self.rpc.call_node_has_ip_address(new_node.name,
3732
                                                 new_node.secondary_ip)
3733
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
3734
                   prereq=True, ecode=errors.ECODE_ENVIRON)
3735
      if not result.payload:
3736
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
3737
                                 " you gave (%s). Please fix and re-run this"
3738
                                 " command." % new_node.secondary_ip)
3739

    
3740
    node_verify_list = [self.cfg.GetMasterNode()]
3741
    node_verify_param = {
3742
      constants.NV_NODELIST: [node],
3743
      # TODO: do a node-net-test as well?
3744
    }
3745

    
3746
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
3747
                                       self.cfg.GetClusterName())
3748
    for verifier in node_verify_list:
3749
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
3750
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
3751
      if nl_payload:
3752
        for failed in nl_payload:
3753
          feedback_fn("ssh/hostname verification failed"
3754
                      " (checking from %s): %s" %
3755
                      (verifier, nl_payload[failed]))
3756
        raise errors.OpExecError("ssh/hostname verification failed.")
3757

    
3758
    if self.op.readd:
3759
      _RedistributeAncillaryFiles(self)
3760
      self.context.ReaddNode(new_node)
3761
      # make sure we redistribute the config
3762
      self.cfg.Update(new_node, feedback_fn)
3763
      # and make sure the new node will not have old files around
3764
      if not new_node.master_candidate:
3765
        result = self.rpc.call_node_demote_from_mc(new_node.name)
3766
        msg = result.fail_msg
3767
        if msg:
3768
          self.LogWarning("Node failed to demote itself from master"
3769
                          " candidate status: %s" % msg)
3770
    else:
3771
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
3772
      self.context.AddNode(new_node, self.proc.GetECId())
3773

    
3774

    
3775
class LUSetNodeParams(LogicalUnit):
3776
  """Modifies the parameters of a node.
3777

3778
  """
3779
  HPATH = "node-modify"
3780
  HTYPE = constants.HTYPE_NODE
3781
  _OP_REQP = [("node_name", _TNonEmptyString)]
3782
  REQ_BGL = False
3783

    
3784
  def CheckArguments(self):
3785
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
3786
    _CheckBooleanOpField(self.op, 'master_candidate')
3787
    _CheckBooleanOpField(self.op, 'offline')
3788
    _CheckBooleanOpField(self.op, 'drained')
3789
    _CheckBooleanOpField(self.op, 'auto_promote')
3790
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
3791
    if all_mods.count(None) == 3:
3792
      raise errors.OpPrereqError("Please pass at least one modification",
3793
                                 errors.ECODE_INVAL)
3794
    if all_mods.count(True) > 1:
3795
      raise errors.OpPrereqError("Can't set the node into more than one"
3796
                                 " state at the same time",
3797
                                 errors.ECODE_INVAL)
3798

    
3799
    # Boolean value that tells us whether we're offlining or draining the node
3800
    self.offline_or_drain = (self.op.offline == True or
3801
                             self.op.drained == True)
3802
    self.deoffline_or_drain = (self.op.offline == False or
3803
                               self.op.drained == False)
3804
    self.might_demote = (self.op.master_candidate == False or
3805
                         self.offline_or_drain)
3806

    
3807
    self.lock_all = self.op.auto_promote and self.might_demote
3808

    
3809

    
3810
  def ExpandNames(self):
3811
    if self.lock_all:
3812
      self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
3813
    else:
3814
      self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
3815

    
3816
  def BuildHooksEnv(self):
3817
    """Build hooks env.
3818

3819
    This runs on the master node.
3820

3821
    """
3822
    env = {
3823
      "OP_TARGET": self.op.node_name,
3824
      "MASTER_CANDIDATE": str(self.op.master_candidate),
3825
      "OFFLINE": str(self.op.offline),
3826
      "DRAINED": str(self.op.drained),
3827
      }
3828
    nl = [self.cfg.GetMasterNode(),
3829
          self.op.node_name]
3830
    return env, nl, nl
3831

    
3832
  def CheckPrereq(self):
3833
    """Check prerequisites.
3834

3835
    This only checks the instance list against the existing names.
3836

3837
    """
3838
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
3839

    
3840
    if (self.op.master_candidate is not None or
3841
        self.op.drained is not None or
3842
        self.op.offline is not None):
3843
      # we can't change the master's node flags
3844
      if self.op.node_name == self.cfg.GetMasterNode():
3845
        raise errors.OpPrereqError("The master role can be changed"
3846
                                   " only via masterfailover",
3847
                                   errors.ECODE_INVAL)
3848

    
3849

    
3850
    if node.master_candidate and self.might_demote and not self.lock_all:
3851
      assert not self.op.auto_promote, "auto-promote set but lock_all not"
3852
      # check if after removing the current node, we're missing master
3853
      # candidates
3854
      (mc_remaining, mc_should, _) = \
3855
          self.cfg.GetMasterCandidateStats(exceptions=[node.name])
3856
      if mc_remaining < mc_should:
3857
        raise errors.OpPrereqError("Not enough master candidates, please"
3858
                                   " pass auto_promote to allow promotion",
3859
                                   errors.ECODE_INVAL)
3860

    
3861
    if (self.op.master_candidate == True and
3862
        ((node.offline and not self.op.offline == False) or
3863
         (node.drained and not self.op.drained == False))):
3864
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
3865
                                 " to master_candidate" % node.name,
3866
                                 errors.ECODE_INVAL)
3867

    
3868
    # If we're being deofflined/drained, we'll MC ourself if needed
3869
    if (self.deoffline_or_drain and not self.offline_or_drain and not
3870
        self.op.master_candidate == True and not node.master_candidate):
3871
      self.op.master_candidate = _DecideSelfPromotion(self)
3872
      if self.op.master_candidate:
3873
        self.LogInfo("Autopromoting node to master candidate")
3874

    
3875
    return
3876

    
3877
  def Exec(self, feedback_fn):
3878
    """Modifies a node.
3879

3880
    """
3881
    node = self.node
3882

    
3883
    result = []
3884
    changed_mc = False
3885

    
3886
    if self.op.offline is not None:
3887
      node.offline = self.op.offline
3888
      result.append(("offline", str(self.op.offline)))
3889
      if self.op.offline == True:
3890
        if node.master_candidate:
3891
          node.master_candidate = False
3892
          changed_mc = True
3893
          result.append(("master_candidate", "auto-demotion due to offline"))
3894
        if node.drained:
3895
          node.drained = False
3896
          result.append(("drained", "clear drained status due to offline"))
3897

    
3898
    if self.op.master_candidate is not None:
3899
      node.master_candidate = self.op.master_candidate
3900
      changed_mc = True
3901
      result.append(("master_candidate", str(self.op.master_candidate)))
3902
      if self.op.master_candidate == False:
3903
        rrc = self.rpc.call_node_demote_from_mc(node.name)
3904
        msg = rrc.fail_msg
3905
        if msg:
3906
          self.LogWarning("Node failed to demote itself: %s" % msg)
3907

    
3908
    if self.op.drained is not None:
3909
      node.drained = self.op.drained
3910
      result.append(("drained", str(self.op.drained)))
3911
      if self.op.drained == True:
3912
        if node.master_candidate:
3913
          node.master_candidate = False
3914
          changed_mc = True
3915
          result.append(("master_candidate", "auto-demotion due to drain"))
3916
          rrc = self.rpc.call_node_demote_from_mc(node.name)
3917
          msg = rrc.fail_msg
3918
          if msg:
3919
            self.LogWarning("Node failed to demote itself: %s" % msg)
3920
        if node.offline:
3921
          node.offline = False
3922
          result.append(("offline", "clear offline status due to drain"))
3923

    
3924
    # we locked all nodes, we adjust the CP before updating this node
3925
    if self.lock_all:
3926
      _AdjustCandidatePool(self, [node.name])
3927

    
3928
    # this will trigger configuration file update, if needed
3929
    self.cfg.Update(node, feedback_fn)
3930

    
3931
    # this will trigger job queue propagation or cleanup
3932
    if changed_mc:
3933
      self.context.ReaddNode(node)
3934

    
3935
    return result
3936

    
3937

    
3938
class LUPowercycleNode(NoHooksLU):
3939
  """Powercycles a node.
3940

3941
  """
3942
  _OP_REQP = [
3943
    ("node_name", _TNonEmptyString),
3944
    ("force", _TBool),
3945
    ]
3946
  REQ_BGL = False
3947

    
3948
  def CheckArguments(self):
3949
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
3950
    if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
3951
      raise errors.OpPrereqError("The node is the master and the force"
3952
                                 " parameter was not set",
3953
                                 errors.ECODE_INVAL)
3954

    
3955
  def ExpandNames(self):
3956
    """Locking for PowercycleNode.
3957

3958
    This is a last-resort option and shouldn't block on other
3959
    jobs. Therefore, we grab no locks.
3960

3961
    """
3962
    self.needed_locks = {}
3963

    
3964
  def Exec(self, feedback_fn):
3965
    """Reboots a node.
3966

3967
    """
3968
    result = self.rpc.call_node_powercycle(self.op.node_name,
3969
                                           self.cfg.GetHypervisorType())
3970
    result.Raise("Failed to schedule the reboot")
3971
    return result.payload
3972

    
3973

    
3974
class LUQueryClusterInfo(NoHooksLU):
3975
  """Query cluster configuration.
3976

3977
  """
3978
  _OP_REQP = []
3979
  REQ_BGL = False
3980

    
3981
  def ExpandNames(self):
3982
    self.needed_locks = {}
3983

    
3984
  def Exec(self, feedback_fn):
3985
    """Return cluster config.
3986

3987
    """
3988
    cluster = self.cfg.GetClusterInfo()
3989
    os_hvp = {}
3990

    
3991
    # Filter just for enabled hypervisors
3992
    for os_name, hv_dict in cluster.os_hvp.items():
3993
      os_hvp[os_name] = {}
3994
      for hv_name, hv_params in hv_dict.items():
3995
        if hv_name in cluster.enabled_hypervisors:
3996
          os_hvp[os_name][hv_name] = hv_params
3997

    
3998
    result = {
3999
      "software_version": constants.RELEASE_VERSION,
4000
      "protocol_version": constants.PROTOCOL_VERSION,
4001
      "config_version": constants.CONFIG_VERSION,
4002
      "os_api_version": max(constants.OS_API_VERSIONS),
4003
      "export_version": constants.EXPORT_VERSION,
4004
      "architecture": (platform.architecture()[0], platform.machine()),
4005
      "name": cluster.cluster_name,
4006
      "master": cluster.master_node,
4007
      "default_hypervisor": cluster.enabled_hypervisors[0],
4008
      "enabled_hypervisors": cluster.enabled_hypervisors,
4009
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
4010
                        for hypervisor_name in cluster.enabled_hypervisors]),
4011
      "os_hvp": os_hvp,
4012
      "beparams": cluster.beparams,
4013
      "osparams": cluster.osparams,
4014
      "nicparams": cluster.nicparams,
4015
      "candidate_pool_size": cluster.candidate_pool_size,
4016
      "master_netdev": cluster.master_netdev,
4017
      "volume_group_name": cluster.volume_group_name,
4018
      "file_storage_dir": cluster.file_storage_dir,
4019
      "maintain_node_health": cluster.maintain_node_health,
4020
      "ctime": cluster.ctime,
4021
      "mtime": cluster.mtime,
4022
      "uuid": cluster.uuid,
4023
      "tags": list(cluster.GetTags()),
4024
      "uid_pool": cluster.uid_pool,
4025
      }
4026

    
4027
    return result
4028

    
4029

    
4030
class LUQueryConfigValues(NoHooksLU):
4031
  """Return configuration values.
4032

4033
  """
4034
  _OP_REQP = []
4035
  REQ_BGL = False
4036
  _FIELDS_DYNAMIC = utils.FieldSet()
4037
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
4038
                                  "watcher_pause")
4039

    
4040
  def CheckArguments(self):
4041
    _CheckOutputFields(static=self._FIELDS_STATIC,
4042
                       dynamic=self._FIELDS_DYNAMIC,
4043