root / lib / cmdlib.py @ d12f5c66


#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0201,C0302

# W0201 since most LU attributes are defined in CheckPrereq or similar
# functions

# C0302: since we have waaaay too many lines in this module

import os
import os.path
import time
import re
import platform
import logging
import copy
import OpenSSL

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf
from ganeti import uidpool
from ganeti import compat
from ganeti import masterd

import ganeti.masterd.instance # pylint: disable-msg=W0611


# Modifiable default values; need to define these here before the
# actual LUs

def _EmptyList():
  """Returns an empty list.

  """
  return []


def _EmptyDict():
  """Returns an empty dict.

  """
  return {}


# Some basic types
def _TNotNone(val):
  """Checks if the given value is not None.

  """
  return val is not None


def _TNone(val):
  """Checks if the given value is None.

  """
  return val is None


def _TBool(val):
  """Checks if the given value is a boolean.

  """
  return isinstance(val, bool)


def _TInt(val):
  """Checks if the given value is an integer.

  """
  return isinstance(val, int)


def _TFloat(val):
  """Checks if the given value is a float.

  """
  return isinstance(val, float)


def _TString(val):
  """Checks if the given value is a string.

  """
  return isinstance(val, basestring)


def _TTrue(val):
  """Checks if a given value evaluates to a boolean True value.

  """
  return bool(val)


def _TElemOf(target_list):
  """Builds a function that checks if a given value is a member of a list.

  """
  return lambda val: val in target_list


# Container types
def _TList(val):
  """Checks if the given value is a list.

  """
  return isinstance(val, list)


def _TDict(val):
  """Checks if the given value is a dictionary.

  """
  return isinstance(val, dict)


# Combinator types
def _TAnd(*args):
  """Combine multiple functions using an AND operation.

  """
  def fn(val):
    return compat.all(t(val) for t in args)
  return fn


def _TOr(*args):
  """Combine multiple functions using an OR operation.

  """
  def fn(val):
    return compat.any(t(val) for t in args)
  return fn


# Type aliases

# non-empty string
_TNonEmptyString = _TAnd(_TString, _TTrue)


# positive integer
_TPositiveInt = _TAnd(_TInt, lambda v: v >= 0)


def _TListOf(my_type):
  """Checks if a given value is a list with all elements of the same type.

  """
  return _TAnd(_TList,
               lambda lst: compat.all(my_type(v) for v in lst))


def _TDictOf(key_type, val_type):
  """Checks a dict type for the type of its key/values.

  """
  return _TAnd(_TDict,
               lambda my_dict: (compat.all(key_type(v) for v in my_dict.keys())
                                and compat.all(val_type(v)
                                               for v in my_dict.values())))
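

# The snippet below is an illustrative sketch, not part of the original
# module: it shows how the small type-check functions above are meant to be
# combined when declaring opcode requirements. The field names are invented
# for illustration only.
_EXAMPLE_OP_REQP = [
  ("instance_name", _TNonEmptyString),
  ("disks", _TListOf(_TPositiveInt)),
  ("hvparams", _TOr(_TNone, _TDictOf(_TNonEmptyString, _TString))),
  ]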


# End types
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)
  @cvar _OP_DEFS: a list of opcode attributes and the default values
      they should get if not already existing

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  _OP_DEFS = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
    # support for dry-run
    self.dry_run_result = None
    # support for generic debug attribute
    if (not hasattr(self.op, "debug_level") or
        not isinstance(self.op.debug_level, int)):
      self.op.debug_level = 0

    # Tasklets
    self.tasklets = None

    for aname, aval in self._OP_DEFS:
      if not hasattr(self.op, aname):
        if callable(aval):
          dval = aval()
        else:
          dval = aval
        setattr(self.op, aname, dval)

    for attr_name, test in self._OP_REQP:
      if not hasattr(op, attr_name):
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name, errors.ECODE_INVAL)
      attr_val = getattr(op, attr_name, None)
      if not callable(test):
        raise errors.ProgrammerError("Validation for parameter '%s' failed,"
                                     " given type is not a proper type (%s)" %
                                     (attr_name, test))
      if not test(attr_val):
        logging.error("OpCode %s, parameter %s, has invalid type %s/value %s",
                      self.op.OP_ID, attr_name, type(attr_val), attr_val)
        raise errors.OpPrereqError("Parameter '%s' has invalid type" %
                                   attr_name, errors.ECODE_INVAL)

    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      pass

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If there are no nodes, an empty list (and not None) should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # API must be kept, thus we ignore the unused argument and the "could
    # be a function" warnings
    # pylint: disable-msg=W0613,R0201
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    self.op.instance_name = _ExpandInstanceName(self.cfg,
                                                self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we really have been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
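

# The class below is an illustrative sketch, not part of the original module:
# a hypothetical, minimal LU showing how the contract documented in
# LogicalUnit (ExpandNames/CheckPrereq/Exec, _OP_REQP/_OP_DEFS, needed_locks)
# fits together. The opcode fields and the trivial logic are invented for
# illustration only.
class _LUExampleNoop(LogicalUnit):
  """Example LU that only validates and echoes a node name.

  """
  HPATH = None # hooks are skipped when HPATH is None
  HTYPE = None
  _OP_REQP = [("node_name", _TNonEmptyString)]
  _OP_DEFS = [("verbose", False)]
  REQ_BGL = False

  def ExpandNames(self):
    # canonicalize the name and declare the single node lock we need
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    self.needed_locks = {locking.LEVEL_NODE: [self.op.node_name]}

  def CheckPrereq(self):
    # no cluster changes allowed here; just re-check that the node exists
    if self.cfg.GetNodeInfo(self.op.node_name) is None:
      raise errors.OpPrereqError("Node %s disappeared" % self.op.node_name,
                                 errors.ECODE_NOENT)

  def Exec(self, feedback_fn):
    if self.op.verbose:
      feedback_fn("Nothing to do for %s" % self.op.node_name)
    return self.op.node_name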


class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Empty BuildHooksEnv for NoHooksLU.

    This just raises an error.

    """
    assert False, "BuildHooksEnv called for NoHooksLUs"


class Tasklet:
  """Tasklet base class.

  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
  tasklets know nothing about locks.

  Subclasses must follow these rules:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    self.lu = lu

    # Shortcuts
    self.cfg = lu.cfg
    self.rpc = lu.rpc

  def CheckPrereq(self):
    """Check prerequisites for this tasklet.

    This method should check whether the prerequisites for the execution of
    this tasklet are fulfilled. It can do internode communication, but it
    should be idempotent - no cluster or system changes are allowed.

    The method should raise errors.OpPrereqError in case something is not
    fulfilled. Its return value is ignored.

    This method should also update all parameters to their canonical form if it
    hasn't been done before.

    """
    pass

  def Exec(self, feedback_fn):
    """Execute the tasklet.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in code, or
    expected.

    """
    raise NotImplementedError
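

# The class below is an illustrative sketch, not part of the original module:
# a hypothetical tasklet following the contract documented in Tasklet, plus a
# comment showing how an LU would typically wire such tasklets up. The checks
# performed are invented for illustration only.
class _ExampleTasklet(Tasklet):
  """Example tasklet verifying that an instance's primary node is online.

  """
  def __init__(self, lu, instance_name):
    Tasklet.__init__(self, lu)
    self.instance_name = instance_name
    self.instance = None

  def CheckPrereq(self):
    # idempotent checks only; no cluster or system changes
    self.instance = self.cfg.GetInstanceInfo(self.instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance %s not found" % self.instance_name,
                                 errors.ECODE_NOENT)
    _CheckNodeOnline(self.lu, self.instance.primary_node)

  def Exec(self, feedback_fn):
    feedback_fn("Primary node of %s is online" % self.instance_name)

# Inside an LU, such tasklets are usually created in ExpandNames, e.g.:
#   self.tasklets = [_ExampleTasklet(self, name) for name in wanted_names]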


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = [_ExpandNodeName(lu.cfg, name) for name in nodes]
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if instances:
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _GetUpdatedParams(old_params, update_dict,
                      use_default=True, use_none=False):
  """Return the new version of a parameter dictionary.

  @type old_params: dict
  @param old_params: old parameters
  @type update_dict: dict
  @param update_dict: dict containing new parameter values, or
      constants.VALUE_DEFAULT to reset the parameter to its default
      value
  @type use_default: boolean
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
      values as 'to be deleted' values
  @type use_none: boolean
  @param use_none: whether to recognise C{None} values as 'to be
      deleted' values
  @rtype: dict
  @return: the new parameter dictionary

  """
  params_copy = copy.deepcopy(old_params)
  for key, val in update_dict.iteritems():
    if ((use_default and val == constants.VALUE_DEFAULT) or
        (use_none and val is None)):
      try:
        del params_copy[key]
      except KeyError:
        pass
    else:
      params_copy[key] = val
  return params_copy
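

# Illustrative example (not part of the original module): assuming
# old_params = {"mem": 512, "vcpus": 2}, the call
#   _GetUpdatedParams(old_params, {"mem": constants.VALUE_DEFAULT, "vcpus": 4})
# returns {"vcpus": 4} - "mem" falls back to the cluster default by being
# removed from the per-instance dict, while "vcpus" is overridden.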


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)), errors.ECODE_INVAL)
  setattr(op, name, val)


def _CheckGlobalHvParams(params):
  """Validates that given hypervisor params are not global ones.

  This will ensure that instances don't get customised versions of
  global params.

  """
  used_globals = constants.HVC_GLOBALS.intersection(params)
  if used_globals:
    msg = ("The following hypervisor parameters are global and cannot"
           " be customized at instance level, please modify them at"
           " cluster level: %s" % utils.CommaJoin(used_globals))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node,
                               errors.ECODE_INVAL)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node,
                               errors.ECODE_INVAL)


def _CheckNodeHasOS(lu, node, os_name, force_variant):
  """Ensure that a node supports a given OS.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param os_name: the OS to query about
  @param force_variant: whether to ignore variant errors
  @raise errors.OpPrereqError: if the node does not support the OS

  """
  result = lu.rpc.call_os_get(node, os_name)
  result.Raise("OS '%s' not in supported OS list for node %s" %
               (os_name, node),
               prereq=True, ecode=errors.ECODE_INVAL)
  if not force_variant:
    _CheckOSVariant(result.payload, os_name)


def _RequireFileStorage():
  """Checks that file storage is enabled.

  @raise errors.OpPrereqError: when file storage is disabled

  """
  if not constants.ENABLE_FILE_STORAGE:
    raise errors.OpPrereqError("File storage disabled at configure time",
                               errors.ECODE_INVAL)


def _CheckDiskTemplate(template):
  """Ensure a given disk template is valid.

  """
  if template not in constants.DISK_TEMPLATES:
    msg = ("Invalid disk template name '%s', valid templates are: %s" %
           (template, utils.CommaJoin(constants.DISK_TEMPLATES)))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
  if template == constants.DT_FILE:
    _RequireFileStorage()


def _CheckStorageType(storage_type):
  """Ensure a given storage type is valid.

  """
  if storage_type not in constants.VALID_STORAGE_TYPES:
    raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
                               errors.ECODE_INVAL)
  if storage_type == constants.ST_FILE:
    _RequireFileStorage()
  return True


def _GetClusterDomainSecret():
  """Reads the cluster domain secret.

  """
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
                               strict=True)


def _CheckInstanceDown(lu, instance, reason):
  """Ensure that an instance is not running."""
  if instance.admin_up:
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
                               (instance.name, reason), errors.ECODE_STATE)

  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
              prereq=True, ecode=errors.ECODE_ENVIRON)

  if instance.name in ins_l.payload:
    raise errors.OpPrereqError("Instance %s is running, %s" %
                               (instance.name, reason), errors.ECODE_STATE)


def _ExpandItemName(fn, name, kind):
  """Expand an item name.

  @param fn: the function to use for expansion
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance')
  @return: the resolved (full) name
  @raise errors.OpPrereqError: if the item is not found

  """
  full_name = fn(name)
  if full_name is None:
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                               errors.ECODE_NOENT)
  return full_name


def _ExpandNodeName(cfg, name):
  """Wrapper over L{_ExpandItemName} for nodes."""
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")


def _ExpandInstanceName(cfg, name):
  """Wrapper over L{_ExpandItemName} for instance."""
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env
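

# Illustrative example (not part of the original module): for a hypothetical
# single-NIC, single-disk instance, a call such as
#   _BuildInstanceHookEnv("inst1", "node1", [], "debian-image", True, 512, 1,
#                         [("198.51.100.10", "aa:00:00:00:00:01",
#                           constants.NIC_MODE_BRIDGED, "xen-br0")],
#                         "plain", [(10240, "rw")], {}, {}, "xen-pvm")
# yields, among other keys, INSTANCE_STATUS="up", INSTANCE_NIC_COUNT=1,
# INSTANCE_NIC0_BRIDGE="xen-br0" and INSTANCE_DISK0_SIZE=10240.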


def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu:  L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  cluster = lu.cfg.GetClusterInfo()
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _NICListToTuple(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142


def _AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max with one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should


def _CheckNicsBridgesExist(lu, target_nics, target_node):
  """Check that the bridges needed by a list of nics exist.

  """
  cluster = lu.cfg.GetClusterInfo()
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _CheckOSVariant(os_obj, name):
  """Check whether an OS name conforms to the os variants specification.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type name: string
  @param name: OS name passed by the user, to check for validity

  """
  if not os_obj.supported_variants:
    return
  try:
    variant = name.split("+", 1)[1]
  except IndexError:
    raise errors.OpPrereqError("OS name must include a variant",
                               errors.ECODE_INVAL)

  if variant not in os_obj.supported_variants:
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir()]]

  return []


def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


class LUPostInitCluster(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    mn = self.cfg.GetMasterNode()
    return env, [], [mn]

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class LUDestroyCluster(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    return env, [], []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Run post hooks on master node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
    except:
      # pylint: disable-msg=W0702
      self.LogWarning("Errors occurred running hooks on %s" % master)

    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    if modify_ssh_setup:
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
      utils.CreateBackup(priv_key)
      utils.CreateBackup(pub_key)

    return master


def _VerifyCertificate(filename):
  """Verifies a certificate for LUVerifyCluster.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable-msg=W0703
    return (LUVerifyCluster.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUVerifyCluster.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUVerifyCluster.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = [
    ("skip_checks", _TListOf(_TElemOf(constants.VERIFY_OPTIONAL_CHECKS))),
    ("verbose", _TBool),
    ("error_codes", _TBool),
    ("debug_simulate_errors", _TBool),
    ]
  REQ_BGL = False

  TCLUSTER = "cluster"
  TNODE = "node"
  TINSTANCE = "instance"

  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
  ENODEDRBD = (TNODE, "ENODEDRBD")
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
  ENODEHV = (TNODE, "ENODEHV")
  ENODELVM = (TNODE, "ENODELVM")
  ENODEN1 = (TNODE, "ENODEN1")
  ENODENET = (TNODE, "ENODENET")
  ENODEOS = (TNODE, "ENODEOS")
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
  ENODERPC = (TNODE, "ENODERPC")
  ENODESSH = (TNODE, "ENODESSH")
  ENODEVERSION = (TNODE, "ENODEVERSION")
  ENODESETUP = (TNODE, "ENODESETUP")
  ENODETIME = (TNODE, "ENODETIME")

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type name: string
    @ivar name: the node name to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {secondary-node: list of instances} of all peers
        of this node (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call failed (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS

    """
    def __init__(self, offline=False, name=None):
      self.name = name
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt = ecode
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes:
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg)
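
  # Illustrative note (not part of the original module): with error_codes
  # enabled, the message built above renders roughly as
  #   ERROR:ENODERPC:node:node1.example.com:unable to verify node: ...
  # while without error_codes it renders as
  #   ERROR: node node1.example.com: unable to verify node: ...
  # The string is then passed to the feedback function prefixed with "  - ".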

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    cond = bool(cond) or self.op.debug_simulate_errors
    if cond:
      self._Error(*args, **kwargs)
    # do not mark the operation as failed for WARN cases only
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
      self.bad = self.bad or cond

  def _VerifyNode(self, ninfo, nresult):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    _ErrorIf(test, self.ENODERPC, node,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, self.ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    _ErrorIf(test, self.ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  self.ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, self.ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)


    test = nresult.get(constants.NV_NODESETUP,
                           ["Missing NODESETUP results"])
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
             "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
             "Node time diverges by at least %s from master node time",
             ntime_diff)

  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
    """Check the node LVM data.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)

    # check pv names
    pvlist = nresult.get(constants.NV_PVLIST, None)
    test = pvlist is None
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
    if not test:
      # check that ':' is not present in PV names, since it's a
      # special character for lvcreate (denotes the range of PEs to
      # use on the PV)
      for _, pvname, owner_vg in pvlist:
        test = ":" in pvname
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
                 " '%s' of VG '%s'", pvname, owner_vg)

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    test = constants.NV_NODELIST not in nresult
    _ErrorIf(test, self.ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          _ErrorIf(True, self.ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, self.ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if node == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        _ErrorIf(True, self.ENODENET, node, msg)


  def _VerifyInstance(self, instance, instanceconfig, node_image):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_up:
      pri_img = node_image[node_current]
      test = instance not in pri_img.instances and not pri_img.offline
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

    for node, n_img in node_image.items():
      if (not node == node_current):
        test = instance in n_img.instances
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                 "instance should not run on node %s", node)

  def _VerifyOrphanVolumes(self, node_vol_should, node_image):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    for node, n_img in node_image.items():
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = (node not in node_vol_should or
                volume not in node_vol_should[node])
        self._ErrorIf(test, self.ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyOrphanInstances(self, instancelist, node_image):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    for node, n_img in node_image.items():
      for o_inst in n_img.instances:
        test = o_inst not in instancelist
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
                      "instance %s on node %s should not exist", o_inst, node)

  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    for node, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to, should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1638
          if bep[constants.BE_AUTO_BALANCE]:
1639
            needed_mem += bep[constants.BE_MEMORY]
1640
        test = n_img.mfree < needed_mem
1641
        self._ErrorIf(test, self.ENODEN1, node,
1642
                      "not enough memory on to accommodate"
1643
                      " failovers should peer node %s fail", prinode)
1644

    
1645
  def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum,
1646
                       master_files):
1647
    """Verifies and computes the node required file checksums.
1648

1649
    @type ninfo: L{objects.Node}
1650
    @param ninfo: the node to check
1651
    @param nresult: the remote results for the node
1652
    @param file_list: required list of files
1653
    @param local_cksum: dictionary of local files and their checksums
1654
    @param master_files: list of files that only masters should have
1655

1656
    """
1657
    node = ninfo.name
1658
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1659

    
1660
    remote_cksum = nresult.get(constants.NV_FILELIST, None)
1661
    test = not isinstance(remote_cksum, dict)
1662
    _ErrorIf(test, self.ENODEFILECHECK, node,
1663
             "node hasn't returned file checksum data")
1664
    if test:
1665
      return
1666

    
1667
    for file_name in file_list:
1668
      node_is_mc = ninfo.master_candidate
1669
      must_have = (file_name not in master_files) or node_is_mc
1670
      # missing
1671
      test1 = file_name not in remote_cksum
1672
      # invalid checksum
1673
      test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
1674
      # existing and good
1675
      test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
1676
      _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
1677
               "file '%s' missing", file_name)
1678
      _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
1679
               "file '%s' has wrong checksum", file_name)
1680
      # not candidate and this is not a must-have file
1681
      _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
1682
               "file '%s' should not exist on non master"
1683
               " candidates (and the file is outdated)", file_name)
1684
      # all good, except non-master/non-must have combination
1685
      _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
1686
               "file '%s' should not exist"
1687
               " on non master candidates", file_name)
1688

    
1689
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_map):
1690
    """Verifies and the node DRBD status.
1691

1692
    @type ninfo: L{objects.Node}
1693
    @param ninfo: the node to check
1694
    @param nresult: the remote results for the node
1695
    @param instanceinfo: the dict of instances
1696
    @param drbd_map: the DRBD map as returned by
1697
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
1698

1699
    """
1700
    node = ninfo.name
1701
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1702

    
1703
    # compute the DRBD minors
1704
    node_drbd = {}
1705
    for minor, instance in drbd_map[node].items():
1706
      test = instance not in instanceinfo
1707
      _ErrorIf(test, self.ECLUSTERCFG, None,
1708
               "ghost instance '%s' in temporary DRBD map", instance)
1709
        # ghost instance should not be running, but otherwise we
1710
        # don't give double warnings (both ghost instance and
1711
        # unallocated minor in use)
1712
      if test:
1713
        node_drbd[minor] = (instance, False)
1714
      else:
1715
        instance = instanceinfo[instance]
1716
        node_drbd[minor] = (instance.name, instance.admin_up)
1717

    
1718
    # and now check them
1719
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
1720
    test = not isinstance(used_minors, (tuple, list))
1721
    _ErrorIf(test, self.ENODEDRBD, node,
1722
             "cannot parse drbd status file: %s", str(used_minors))
1723
    if test:
1724
      # we cannot check drbd status
1725
      return
1726

    
1727
    for minor, (iname, must_exist) in node_drbd.items():
1728
      test = minor not in used_minors and must_exist
1729
      _ErrorIf(test, self.ENODEDRBD, node,
1730
               "drbd minor %d of instance %s is not active", minor, iname)
1731
    for minor in used_minors:
1732
      test = minor not in node_drbd
1733
      _ErrorIf(test, self.ENODEDRBD, node,
1734
               "unallocated drbd minor %d is in use", minor)
1735

    
1736
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
1737
    """Builds the node OS structures.
1738

1739
    @type ninfo: L{objects.Node}
1740
    @param ninfo: the node to check
1741
    @param nresult: the remote results for the node
1742
    @param nimg: the node image object
1743

1744
    """
1745
    node = ninfo.name
1746
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1747

    
1748
    remote_os = nresult.get(constants.NV_OSLIST, None)
1749
    test = (not isinstance(remote_os, list) or
1750
            not compat.all(isinstance(v, list) and len(v) == 7
1751
                           for v in remote_os))
1752

    
1753
    _ErrorIf(test, self.ENODEOS, node,
1754
             "node hasn't returned valid OS data")
1755

    
1756
    nimg.os_fail = test
1757

    
1758
    if test:
1759
      return
1760

    
1761
    os_dict = {}
1762

    
1763
    for (name, os_path, status, diagnose,
1764
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
1765

    
1766
      if name not in os_dict:
1767
        os_dict[name] = []
1768

    
1769
      # parameters is a list of lists instead of list of tuples due to
1770
      # JSON lacking a real tuple type, fix it:
1771
      parameters = [tuple(v) for v in parameters]
1772
      os_dict[name].append((os_path, status, diagnose,
1773
                            set(variants), set(parameters), set(api_ver)))
1774

    
1775
    nimg.oslist = os_dict
1776

    
1777
  def _VerifyNodeOS(self, ninfo, nimg, base):
1778
    """Verifies the node OS list.
1779

1780
    @type ninfo: L{objects.Node}
1781
    @param ninfo: the node to check
1782
    @param nimg: the node image object
1783
    @param base: the 'template' node we match against (e.g. from the master)
1784

1785
    """
1786
    node = ninfo.name
1787
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1788

    
1789
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
1790

    
1791
    for os_name, os_data in nimg.oslist.items():
1792
      assert os_data, "Empty OS status for OS %s?!" % os_name
1793
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
1794
      _ErrorIf(not f_status, self.ENODEOS, node,
1795
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
1796
      _ErrorIf(len(os_data) > 1, self.ENODEOS, node,
1797
               "OS '%s' has multiple entries (first one shadows the rest): %s",
1798
               os_name, utils.CommaJoin([v[0] for v in os_data]))
1799
      # this will catched in backend too
1800
      _ErrorIf(compat.any(v >= constants.OS_API_V15 for v in f_api)
1801
               and not f_var, self.ENODEOS, node,
1802
               "OS %s with API at least %d does not declare any variant",
1803
               os_name, constants.OS_API_V15)
1804
      # comparisons with the 'base' image
1805
      test = os_name not in base.oslist
1806
      _ErrorIf(test, self.ENODEOS, node,
1807
               "Extra OS %s not present on reference node (%s)",
1808
               os_name, base.name)
1809
      if test:
1810
        continue
1811
      assert base.oslist[os_name], "Base node has empty OS status?"
1812
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
1813
      if not b_status:
1814
        # base OS is invalid, skipping
1815
        continue
1816
      for kind, a, b in [("API version", f_api, b_api),
1817
                         ("variants list", f_var, b_var),
1818
                         ("parameters", f_param, b_param)]:
1819
        _ErrorIf(a != b, self.ENODEOS, node,
1820
                 "OS %s %s differs from reference node %s: %s vs. %s",
1821
                 kind, os_name, base.name,
1822
                 utils.CommaJoin(a), utils.CommaJoin(b))
1823

    
1824
    # check any missing OSes
1825
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
1826
    _ErrorIf(missing, self.ENODEOS, node,
1827
             "OSes present on reference node %s but missing on this node: %s",
1828
             base.name, utils.CommaJoin(missing))
1829

    
1830
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
1831
    """Verifies and updates the node volume data.
1832

1833
    This function will update a L{NodeImage}'s internal structures
1834
    with data from the remote call.
1835

1836
    @type ninfo: L{objects.Node}
1837
    @param ninfo: the node to check
1838
    @param nresult: the remote results for the node
1839
    @param nimg: the node image object
1840
    @param vg_name: the configured VG name
1841

1842
    """
1843
    node = ninfo.name
1844
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1845

    
1846
    nimg.lvm_fail = True
1847
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1848
    if vg_name is None:
1849
      pass
1850
    elif isinstance(lvdata, basestring):
1851
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
1852
               utils.SafeEncode(lvdata))
1853
    elif not isinstance(lvdata, dict):
1854
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
1855
    else:
1856
      nimg.volumes = lvdata
1857
      nimg.lvm_fail = False
1858

    
1859
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
1860
    """Verifies and updates the node instance list.
1861

1862
    If the listing was successful, then updates this node's instance
1863
    list. Otherwise, it marks the RPC call as failed for the instance
1864
    list key.
1865

1866
    @type ninfo: L{objects.Node}
1867
    @param ninfo: the node to check
1868
    @param nresult: the remote results for the node
1869
    @param nimg: the node image object
1870

1871
    """
1872
    idata = nresult.get(constants.NV_INSTANCELIST, None)
1873
    test = not isinstance(idata, list)
1874
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
1875
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
1876
    if test:
1877
      nimg.hyp_fail = True
1878
    else:
1879
      nimg.instances = idata
1880

    
1881
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
1882
    """Verifies and computes a node information map
1883

1884
    @type ninfo: L{objects.Node}
1885
    @param ninfo: the node to check
1886
    @param nresult: the remote results for the node
1887
    @param nimg: the node image object
1888
    @param vg_name: the configured VG name
1889

1890
    """
1891
    node = ninfo.name
1892
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1893

    
1894
    # try to read free memory (from the hypervisor)
1895
    hv_info = nresult.get(constants.NV_HVINFO, None)
1896
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
1897
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
1898
    if not test:
1899
      try:
1900
        nimg.mfree = int(hv_info["memory_free"])
1901
      except (ValueError, TypeError):
1902
        _ErrorIf(True, self.ENODERPC, node,
1903
                 "node returned invalid nodeinfo, check hypervisor")
1904

    
1905
    # FIXME: devise a free space model for file based instances as well
1906
    if vg_name is not None:
1907
      test = (constants.NV_VGLIST not in nresult or
1908
              vg_name not in nresult[constants.NV_VGLIST])
1909
      _ErrorIf(test, self.ENODELVM, node,
1910
               "node didn't return data for the volume group '%s'"
1911
               " - it is either missing or broken", vg_name)
1912
      if not test:
1913
        try:
1914
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
1915
        except (ValueError, TypeError):
1916
          _ErrorIf(True, self.ENODERPC, node,
1917
                   "node returned invalid LVM info, check LVM status")
1918

    
1919
  def BuildHooksEnv(self):
1920
    """Build hooks env.
1921

1922
    Cluster-Verify hooks just ran in the post phase and their failure makes
1923
    the output be logged in the verify output and the verification to fail.
1924

1925
    """
1926
    all_nodes = self.cfg.GetNodeList()
1927
    env = {
1928
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1929
      }
1930
    for node in self.cfg.GetAllNodesInfo().values():
1931
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1932

    
1933
    return env, [], all_nodes
1934

    
1935
  def Exec(self, feedback_fn):
1936
    """Verify integrity of cluster, performing various test on nodes.
1937

1938
    """
1939
    self.bad = False
1940
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1941
    verbose = self.op.verbose
1942
    self._feedback_fn = feedback_fn
1943
    feedback_fn("* Verifying global settings")
1944
    for msg in self.cfg.VerifyConfig():
1945
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)
1946

    
1947
    # Check the cluster certificates
1948
    for cert_filename in constants.ALL_CERT_FILES:
1949
      (errcode, msg) = _VerifyCertificate(cert_filename)
1950
      _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
1951

    
1952
    vg_name = self.cfg.GetVGName()
1953
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1954
    cluster = self.cfg.GetClusterInfo()
1955
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
1956
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1957
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1958
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1959
                        for iname in instancelist)
1960
    i_non_redundant = [] # Non redundant instances
1961
    i_non_a_balanced = [] # Non auto-balanced instances
1962
    n_offline = 0 # Count of offline nodes
1963
    n_drained = 0 # Count of nodes being drained
1964
    node_vol_should = {}
1965

    
1966
    # FIXME: verify OS list
1967
    # do local checksums
1968
    master_files = [constants.CLUSTER_CONF_FILE]
1969
    master_node = self.master_node = self.cfg.GetMasterNode()
1970
    master_ip = self.cfg.GetMasterIP()
1971

    
1972
    file_names = ssconf.SimpleStore().GetFileList()
1973
    file_names.extend(constants.ALL_CERT_FILES)
1974
    file_names.extend(master_files)
1975
    if cluster.modify_etc_hosts:
1976
      file_names.append(constants.ETC_HOSTS)
1977

    
1978
    local_checksums = utils.FingerprintFiles(file_names)
1979

    
1980
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1981
    node_verify_param = {
1982
      constants.NV_FILELIST: file_names,
1983
      constants.NV_NODELIST: [node.name for node in nodeinfo
1984
                              if not node.offline],
1985
      constants.NV_HYPERVISOR: hypervisors,
1986
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1987
                                  node.secondary_ip) for node in nodeinfo
1988
                                 if not node.offline],
1989
      constants.NV_INSTANCELIST: hypervisors,
1990
      constants.NV_VERSION: None,
1991
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1992
      constants.NV_NODESETUP: None,
1993
      constants.NV_TIME: None,
1994
      constants.NV_MASTERIP: (master_node, master_ip),
1995
      constants.NV_OSLIST: None,
1996
      }
1997

    
1998
    if vg_name is not None:
1999
      node_verify_param[constants.NV_VGLIST] = None
2000
      node_verify_param[constants.NV_LVLIST] = vg_name
2001
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2002
      node_verify_param[constants.NV_DRBDLIST] = None
2003

    
2004
    # Build our expected cluster state
2005
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2006
                                                 name=node.name))
2007
                      for node in nodeinfo)
2008

    
2009
    for instance in instancelist:
2010
      inst_config = instanceinfo[instance]
2011

    
2012
      for nname in inst_config.all_nodes:
2013
        if nname not in node_image:
2014
          # ghost node
2015
          gnode = self.NodeImage(name=nname)
2016
          gnode.ghost = True
2017
          node_image[nname] = gnode
2018

    
2019
      inst_config.MapLVsByNode(node_vol_should)
2020

    
2021
      pnode = inst_config.primary_node
2022
      node_image[pnode].pinst.append(instance)
2023

    
2024
      for snode in inst_config.secondary_nodes:
2025
        nimg = node_image[snode]
2026
        nimg.sinst.append(instance)
2027
        if pnode not in nimg.sbp:
2028
          nimg.sbp[pnode] = []
2029
        nimg.sbp[pnode].append(instance)
2030

    
2031
    # At this point, we have the in-memory data structures complete,
2032
    # except for the runtime information, which we'll gather next
2033

    
2034
    # Due to the way our RPC system works, exact response times cannot be
2035
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2036
    # time before and after executing the request, we can at least have a time
2037
    # window.
2038
    nvinfo_starttime = time.time()
2039
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
2040
                                           self.cfg.GetClusterName())
2041
    nvinfo_endtime = time.time()
2042

    
2043
    all_drbd_map = self.cfg.ComputeDRBDMap()
2044

    
2045
    feedback_fn("* Verifying node status")
2046

    
2047
    refos_img = None
2048

    
2049
    for node_i in nodeinfo:
2050
      node = node_i.name
2051
      nimg = node_image[node]
2052

    
2053
      if node_i.offline:
2054
        if verbose:
2055
          feedback_fn("* Skipping offline node %s" % (node,))
2056
        n_offline += 1
2057
        continue
2058

    
2059
      if node == master_node:
2060
        ntype = "master"
2061
      elif node_i.master_candidate:
2062
        ntype = "master candidate"
2063
      elif node_i.drained:
2064
        ntype = "drained"
2065
        n_drained += 1
2066
      else:
2067
        ntype = "regular"
2068
      if verbose:
2069
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2070

    
2071
      msg = all_nvinfo[node].fail_msg
2072
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
2073
      if msg:
2074
        nimg.rpc_fail = True
2075
        continue
2076

    
2077
      nresult = all_nvinfo[node].payload
2078

    
2079
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2080
      self._VerifyNodeNetwork(node_i, nresult)
2081
      self._VerifyNodeLVM(node_i, nresult, vg_name)
2082
      self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums,
2083
                            master_files)
2084
      self._VerifyNodeDrbd(node_i, nresult, instanceinfo, all_drbd_map)
2085
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2086

    
2087
      self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2088
      self._UpdateNodeInstances(node_i, nresult, nimg)
2089
      self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2090
      self._UpdateNodeOS(node_i, nresult, nimg)
2091
      if not nimg.os_fail:
2092
        if refos_img is None:
2093
          refos_img = nimg
2094
        self._VerifyNodeOS(node_i, nimg, refos_img)
2095

    
2096
    feedback_fn("* Verifying instance status")
2097
    for instance in instancelist:
2098
      if verbose:
2099
        feedback_fn("* Verifying instance %s" % instance)
2100
      inst_config = instanceinfo[instance]
2101
      self._VerifyInstance(instance, inst_config, node_image)
2102
      inst_nodes_offline = []
2103

    
2104
      pnode = inst_config.primary_node
2105
      pnode_img = node_image[pnode]
2106
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2107
               self.ENODERPC, pnode, "instance %s, connection to"
2108
               " primary node failed", instance)
2109

    
2110
      if pnode_img.offline:
2111
        inst_nodes_offline.append(pnode)
2112

    
2113
      # If the instance is non-redundant we cannot survive losing its primary
2114
      # node, so we are not N+1 compliant. On the other hand we have no disk
2115
      # templates with more than one secondary so that situation is not well
2116
      # supported either.
2117
      # FIXME: does not support file-backed instances
2118
      if not inst_config.secondary_nodes:
2119
        i_non_redundant.append(instance)
2120
      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
2121
               instance, "instance has multiple secondary nodes: %s",
2122
               utils.CommaJoin(inst_config.secondary_nodes),
2123
               code=self.ETYPE_WARNING)
2124

    
2125
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2126
        i_non_a_balanced.append(instance)
2127

    
2128
      for snode in inst_config.secondary_nodes:
2129
        s_img = node_image[snode]
2130
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
2131
                 "instance %s, connection to secondary node failed", instance)
2132

    
2133
        if s_img.offline:
2134
          inst_nodes_offline.append(snode)
2135

    
2136
      # warn that the instance lives on offline nodes
2137
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
2138
               "instance lives on offline node(s) %s",
2139
               utils.CommaJoin(inst_nodes_offline))
2140
      # ... or ghost nodes
2141
      for node in inst_config.all_nodes:
2142
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
2143
                 "instance lives on ghost node %s", node)
2144

    
2145
    feedback_fn("* Verifying orphan volumes")
2146
    self._VerifyOrphanVolumes(node_vol_should, node_image)
2147

    
2148
    feedback_fn("* Verifying orphan instances")
2149
    self._VerifyOrphanInstances(instancelist, node_image)
2150

    
2151
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2152
      feedback_fn("* Verifying N+1 Memory redundancy")
2153
      self._VerifyNPlusOneMemory(node_image, instanceinfo)
2154

    
2155
    feedback_fn("* Other Notes")
2156
    if i_non_redundant:
2157
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2158
                  % len(i_non_redundant))
2159

    
2160
    if i_non_a_balanced:
2161
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2162
                  % len(i_non_a_balanced))
2163

    
2164
    if n_offline:
2165
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2166

    
2167
    if n_drained:
2168
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2169

    
2170
    return not self.bad
2171

    
2172
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2173
    """Analyze the post-hooks' result
2174

2175
    This method analyses the hook result, handles it, and sends some
2176
    nicely-formatted feedback back to the user.
2177

2178
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
2179
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2180
    @param hooks_results: the results of the multi-node hooks rpc call
2181
    @param feedback_fn: function used send feedback back to the caller
2182
    @param lu_result: previous Exec result
2183
    @return: the new Exec result, based on the previous result
2184
        and hook results
2185

2186
    """
2187
    # We only really run POST phase hooks, and are only interested in
2188
    # their results
2189
    if phase == constants.HOOKS_PHASE_POST:
2190
      # Used to change hooks' output to proper indentation
2191
      indent_re = re.compile('^', re.M)
2192
      feedback_fn("* Hooks Results")
2193
      assert hooks_results, "invalid result from hooks"
2194

    
2195
      for node_name in hooks_results:
2196
        res = hooks_results[node_name]
2197
        msg = res.fail_msg
2198
        test = msg and not res.offline
2199
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
2200
                      "Communication failure in hooks execution: %s", msg)
2201
        if res.offline or msg:
2202
          # No need to investigate payload if node is offline or gave an error.
2203
          # override manually lu_result here as _ErrorIf only
2204
          # overrides self.bad
2205
          lu_result = 1
2206
          continue
2207
        for script, hkr, output in res.payload:
2208
          test = hkr == constants.HKR_FAIL
2209
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
2210
                        "Script %s failed, output:", script)
2211
          if test:
2212
            output = indent_re.sub('      ', output)
2213
            feedback_fn("%s" % output)
2214
            lu_result = 0
2215

    
2216
      return lu_result
2217

    
2218

    
2219
class LUVerifyDisks(NoHooksLU):
2220
  """Verifies the cluster disks status.
2221

2222
  """
2223
  _OP_REQP = []
2224
  REQ_BGL = False
2225

    
2226
  def ExpandNames(self):
2227
    self.needed_locks = {
2228
      locking.LEVEL_NODE: locking.ALL_SET,
2229
      locking.LEVEL_INSTANCE: locking.ALL_SET,
2230
    }
2231
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
2232

    
2233
  def Exec(self, feedback_fn):
2234
    """Verify integrity of cluster disks.
2235

2236
    @rtype: tuple of three items
2237
    @return: a tuple of (dict of node-to-node_error, list of instances
2238
        which need activate-disks, dict of instance: (node, volume) for
2239
        missing volumes
2240

2241
    """
2242
    result = res_nodes, res_instances, res_missing = {}, [], {}
2243

    
2244
    vg_name = self.cfg.GetVGName()
2245
    nodes = utils.NiceSort(self.cfg.GetNodeList())
2246
    instances = [self.cfg.GetInstanceInfo(name)
2247
                 for name in self.cfg.GetInstanceList()]
2248

    
2249
    nv_dict = {}
2250
    for inst in instances:
2251
      inst_lvs = {}
2252
      if (not inst.admin_up or
2253
          inst.disk_template not in constants.DTS_NET_MIRROR):
2254
        continue
2255
      inst.MapLVsByNode(inst_lvs)
2256
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
2257
      for node, vol_list in inst_lvs.iteritems():
2258
        for vol in vol_list:
2259
          nv_dict[(node, vol)] = inst
2260

    
2261
    if not nv_dict:
2262
      return result
2263

    
2264
    node_lvs = self.rpc.call_lv_list(nodes, vg_name)
2265

    
2266
    for node in nodes:
2267
      # node_volume
2268
      node_res = node_lvs[node]
2269
      if node_res.offline:
2270
        continue
2271
      msg = node_res.fail_msg
2272
      if msg:
2273
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
2274
        res_nodes[node] = msg
2275
        continue
2276

    
2277
      lvs = node_res.payload
2278
      for lv_name, (_, _, lv_online) in lvs.items():
2279
        inst = nv_dict.pop((node, lv_name), None)
2280
        if (not lv_online and inst is not None
2281
            and inst.name not in res_instances):
2282
          res_instances.append(inst.name)
2283

    
2284
    # any leftover items in nv_dict are missing LVs, let's arrange the
2285
    # data better
2286
    for key, inst in nv_dict.iteritems():
2287
      if inst.name not in res_missing:
2288
        res_missing[inst.name] = []
2289
      res_missing[inst.name].append(key)
2290

    
2291
    return result
2292

    
2293

    
2294
class LURepairDiskSizes(NoHooksLU):
2295
  """Verifies the cluster disks sizes.
2296

2297
  """
2298
  _OP_REQP = [("instances", _TListOf(_TNonEmptyString))]
2299
  REQ_BGL = False
2300

    
2301
  def ExpandNames(self):
2302
    if self.op.instances:
2303
      self.wanted_names = []
2304
      for name in self.op.instances:
2305
        full_name = _ExpandInstanceName(self.cfg, name)
2306
        self.wanted_names.append(full_name)
2307
      self.needed_locks = {
2308
        locking.LEVEL_NODE: [],
2309
        locking.LEVEL_INSTANCE: self.wanted_names,
2310
        }
2311
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2312
    else:
2313
      self.wanted_names = None
2314
      self.needed_locks = {
2315
        locking.LEVEL_NODE: locking.ALL_SET,
2316
        locking.LEVEL_INSTANCE: locking.ALL_SET,
2317
        }
2318
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
2319

    
2320
  def DeclareLocks(self, level):
2321
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
2322
      self._LockInstancesNodes(primary_only=True)
2323

    
2324
  def CheckPrereq(self):
2325
    """Check prerequisites.
2326

2327
    This only checks the optional instance list against the existing names.
2328

2329
    """
2330
    if self.wanted_names is None:
2331
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2332

    
2333
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
2334
                             in self.wanted_names]
2335

    
2336
  def _EnsureChildSizes(self, disk):
2337
    """Ensure children of the disk have the needed disk size.
2338

2339
    This is valid mainly for DRBD8 and fixes an issue where the
2340
    children have smaller disk size.
2341

2342
    @param disk: an L{ganeti.objects.Disk} object
2343

2344
    """
2345
    if disk.dev_type == constants.LD_DRBD8:
2346
      assert disk.children, "Empty children for DRBD8?"
2347
      fchild = disk.children[0]
2348
      mismatch = fchild.size < disk.size
2349
      if mismatch:
2350
        self.LogInfo("Child disk has size %d, parent %d, fixing",
2351
                     fchild.size, disk.size)
2352
        fchild.size = disk.size
2353

    
2354
      # and we recurse on this child only, not on the metadev
2355
      return self._EnsureChildSizes(fchild) or mismatch
2356
    else:
2357
      return False
2358

    
2359
  def Exec(self, feedback_fn):
2360
    """Verify the size of cluster disks.
2361

2362
    """
2363
    # TODO: check child disks too
2364
    # TODO: check differences in size between primary/secondary nodes
2365
    per_node_disks = {}
2366
    for instance in self.wanted_instances:
2367
      pnode = instance.primary_node
2368
      if pnode not in per_node_disks:
2369
        per_node_disks[pnode] = []
2370
      for idx, disk in enumerate(instance.disks):
2371
        per_node_disks[pnode].append((instance, idx, disk))
2372

    
2373
    changed = []
2374
    for node, dskl in per_node_disks.items():
2375
      newl = [v[2].Copy() for v in dskl]
2376
      for dsk in newl:
2377
        self.cfg.SetDiskID(dsk, node)
2378
      result = self.rpc.call_blockdev_getsizes(node, newl)
2379
      if result.fail_msg:
2380
        self.LogWarning("Failure in blockdev_getsizes call to node"
2381
                        " %s, ignoring", node)
2382
        continue
2383
      if len(result.data) != len(dskl):
2384
        self.LogWarning("Invalid result from node %s, ignoring node results",
2385
                        node)
2386
        continue
2387
      for ((instance, idx, disk), size) in zip(dskl, result.data):
2388
        if size is None:
2389
          self.LogWarning("Disk %d of instance %s did not return size"
2390
                          " information, ignoring", idx, instance.name)
2391
          continue
2392
        if not isinstance(size, (int, long)):
2393
          self.LogWarning("Disk %d of instance %s did not return valid"
2394
                          " size information, ignoring", idx, instance.name)
2395
          continue
2396
        size = size >> 20
2397
        if size != disk.size:
2398
          self.LogInfo("Disk %d of instance %s has mismatched size,"
2399
                       " correcting: recorded %d, actual %d", idx,
2400
                       instance.name, disk.size, size)
2401
          disk.size = size
2402
          self.cfg.Update(instance, feedback_fn)
2403
          changed.append((instance.name, idx, size))
2404
        if self._EnsureChildSizes(disk):
2405
          self.cfg.Update(instance, feedback_fn)
2406
          changed.append((instance.name, idx, disk.size))
2407
    return changed
2408

    
2409

    
2410
class LURenameCluster(LogicalUnit):
2411
  """Rename the cluster.
2412

2413
  """
2414
  HPATH = "cluster-rename"
2415
  HTYPE = constants.HTYPE_CLUSTER
2416
  _OP_REQP = [("name", _TNonEmptyString)]
2417

    
2418
  def BuildHooksEnv(self):
2419
    """Build hooks env.
2420

2421
    """
2422
    env = {
2423
      "OP_TARGET": self.cfg.GetClusterName(),
2424
      "NEW_NAME": self.op.name,
2425
      }
2426
    mn = self.cfg.GetMasterNode()
2427
    all_nodes = self.cfg.GetNodeList()
2428
    return env, [mn], all_nodes
2429

    
2430
  def CheckPrereq(self):
2431
    """Verify that the passed name is a valid one.
2432

2433
    """
2434
    hostname = utils.GetHostInfo(self.op.name)
2435

    
2436
    new_name = hostname.name
2437
    self.ip = new_ip = hostname.ip
2438
    old_name = self.cfg.GetClusterName()
2439
    old_ip = self.cfg.GetMasterIP()
2440
    if new_name == old_name and new_ip == old_ip:
2441
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
2442
                                 " cluster has changed",
2443
                                 errors.ECODE_INVAL)
2444
    if new_ip != old_ip:
2445
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
2446
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
2447
                                   " reachable on the network. Aborting." %
2448
                                   new_ip, errors.ECODE_NOTUNIQUE)
2449

    
2450
    self.op.name = new_name
2451

    
2452
  def Exec(self, feedback_fn):
2453
    """Rename the cluster.
2454

2455
    """
2456
    clustername = self.op.name
2457
    ip = self.ip
2458

    
2459
    # shutdown the master IP
2460
    master = self.cfg.GetMasterNode()
2461
    result = self.rpc.call_node_stop_master(master, False)
2462
    result.Raise("Could not disable the master role")
2463

    
2464
    try:
2465
      cluster = self.cfg.GetClusterInfo()
2466
      cluster.cluster_name = clustername
2467
      cluster.master_ip = ip
2468
      self.cfg.Update(cluster, feedback_fn)
2469

    
2470
      # update the known hosts file
2471
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
2472
      node_list = self.cfg.GetNodeList()
2473
      try:
2474
        node_list.remove(master)
2475
      except ValueError:
2476
        pass
2477
      result = self.rpc.call_upload_file(node_list,
2478
                                         constants.SSH_KNOWN_HOSTS_FILE)
2479
      for to_node, to_result in result.iteritems():
2480
        msg = to_result.fail_msg
2481
        if msg:
2482
          msg = ("Copy of file %s to node %s failed: %s" %
2483
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
2484
          self.proc.LogWarning(msg)
2485

    
2486
    finally:
2487
      result = self.rpc.call_node_start_master(master, False, False)
2488
      msg = result.fail_msg
2489
      if msg:
2490
        self.LogWarning("Could not re-enable the master role on"
2491
                        " the master, please restart manually: %s", msg)
2492

    
2493

    
2494
def _RecursiveCheckIfLVMBased(disk):
2495
  """Check if the given disk or its children are lvm-based.
2496

2497
  @type disk: L{objects.Disk}
2498
  @param disk: the disk to check
2499
  @rtype: boolean
2500
  @return: boolean indicating whether a LD_LV dev_type was found or not
2501

2502
  """
2503
  if disk.children:
2504
    for chdisk in disk.children:
2505
      if _RecursiveCheckIfLVMBased(chdisk):
2506
        return True
2507
  return disk.dev_type == constants.LD_LV
2508

    
2509

    
2510
class LUSetClusterParams(LogicalUnit):
2511
  """Change the parameters of the cluster.
2512

2513
  """
2514
  HPATH = "cluster-modify"
2515
  HTYPE = constants.HTYPE_CLUSTER
2516
  _OP_REQP = [
2517
    ("hvparams", _TOr(_TDictOf(_TNonEmptyString, _TDict), _TNone)),
2518
    ("os_hvp", _TOr(_TDictOf(_TNonEmptyString, _TDict), _TNone)),
2519
    ("osparams", _TOr(_TDictOf(_TNonEmptyString, _TDict), _TNone)),
2520
    ("enabled_hypervisors",
2521
     _TOr(_TAnd(_TListOf(_TElemOf(constants.HYPER_TYPES)), _TTrue), _TNone)),
2522
    ]
2523
  _OP_DEFS = [
2524
    ("candidate_pool_size", None),
2525
    ("uid_pool", None),
2526
    ("add_uids", None),
2527
    ("remove_uids", None),
2528
    ("hvparams", None),
2529
    ("os_hvp", None),
2530
    ]
2531
  REQ_BGL = False
2532

    
2533
  def CheckArguments(self):
2534
    """Check parameters
2535

2536
    """
2537
    if self.op.candidate_pool_size is not None:
2538
      try:
2539
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
2540
      except (ValueError, TypeError), err:
2541
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
2542
                                   str(err), errors.ECODE_INVAL)
2543
      if self.op.candidate_pool_size < 1:
2544
        raise errors.OpPrereqError("At least one master candidate needed",
2545
                                   errors.ECODE_INVAL)
2546

    
2547
    _CheckBooleanOpField(self.op, "maintain_node_health")
2548

    
2549
    if self.op.uid_pool:
2550
      uidpool.CheckUidPool(self.op.uid_pool)
2551

    
2552
    if self.op.add_uids:
2553
      uidpool.CheckUidPool(self.op.add_uids)
2554

    
2555
    if self.op.remove_uids:
2556
      uidpool.CheckUidPool(self.op.remove_uids)
2557

    
2558
  def ExpandNames(self):
2559
    # FIXME: in the future maybe other cluster params won't require checking on
2560
    # all nodes to be modified.
2561
    self.needed_locks = {
2562
      locking.LEVEL_NODE: locking.ALL_SET,
2563
    }
2564
    self.share_locks[locking.LEVEL_NODE] = 1
2565

    
2566
  def BuildHooksEnv(self):
2567
    """Build hooks env.
2568

2569
    """
2570
    env = {
2571
      "OP_TARGET": self.cfg.GetClusterName(),
2572
      "NEW_VG_NAME": self.op.vg_name,
2573
      }
2574
    mn = self.cfg.GetMasterNode()
2575
    return env, [mn], [mn]
2576

    
2577
  def CheckPrereq(self):
2578
    """Check prerequisites.
2579

2580
    This checks whether the given params don't conflict and
2581
    if the given volume group is valid.
2582

2583
    """
2584
    if self.op.vg_name is not None and not self.op.vg_name:
2585
      instances = self.cfg.GetAllInstancesInfo().values()
2586
      for inst in instances:
2587
        for disk in inst.disks:
2588
          if _RecursiveCheckIfLVMBased(disk):
2589
            raise errors.OpPrereqError("Cannot disable lvm storage while"
2590
                                       " lvm-based instances exist",
2591
                                       errors.ECODE_INVAL)
2592

    
2593
    node_list = self.acquired_locks[locking.LEVEL_NODE]
2594

    
2595
    # if vg_name not None, checks given volume group on all nodes
2596
    if self.op.vg_name:
2597
      vglist = self.rpc.call_vg_list(node_list)
2598
      for node in node_list:
2599
        msg = vglist[node].fail_msg
2600
        if msg:
2601
          # ignoring down node
2602
          self.LogWarning("Error while gathering data on node %s"
2603
                          " (ignoring node): %s", node, msg)
2604
          continue
2605
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
2606
                                              self.op.vg_name,
2607
                                              constants.MIN_VG_SIZE)
2608
        if vgstatus:
2609
          raise errors.OpPrereqError("Error on node '%s': %s" %
2610
                                     (node, vgstatus), errors.ECODE_ENVIRON)
2611

    
2612
    self.cluster = cluster = self.cfg.GetClusterInfo()
2613
    # validate params changes
2614
    if self.op.beparams:
2615
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2616
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
2617

    
2618
    if self.op.nicparams:
2619
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
2620
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
2621
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
2622
      nic_errors = []
2623

    
2624
      # check all instances for consistency
2625
      for instance in self.cfg.GetAllInstancesInfo().values():
2626
        for nic_idx, nic in enumerate(instance.nics):
2627
          params_copy = copy.deepcopy(nic.nicparams)
2628
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
2629

    
2630
          # check parameter syntax
2631
          try:
2632
            objects.NIC.CheckParameterSyntax(params_filled)
2633
          except errors.ConfigurationError, err:
2634
            nic_errors.append("Instance %s, nic/%d: %s" %
2635
                              (instance.name, nic_idx, err))
2636

    
2637
          # if we're moving instances to routed, check that they have an ip
2638
          target_mode = params_filled[constants.NIC_MODE]
2639
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
2640
            nic_errors.append("Instance %s, nic/%d: routed nick with no ip" %
2641
                              (instance.name, nic_idx))
2642
      if nic_errors:
2643
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
2644
                                   "\n".join(nic_errors))
2645

    
2646
    # hypervisor list/parameters
2647
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
2648
    if self.op.hvparams:
2649
      for hv_name, hv_dict in self.op.hvparams.items():
2650
        if hv_name not in self.new_hvparams:
2651
          self.new_hvparams[hv_name] = hv_dict
2652
        else:
2653
          self.new_hvparams[hv_name].update(hv_dict)
2654

    
2655
    # os hypervisor parameters
2656
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
2657
    if self.op.os_hvp:
2658
      for os_name, hvs in self.op.os_hvp.items():
2659
        if os_name not in self.new_os_hvp:
2660
          self.new_os_hvp[os_name] = hvs
2661
        else:
2662
          for hv_name, hv_dict in hvs.items():
2663
            if hv_name not in self.new_os_hvp[os_name]:
2664
              self.new_os_hvp[os_name][hv_name] = hv_dict
2665
            else:
2666
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
2667

    
2668
    # os parameters
2669
    self.new_osp = objects.FillDict(cluster.osparams, {})
2670
    if self.op.osparams:
2671
      for os_name, osp in self.op.osparams.items():
2672
        if os_name not in self.new_osp:
2673
          self.new_osp[os_name] = {}
2674

    
2675
        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
2676
                                                  use_none=True)
2677

    
2678
        if not self.new_osp[os_name]:
2679
          # we removed all parameters
2680
          del self.new_osp[os_name]
2681
        else:
2682
          # check the parameter validity (remote check)
2683
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
2684
                         os_name, self.new_osp[os_name])
2685

    
2686
    # changes to the hypervisor list
2687
    if self.op.enabled_hypervisors is not None:
2688
      self.hv_list = self.op.enabled_hypervisors
2689
      for hv in self.hv_list:
2690
        # if the hypervisor doesn't already exist in the cluster
2691
        # hvparams, we initialize it to empty, and then (in both
2692
        # cases) we make sure to fill the defaults, as we might not
2693
        # have a complete defaults list if the hypervisor wasn't
2694
        # enabled before
2695
        if hv not in new_hvp:
2696
          new_hvp[hv] = {}
2697
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
2698
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
2699
    else:
2700
      self.hv_list = cluster.enabled_hypervisors
2701

    
2702
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
2703
      # either the enabled list has changed, or the parameters have, validate
2704
      for hv_name, hv_params in self.new_hvparams.items():
2705
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
2706
            (self.op.enabled_hypervisors and
2707
             hv_name in self.op.enabled_hypervisors)):
2708
          # either this is a new hypervisor, or its parameters have changed
2709
          hv_class = hypervisor.GetHypervisor(hv_name)
2710
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2711
          hv_class.CheckParameterSyntax(hv_params)
2712
          _CheckHVParams(self, node_list, hv_name, hv_params)
2713

    
2714
    if self.op.os_hvp:
2715
      # no need to check any newly-enabled hypervisors, since the
2716
      # defaults have already been checked in the above code-block
2717
      for os_name, os_hvp in self.new_os_hvp.items():
2718
        for hv_name, hv_params in os_hvp.items():
2719
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2720
          # we need to fill in the new os_hvp on top of the actual hv_p
2721
          cluster_defaults = self.new_hvparams.get(hv_name, {})
2722
          new_osp = objects.FillDict(cluster_defaults, hv_params)
2723
          hv_class = hypervisor.GetHypervisor(hv_name)
2724
          hv_class.CheckParameterSyntax(new_osp)
2725
          _CheckHVParams(self, node_list, hv_name, new_osp)
2726

    
2727

    
2728
  def Exec(self, feedback_fn):
2729
    """Change the parameters of the cluster.
2730

2731
    """
2732
    if self.op.vg_name is not None:
2733
      new_volume = self.op.vg_name
2734
      if not new_volume:
2735
        new_volume = None
2736
      if new_volume != self.cfg.GetVGName():
2737
        self.cfg.SetVGName(new_volume)
2738
      else:
2739
        feedback_fn("Cluster LVM configuration already in desired"
2740
                    " state, not changing")
2741
    if self.op.hvparams:
2742
      self.cluster.hvparams = self.new_hvparams
2743
    if self.op.os_hvp:
2744
      self.cluster.os_hvp = self.new_os_hvp
2745
    if self.op.enabled_hypervisors is not None:
2746
      self.cluster.hvparams = self.new_hvparams
2747
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
2748
    if self.op.beparams:
2749
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
2750
    if self.op.nicparams:
2751
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
2752
    if self.op.osparams:
2753
      self.cluster.osparams = self.new_osp
2754

    
2755
    if self.op.candidate_pool_size is not None:
2756
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
2757
      # we need to update the pool size here, otherwise the save will fail
2758
      _AdjustCandidatePool(self, [])
2759

    
2760
    if self.op.maintain_node_health is not None:
2761
      self.cluster.maintain_node_health = self.op.maintain_node_health
2762

    
2763
    if self.op.add_uids is not None:
2764
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
2765

    
2766
    if self.op.remove_uids is not None:
2767
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
2768

    
2769
    if self.op.uid_pool is not None:
2770
      self.cluster.uid_pool = self.op.uid_pool
2771

    
2772
    self.cfg.Update(self.cluster, feedback_fn)
2773

    
2774

    
2775
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
2776
  """Distribute additional files which are part of the cluster configuration.
2777

2778
  ConfigWriter takes care of distributing the config and ssconf files, but
2779
  there are more files which should be distributed to all nodes. This function
2780
  makes sure those are copied.
2781

2782
  @param lu: calling logical unit
2783
  @param additional_nodes: list of nodes not in the config to distribute to
2784

2785
  """
2786
  # 1. Gather target nodes
2787
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
2788
  dist_nodes = lu.cfg.GetOnlineNodeList()
2789
  if additional_nodes is not None:
2790
    dist_nodes.extend(additional_nodes)
2791
  if myself.name in dist_nodes:
2792
    dist_nodes.remove(myself.name)
2793

    
2794
  # 2. Gather files to distribute
2795
  dist_files = set([constants.ETC_HOSTS,
2796
                    constants.SSH_KNOWN_HOSTS_FILE,
2797
                    constants.RAPI_CERT_FILE,
2798
                    constants.RAPI_USERS_FILE,
2799
                    constants.CONFD_HMAC_KEY,
2800
                    constants.CLUSTER_DOMAIN_SECRET_FILE,
2801
                   ])
2802

    
2803
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
2804
  for hv_name in enabled_hypervisors:
2805
    hv_class = hypervisor.GetHypervisor(hv_name)
2806
    dist_files.update(hv_class.GetAncillaryFiles())
2807

    
2808
  # 3. Perform the files upload
2809
  for fname in dist_files:
2810
    if os.path.exists(fname):
2811
      result = lu.rpc.call_upload_file(dist_nodes, fname)
2812
      for to_node, to_result in result.items():
2813
        msg = to_result.fail_msg
2814
        if msg:
2815
          msg = ("Copy of file %s to node %s failed: %s" %
2816
                 (fname, to_node, msg))
2817
          lu.proc.LogWarning(msg)
2818

    
2819

    
2820
class LURedistributeConfig(NoHooksLU):
2821
  """Force the redistribution of cluster configuration.
2822

2823
  This is a very simple LU.
2824

2825
  """
2826
  _OP_REQP = []
2827
  REQ_BGL = False
2828

    
2829
  def ExpandNames(self):
2830
    self.needed_locks = {
2831
      locking.LEVEL_NODE: locking.ALL_SET,
2832
    }
2833
    self.share_locks[locking.LEVEL_NODE] = 1
2834

    
2835
  def Exec(self, feedback_fn):
2836
    """Redistribute the configuration.
2837

2838
    """
2839
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
2840
    _RedistributeAncillaryFiles(self)
2841

    
2842

    
2843
def _WaitForSync(lu, instance, disks=None, oneshot=False):
2844
  """Sleep and poll for an instance's disk to sync.
2845

2846
  """
2847
  if not instance.disks or disks is not None and not disks:
2848
    return True
2849

    
2850
  disks = _ExpandCheckDisks(instance, disks)
2851

    
2852
  if not oneshot:
2853
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
2854

    
2855
  node = instance.primary_node
2856

    
2857
  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (disks[i].iv_name, mstat.sync_percent, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = [
    ("output_fields", _TListOf(_TNonEmptyString)),
    ("names", _TListOf(_TNonEmptyString)),
    ]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants",
                                   "parameters", "api_versions")

  def CheckArguments(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported",
                                 errors.ECODE_INVAL)

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  @staticmethod
  def _DiagnoseByOS(rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another
        map, with nodes as keys and tuples of (path, status, diagnose,
        variants, parameters, api_versions) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "", [], [])]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for (name, path, status, diagnose, variants,
           params, api_versions) in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        # convert params from [name, help] to (name, help)
        params = [tuple(v) for v in params]
        all_os[name][node_name].append((path, status, diagnose,
                                        variants, params, api_versions))
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    pol = self._DiagnoseByOS(node_data)
    output = []

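    # an OS is reported as valid only if every node has at least one entry
    # for it and the first entry is valid; variants, parameters and API
    # versions are reduced to their intersection across nodes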
    for os_name, os_data in pol.items():
      row = []
      valid = True
      (variants, params, api_versions) = null_state = (set(), set(), set())
      for idx, osl in enumerate(os_data.values()):
        valid = bool(valid and osl and osl[0][1])
        if not valid:
          (variants, params, api_versions) = null_state
          break
        node_variants, node_params, node_api = osl[0][3:6]
        if idx == 0: # first entry
          variants = set(node_variants)
          params = set(node_params)
          api_versions = set(node_api)
        else: # keep consistency
          variants.intersection_update(node_variants)
          params.intersection_update(node_params)
          api_versions.intersection_update(node_api)

      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = valid
        elif field == "node_status":
          # this is just a copy of the dict
          val = {}
          for node_name, nos_list in os_data.items():
            val[node_name] = nos_list
        elif field == "variants":
          val = list(variants)
        elif field == "parameters":
          val = list(params)
        elif field == "api_versions":
          val = list(api_versions)
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = [("node_name", _TNonEmptyString)]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_name)
    except ValueError:
      logging.warning("Node %s which is about to be removed not found"
                      " in the all nodes list", self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_name)
    assert node is not None

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.",
                                 errors.ECODE_INVAL)

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self, exceptions=[node.name])
    self.context.RemoveNode(node.name)

    # Run post hooks on the node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
    except:
      # pylint: disable-msg=W0702
      self.LogWarning("Errors occurred running hooks on %s" % node.name)

    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      # FIXME: this should be done via an rpc call to node daemon
      utils.RemoveHostFromEtcHosts(node.name)
      _RedistributeAncillaryFiles(self)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable-msg=W0142
  _OP_REQP = [
    ("output_fields", _TListOf(_TNonEmptyString)),
    ("names", _TListOf(_TNonEmptyString)),
    ("use_locking", _TBool),
    ]
  REQ_BGL = False

  _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
                    "master_candidate", "offline", "drained"]

  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal", "cnodes", "csockets",
    )

  _FIELDS_STATIC = utils.FieldSet(*[
    "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "master",
    "role"] + _SIMPLE_FIELDS
    )

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

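    # live (dynamic) fields need a node_info RPC to every selected node;
    # nodes that fail to answer get an empty dict so that the static fields
    # can still be returned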
    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.fail_msg and nodeinfo.payload:
          nodeinfo = nodeinfo.payload
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      inst_data = self.cfg.GetAllInstancesInfo()

      for inst in inst_data.values():
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field in self._SIMPLE_FIELDS:
          val = getattr(node, field)
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "master":
          val = node.name == master_node
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        elif field == "role":
          if node.name == master_node:
            val = "M"
          elif node.master_candidate:
            val = "C"
          elif node.drained:
            val = "D"
          elif node.offline:
            val = "O"
          else:
            val = "R"
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = [
    ("nodes", _TListOf(_TNonEmptyString)),
    ("output_fields", _TListOf(_TNonEmptyString)),
    ]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Computes the list of volumes and their attributes.

    """
    nodenames = self.acquired_locks[locking.LEVEL_NODE]
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      node_vols = nresult.payload[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUQueryNodeStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
  _OP_REQP = [
    ("nodes", _TListOf(_TNonEmptyString)),
    ("storage_type", _CheckStorageType),
    ("output_fields", _TListOf(_TNonEmptyString)),
    ]
  _OP_DEFS = [("name", None)]
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def Exec(self, feedback_fn):
    """Computes the list of storage units and their attributes.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

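    # the storage RPC returns one row per storage unit, with the columns in
    # the order of the 'fields' list built above; node and type are filled
    # in locally, as only this LU knows them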
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.nodes,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node in utils.NiceSort(self.nodes):
      nresult = data[node]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUModifyNodeStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  _OP_REQP = [
    ("node_name", _TNonEmptyString),
    ("storage_type", _CheckStorageType),
    ("name", _TNonEmptyString),
    ("changes", _TDict),
    ]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_name,
      }

  def Exec(self, feedback_fn):
    """Modifies a storage volume on the node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_name,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


3546
  """Logical unit for adding node to the cluster.
3547

3548
  """
3549
  HPATH = "node-add"
3550
  HTYPE = constants.HTYPE_NODE
3551
  _OP_REQP = [
3552
    ("node_name", _TNonEmptyString),
3553
    ]
3554
  _OP_DEFS = [("secondary_ip", None)]
3555

    
3556
  def CheckArguments(self):
3557
    # validate/normalize the node name
3558
    self.op.node_name = utils.HostInfo.NormalizeName(self.op.node_name)
3559

    
3560
  def BuildHooksEnv(self):
3561
    """Build hooks env.
3562

3563
    This will run on all nodes before, and on all nodes + the new node after.
3564

3565
    """
3566
    env = {
3567
      "OP_TARGET": self.op.node_name,
3568
      "NODE_NAME": self.op.node_name,
3569
      "NODE_PIP": self.op.primary_ip,
3570
      "NODE_SIP": self.op.secondary_ip,
3571
      }
3572
    nodes_0 = self.cfg.GetNodeList()
3573
    nodes_1 = nodes_0 + [self.op.node_name, ]
3574
    return env, nodes_0, nodes_1
3575

    
3576
  def CheckPrereq(self):
3577
    """Check prerequisites.
3578

3579
    This checks:
3580
     - the new node is not already in the config
3581
     - it is resolvable
3582
     - its parameters (single/dual homed) matches the cluster
3583

3584
    Any errors are signaled by raising errors.OpPrereqError.
3585

3586
    """
3587
    node_name = self.op.node_name
3588
    cfg = self.cfg
3589

    
3590
    dns_data = utils.GetHostInfo(node_name)
3591

    
3592
    node = dns_data.name
3593
    primary_ip = self.op.primary_ip = dns_data.ip
3594
    if self.op.secondary_ip is None:
3595
      self.op.secondary_ip = primary_ip
3596
    if not utils.IsValidIP(self.op.secondary_ip):
3597
      raise errors.OpPrereqError("Invalid secondary IP given",
3598
                                 errors.ECODE_INVAL)
3599
    secondary_ip = self.op.secondary_ip
3600

    
3601
    node_list = cfg.GetNodeList()
3602
    if not self.op.readd and node in node_list:
3603
      raise errors.OpPrereqError("Node %s is already in the configuration" %
3604
                                 node, errors.ECODE_EXISTS)
3605
    elif self.op.readd and node not in node_list:
3606
      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
3607
                                 errors.ECODE_NOENT)
3608

    
3609
    self.changed_primary_ip = False
3610

    
3611
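    # check for IP clashes with existing nodes; a node being re-added may
    # keep its old addresses (only a changed secondary IP is rejected)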
    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [node]
    else:
      exceptions = []

    self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)

    if self.op.readd:
      self.new_node = self.cfg.GetNodeInfo(node)
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
    else:
      self.new_node = objects.Node(name=node,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      new_node.drained = new_node.offline = False # pylint: disable-msg=W0201
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        new_node.primary_ip = self.op.primary_ip

    # notify the user about any possible mc promotion
    if new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise("Can't get version information from node %s" % node)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node, result.payload)
    else:
      raise errors.OpExecError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, result.payload))

    # setup ssh on node
    if self.cfg.GetClusterInfo().modify_ssh_setup:
      logging.info("Copy ssh key to node %s", node)
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
      keyarray = []
      keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                  constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                  priv_key, pub_key]

      for i in keyfiles:
        keyarray.append(utils.ReadFile(i))

      result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                      keyarray[2], keyarray[3], keyarray[4],
                                      keyarray[5])
      result.Raise("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      # FIXME: this should be done via an rpc call to node daemon
      utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
                   prereq=True, ecode=errors.ECODE_ENVIRON)
      if not result.payload:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

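    # have the master verify ssh connectivity and hostname resolution for
    # the new node; any failure aborts the add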
    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    if self.op.readd:
      _RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
      # make sure we redistribute the config
      self.cfg.Update(new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.name)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself from master"
                          " candidate status: %s" % msg)
    else:
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
      self.context.AddNode(new_node, self.proc.GetECId())


class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = [("node_name", _TNonEmptyString)]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    _CheckBooleanOpField(self.op, 'master_candidate')
    _CheckBooleanOpField(self.op, 'offline')
    _CheckBooleanOpField(self.op, 'drained')
    _CheckBooleanOpField(self.op, 'auto_promote')
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
    if all_mods.count(None) == 3:
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we're offlining or draining the node
    self.offline_or_drain = (self.op.offline == True or
                             self.op.drained == True)
    self.deoffline_or_drain = (self.op.offline == False or
                               self.op.drained == False)
    self.might_demote = (self.op.master_candidate == False or
                         self.offline_or_drain)

    self.lock_all = self.op.auto_promote and self.might_demote


  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
    else:
      self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      }
    nl = [self.cfg.GetMasterNode(),
          self.op.node_name]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via masterfailover",
                                   errors.ECODE_INVAL)


    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto-promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
          self.cfg.GetMasterCandidateStats(exceptions=[node.name])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto_promote to allow promotion",
                                   errors.ECODE_INVAL)

    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name,
                                 errors.ECODE_INVAL)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.deoffline_or_drain and not self.offline_or_drain and not
        self.op.master_candidate == True and not node.master_candidate):
      self.op.master_candidate = _DecideSelfPromotion(self)
      if self.op.master_candidate:
        self.LogInfo("Autopromoting node to master candidate")

    return

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node

    result = []
    changed_mc = False

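    # apply the requested flag changes; offline and drained are mutually
    # exclusive, so setting one clears the other and may auto-demote the
    # node from master candidate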
    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to offline"))
        if node.drained:
          node.drained = False
          result.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      changed_mc = True
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      result.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to drain"))
          rrc = self.rpc.call_node_demote_from_mc(node.name)
          msg = rrc.fail_msg
          if msg:
            self.LogWarning("Node failed to demote itself: %s" % msg)
        if node.offline:
          node.offline = False
          result.append(("offline", "clear offline status due to drain"))

    # we locked all nodes, we adjust the CP before updating this node
    if self.lock_all:
      _AdjustCandidatePool(self, [node.name])

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup
    if changed_mc:
      self.context.ReaddNode(node)

    return result


class LUPowercycleNode(NoHooksLU):
  """Powercycles a node.

  """
  _OP_REQP = [
    ("node_name", _TNonEmptyString),
    ("force", _TBool),
    ]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    result.Raise("Failed to schedule the reboot")
    return result.payload


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.enabled_hypervisors[0],
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "nicparams": cluster.nicparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "volume_group_name": cluster.volume_group_name,
      "file_storage_dir": cluster.file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
                                  "wa